Создание микросервисного приложения на Go по принципу CQRS

Эта статья является пошаговым руководством по разработке упрощенного приложения социальной сети, в которой любой может публиковать анонимные сообщения.

Исходный код доступен на GitHub

Архитектура

Приложение построено с использованием шаблона Command Query Responsibility Segregation (CQRS). Цель состоит в том, чтобы разделить команды и запросы на отдельные сервисы. Команды должны выполнять запись в базу данных, а запросы считывать конечные данные. Такое разделение позволяет независимо масштабировать обе стороны, что дает преимущество, так как операций чтения обычно больше, чем записи. Также это значит, что мы можем иметь разные модели данных для каждого сервиса. Сторона запросов может возвращать данные в материализованых представлениях, которые создаются независимо и асинхронно от командной стороны.

Приложение, описанное в этой статье, называется Meower – социальная сеть для кошек.

Архитектура социальной сети Meower

Здесь есть три сервиса: Pusher, Meow и Query. Сервис Meow обрабатывает командную часть, предоставляя конечную точку в виде HTTP POST для создания «Мяу»-сообщений. Сервис Query слушает события и вставляет сообщения в базу данных Elasticsearch. Он предоставляет конечные точки для чтения сообщений упорядоченных по времени и выполнения полнотекстового поиска. Сервис Pusher отправляет только что созданные сообщения клиентам по протоколу WebSocket.

Заметьте, что сервисы Meow и Query не сильно ограничены, так как используют одну и ту же базу данных. Хоть это и противоречит всей идее, но сделает многие вещи немного проще. Эта структура только для разработки (здесь нет SSL, нет репликаций, все хранилища эфемерны и т. д.).

Предварительные требования

Если вы еще этого не сделали, установите Docker, Go и менеджер зависимостей golang/dep.

Создайте директорию для проекта внутри $GOPATH.

На протяжении этой статьи предполагается, что директорией проекта является github.com/tinrab/meower. Каждый раз, когда вы добавляете пакет в файл с кодом Go, убедитесь, что все обновлено с помощью команды dep ensure.

Утилиты

Для начала, сделаем несколько утилит и фасад для работы со сторонними сервисами.

Создадим папку util и файл util/util.go в ней.

package util
import (
  "encoding/json"
  "net/http"
)
func ResponseOk(w http.ResponseWriter, body interface{}) {
  w.WriteHeader(http.StatusOK)
  w.Header().Set("Content-Type", "application/json")
  json.NewEncoder(w).Encode(body)
}
func ResponseError(w http.ResponseWriter, code int, message string) {
  w.WriteHeader(code)
  w.Header().Set("Content-Type", "application/json")
  body := map[string]string{
    "error": message,
  }
  json.NewEncoder(w).Encode(body)
}

PostgreSQL

Создадим папку schema и файл schema/model.go в ней.

package schema
import (
  "time"
)
type Meow struct {
  ID        string    `json:"id"`
  Body      string    `json:"body"`
  CreatedAt time.Time `json:"created_at"`
}

Создадим папку db и файл db/repository.go в ней.

package db
import (
  "context"
  "github.com/tinrab/meower/schema"
)
type Repository interface {
  Close()
  InsertMeow(ctx context.Context, meow schema.Meow) error
  ListMeows(ctx context.Context, skip uint64, take uint64) ([]schema.Meow, error)
}
var impl Repository
func SetRepository(repository Repository) {
  impl = repository
}
func Close() {
  impl.Close()
}
func InsertMeow(ctx context.Context, meow schema.Meow) error {
  return impl.InsertMeow(ctx, meow)
}
func ListMeows(ctx context.Context, skip uint64, take uint64) ([]schema.Meow, error) {
  return impl.ListMeows(ctx, skip, take)
}

Это простой способ для достижения инверсии управления. Используя интерфейс Repository вы получаете возможность внедрять конкретную реализацию во время выполнения, и все вызовы функций будут делегированы объекту impl.

Вы можете реализовать базу данных в памяти удовлетворяющую интерфейсу Repository и использовать её во время разработки и тестирования.

Создадим файл docker-compose.yaml в корневой директории проекта и определим в нем сервис postgres.

version: "3.6"
services:
  postgres:
    build: "./postgres"
    restart: "always"
    environment:
      POSTGRES_DB: "meower"
      POSTGRES_USER: "meower"
      POSTGRES_PASSWORD: "123456"

Создадим папку postgres и файл postgres/up.sql, который будет содержать определения таблиц.

DROP TABLE IF EXISTS meows;
CREATE TABLE meows (
  id VARCHAR(32) PRIMARY KEY,
  body TEXT NOT NULL,
  created_at TIMESTAMP WITH TIME ZONE NOT NULL
);

Создадим файл postgres/Dockerfile и скопируем файл postgres/up.sql в контейнер как 1.sql.

FROM postgres:10.3
COPY up.sql /docker-entrypoint-initdb.d/1.sql
CMD ["postgres"]

SQL-файлы внутри папки /docker-entrypoint-initdb.d будут выполнены в алфавитном порядке.

Реализуем интерфейс Repository для базы данных PostgreSQL в файле db/postgres.go используя пакет lib/pq.

package db
import (
  "context"
  "database/sql"
  _ "github.com/lib/pq"
  "github.com/tinrab/meower/schema"
)
type PostgresRepository struct {
  db *sql.DB
}
func NewPostgres(url string) (*PostgresRepository, error) {
  db, err := sql.Open("postgres", url)
  if err != nil {
    return nil, err
  }
  return &PostgresRepository{
    db,
  }, nil
}
func (r *PostgresRepository) Close() {
  r.db.Close()
}
func (r *PostgresRepository) InsertMeow(ctx context.Context, meow schema.Meow) error {
  _, err := r.db.Exec("INSERT INTO meows(id, body, created_at) VALUES($1, $2, $3)", meow.ID, meow.Body, meow.CreatedAt)
  return err
}
func (r *PostgresRepository) ListMeows(ctx context.Context, skip uint64, take uint64) ([]schema.Meow, error) {
  rows, err := r.db.Query("SELECT * FROM meows ORDER BY id DESC OFFSET $1 LIMIT $2", skip, take)
  if err != nil {
    return nil, err
  }
  defer rows.Close()
  // Parse all rows into an array of Meows
  meows := []schema.Meow{}
  for rows.Next() {
    meow := schema.Meow{}
    if err = rows.Scan(&meow.ID, &meow.Body, &meow.CreatedAt); err == nil {
      meows = append(meows, meow)
    }
  }
  if err = rows.Err(); err != nil {
    return nil, err
  }
  return meows, nil
}

Сообщения Meow упорядочены по первичному ключу, так как ключи упорядочены по времени. Это избавляет нас от добавления дополнительного индекса.

NATS

Добавим сервис nats в docker-compose.yaml.

services:
  nats:
    image: "nats-streaming:0.9.2"
    restart: "always"

Создадим папку event и файл event/messages.go, который содержит типы сообщений.

package event
import (
  "time"
)
type Message interface {
  Key() string
}
type MeowCreatedMessage struct {
  ID        string
  Body      string
  CreatedAt time.Time
}
func (m *MeowCreatedMessage) Key() string {
  return "meow.created"
}

Похожим способом, как мы делали с доступом к базе данных, определим функционал для работы с хранилищем сообщений в файле event/event.go.

package event
import "github.com/tinrab/meower/schema"
type EventStore interface {
  Close()
  PublishMeowCreated(meow schema.Meow) error
  SubscribeMeowCreated() (<-chan MeowCreatedMessage, error)
  OnMeowCreated(f func(MeowCreatedMessage)) error
}
var impl EventStore
func SetEventStore(es EventStore) {
  impl = es
}
func Close() {
  impl.Close()
}
func PublishMeowCreated(meow schema.Meow) error {
  return impl.PublishMeowCreated(meow)
}
func SubscribeMeowCreated() (<-chan MeowCreatedMessage, error) {
  return impl.SubscribeMeowCreated()
}
func OnMeowCreated(f func(MeowCreatedMessage)) error {
  return impl.OnMeowCreated(f)
}

Теперь реализуем интерфейс EventStore для NATS в файле event/nats.go.

package event
import (
  "bytes"
  "encoding/gob"
  "github.com/nats-io/go-nats"
  "github.com/tinrab/meower/schema"
)
type NatsEventStore struct {
  nc                      *nats.Conn
  meowCreatedSubscription *nats.Subscription
  meowCreatedChan         chan MeowCreatedMessage
}
func NewNats(url string) (*NatsEventStore, error) {
  nc, err := nats.Connect(url)
  if err != nil {
    return nil, err
  }
  return &NatsEventStore{nc: nc}, nil
}
func (e *NatsEventStore) Close() {
  if e.nc != nil {
    e.nc.Close()
  }
  if e.meowCreatedSubscription != nil {
    e.meowCreatedSubscription.Unsubscribe()
  }
  close(e.meowCreatedChan)
}
func (e *NatsEventStore) PublishMeowCreated(meow schema.Meow) error {
  m := MeowCreatedMessage{meow.ID, meow.Body, meow.CreatedAt}
  data, err := e.writeMessage(&m)
  if err != nil {
    return err
  }
  return e.nc.Publish(m.Key(), data)
}
func (mq *NatsEventStore) writeMessage(m Message) ([]byte, error) {
  b := bytes.Buffer{}
  err := gob.NewEncoder(&b).Encode(m)
  if err != nil {
    return nil, err
  }
  return b.Bytes(), nil
}

Здесь существует два различных подхода для реализации функций подписок. При использовании этого API, используйте тот, который больше нравится.

Один из способов – это использовать функцию обратного вызова.

func (e *NatsEventStore) OnMeowCreated(f func(MeowCreatedMessage)) (err error) {
  m := MeowCreatedMessage{}
  e.meowCreatedSubscription, err = e.nc.Subscribe(m.Key(), func(msg *nats.Msg) {
    e.readMessage(msg.Data, &m)
    f(m)
  })
  return
}
func (mq *NatsEventStore) readMessage(data []byte, m interface{}) error {
  b := bytes.Buffer{}
  b.Write(data)
  return gob.NewDecoder(&b).Decode(m)
}

Другой способ – это вернуть канал. Здесь создается промежуточный канал для преобразования сообщений в подходящий тип.

func (e *NatsEventStore) SubscribeMeowCreated() (<-chan MeowCreatedMessage, error) {
  m := MeowCreatedMessage{}
  e.meowCreatedChan = make(chan MeowCreatedMessage, 64)
  ch := make(chan *nats.Msg, 64)
  var err error
  e.meowCreatedSubscription, err = e.nc.ChanSubscribe(m.Key(), ch)
  if err != nil {
    return nil, err
  }
  // Decode message
  go func() {
    for {
      select {
      case msg := <-ch:
        e.readMessage(msg.Data, &m)
        e.meowCreatedChan <- m
      }
    }
  }()
  return (<-chan MeowCreatedMessage)(e.meowCreatedChan), nil
}

Elasticsearch

Обновим файл docker-compose.yaml.

services:
  elasticsearch:
  image: 'docker.elastic.co/elasticsearch/elasticsearch:6.2.3'

Создадим интерфейс хранилища в файле search/repository.go.

package search
import (
  "context"
  "github.com/tinrab/meower/schema"
)
type Repository interface {
  Close()
  InsertMeow(ctx context.Context, meow schema.Meow) error
  SearchMeows(ctx context.Context, query string, skip uint64, take uint64) ([]schema.Meow, error)
}
var impl Repository
func SetRepository(repository Repository) {
  impl = repository
}
func Close() {
  impl.Close()
}
func InsertMeow(ctx context.Context, meow schema.Meow) error {
  return impl.InsertMeow(ctx, meow)
}
func SearchMeows(ctx context.Context, query string, skip uint64, take uint64) ([]schema.Meow, error) {
  return impl.SearchMeows(ctx, query, skip, take)
}

Реализуем этот интерфейс для Elasticsearch в файле search/elastic.go используя пакет olivere/elastic.

package search
import (
  "context"
  "encoding/json"
  "log"
  "github.com/olivere/elastic"
  "github.com/tinrab/meower/schema"
)
type ElasticRepository struct {
  client *elastic.Client
}
func NewElastic(url string) (*ElasticRepository, error) {
  client, err := elastic.NewClient(
    elastic.SetURL(url),
    elastic.SetSniff(false),
  )
  if err != nil {
    return nil, err
  }
  return &ElasticRepository{client}, nil
}
func (r *ElasticRepository) Close() {
}
func (r *ElasticRepository) InsertMeow(ctx context.Context, meow schema.Meow) error {
  _, err := r.client.Index().
    Index("meows").
    Type("meow").
    Id(meow.ID).
    BodyJson(meow).
    Refresh("wait_for").
    Do(ctx)
  return err
}
func (r *ElasticRepository) SearchMeows(ctx context.Context, query string, skip uint64, take uint64) ([]schema.Meow, error) {
  result, err := r.client.Search().
    Index("meows").
    Query(
      elastic.NewMultiMatchQuery(query, "body").
        Fuzziness("3").
        PrefixLength(1).
        CutoffFrequency(0.0001),
    ).
    From(int(skip)).
    Size(int(take)).
    Do(ctx)
  if err != nil {
    return nil, err
  }
  meows := []schema.Meow{}
  for _, hit := range result.Hits.Hits {
    var meow schema.Meow
    if err = json.Unmarshal(*hit.Source, &meow); err != nil {
      log.Println(err)
    }
    meows = append(meows, meow)
  }
  return meows, nil
}

Сервис Meow

Создадим папку meow-service и в файле meow-services/main.go получим значения конфигурации из переменных среды.

package main
import (
  "fmt"
  "log"
  "net/http"
  "time"
  "github.com/gorilla/handlers"
  "github.com/gorilla/mux"
  "github.com/kelseyhightower/envconfig"
  "github.com/tinrab/meower/db"
  "github.com/tinrab/meower/event"
  "github.com/tinrab/retry"
)
type Config struct {
  PostgresDB       string `envconfig:"POSTGRES_DB"`
  PostgresUser     string `envconfig:"POSTGRES_USER"`
  PostgresPassword string `envconfig:"POSTGRES_PASSWORD"`
  NatsAddress      string `envconfig:"NATS_ADDRESS"`
}
func main() {
  var cfg Config
  err := envconfig.Process("", &cfg)
  if err != nil {
    log.Fatal(err)
  }
  // ...
}

Подключимся к PostgreSQL и внедрим репозиторий. Код ниже повторяет соединение каждые 2 секунды с помощью пакета tinrab/retry.

retry.ForeverSleep(2*time.Second, func(attempt int) error {
  addr := fmt.Sprintf("postgres://%s:%s@postgres/%s?sslmode=disable", cfg.PostgresUser, cfg.PostgresPassword, cfg.PostgresDB)
  repo, err := db.NewPostgres(addr)
  if err != nil {
    log.Println(err)
    return err
  }
  db.SetRepository(repo)
  return nil
})
defer db.Close()

Подключимся к NATS.

retry.ForeverSleep(2*time.Second, func(_ int) error {
  es, err := event.NewNats(fmt.Sprintf("nats://%s", cfg.NatsAddress))
  if err != nil {
    log.Println(err)
    return err
  }
  event.SetEventStore(es)
  return nil
})
defer event.Close()

Наконец, запустим HTTP-сервер.

func newRouter() (router *mux.Router) {
  router = mux.NewRouter()
  router.HandleFunc("/meows", createMeowHandler).
    Methods("POST").
    Queries("body", "{body}")
  return
}
func main() {
  // ...
  router := newRouter()
  if err := http.ListenAndServe(":8080", router); err != nil {
    log.Fatal(err)
  }
}

Роутер связывает конечную точку для POST-запросов с обработчиком createMeowHandler. Объявим его в файле meow-service/handlers.go.

package main
import (
  "html/template"
  "log"
  "net/http"
  "time"
  "github.com/segmentio/ksuid"
  "github.com/tinrab/meower/db"
  "github.com/tinrab/meower/event"
  "github.com/tinrab/meower/schema"
  "github.com/tinrab/meower/util"
)
func createMeowHandler(w http.ResponseWriter, r *http.Request) {
  type response struct {
    ID string `json:"id"`
  }
  ctx := r.Context()
  // Read parameters
  body := template.HTMLEscapeString(r.FormValue("body"))
  if len(body) < 1 || len(body) > 140 {
    util.ResponseError(w, http.StatusBadRequest, "Invalid body")
    return
  }
  // Create meow
  createdAt := time.Now().UTC()
  id, err := ksuid.NewRandomWithTime(createdAt)
  if err != nil {
    util.ResponseError(w, http.StatusInternalServerError, "Failed to create meow")
    return
  }
  meow := schema.Meow{
    ID:        id.String(),
    Body:      body,
    CreatedAt: createdAt,
  }
  if err := db.InsertMeow(ctx, meow); err != nil {
    log.Println(err)
    util.ResponseError(w, http.StatusInternalServerError, "Failed to create meow")
    return
  }
  // Publish event
  if err := event.PublishMeowCreated(meow); err != nil {
    log.Println(err)
  }
  // Return new meow
  util.ResponseOk(w, response{ID: meow.ID})
}

Новое сообщение создано, добавлено в базу данных и событие опубликовано.

Сервис Query

Создадим папку query-service и прочитаем переменные конфигурации в файле query-service/main.go.

package main
import (
  "fmt"
  "log"
  "net/http"
  "time"
  "github.com/gorilla/mux"
  "github.com/kelseyhightower/envconfig"
  "github.com/tinrab/meower/db"
  "github.com/tinrab/meower/event"
  "github.com/tinrab/meower/search"
  "github.com/tinrab/retry"
)
type Config struct {
  PostgresDB           string `envconfig:"POSTGRES_DB"`
  PostgresUser         string `envconfig:"POSTGRES_USER"`
  PostgresPassword     string `envconfig:"POSTGRES_PASSWORD"`
  NatsAddress          string `envconfig:"NATS_ADDRESS"`
  ElasticsearchAddress string `envconfig:"ELASTICSEARCH_ADDRESS"`
}
func main() {
  var cfg Config
  err := envconfig.Process("", &cfg)
  if err != nil {
    log.Fatal(err)
  }
  // ...
}

Затем подключимся к PostgreSQL, Elasticsearch и NATS.

// Connect to PostgreSQL
retry.ForeverSleep(2*time.Second, func(attempt int) error {
  addr := fmt.Sprintf("postgres://%s:%s@postgres/%s?sslmode=disable", cfg.PostgresUser, cfg.PostgresPassword, cfg.PostgresDB)
  repo, err := db.NewPostgres(addr)
  if err != nil {
    log.Println(err)
    return err
  }
  db.SetRepository(repo)
  return nil
})
defer db.Close()
// Connect to ElasticSearch
retry.ForeverSleep(2*time.Second, func(_ int) error {
  es, err := search.NewElastic(fmt.Sprintf("http://%s", cfg.ElasticsearchAddress))
  if err != nil {
    log.Println(err)
    return err
  }
  search.SetRepository(es)
  return nil
})
defer search.Close()
// Connect to Nats
retry.ForeverSleep(2*time.Second, func(_ int) error {
  es, err := event.NewNats(fmt.Sprintf("nats://%s", cfg.NatsAddress))
  if err != nil {
    log.Println(err)
    return err
  }
  err = es.OnMeowCreated(onMeowCreated)
  if err != nil {
    log.Println(err)
    return err
  }
  event.SetEventStore(es)
  return nil
})
defer event.Close()

Здесь сервис подписывается на событие OnMeowCreated с помощью функции onMeowCreated.

Запустим HTTP-сервер.

func newRouter() (router *mux.Router) {
  router = mux.NewRouter()
  router.HandleFunc("/meows", listMeowsHandler).
    Methods("GET")
  router.HandleFunc("/search", searchMeowsHandler).
    Methods("GET")
  return
}
func main() {
  // ...
  router := newRouter()
  if err := http.ListenAndServe(":8080", router); err != nil {
    log.Fatal(err)
  }
}

Затем в файле query-service/handlers.go объявим функцию onMeowCreated для вставки сообщений в Elasticsearch когда получено событие OnMeowCreated.

package main
import (
  "context"
  "log"
  "net/http"
  "strconv"
  "github.com/tinrab/meower/db"
  "github.com/tinrab/meower/event"
  "github.com/tinrab/meower/schema"
  "github.com/tinrab/meower/search"
  "github.com/tinrab/meower/util"
)
func onMeowCreated(m event.MeowCreatedMessage) {
  meow := schema.Meow{
    ID:        m.ID,
    Body:      m.Body,
    CreatedAt: m.CreatedAt,
  }
  if err := search.InsertMeow(context.Background(), meow); err != nil {
    log.Println(err)
  }
}

Напишем функцию-обработчик searchMeowsHandler, которая выполняет полнотекстовый поиск и возвращает сообщения ограниченные параметрами skip и take.

func searchMeowsHandler(w http.ResponseWriter, r *http.Request) {
  var err error
  ctx := r.Context()
  // Read parameters
  query := r.FormValue("query")
  if len(query) == 0 {
    util.ResponseError(w, http.StatusBadRequest, "Missing query parameter")
    return
  }
  skip := uint64(0)
  skipStr := r.FormValue("skip")
  take := uint64(100)
  takeStr := r.FormValue("take")
  if len(skipStr) != 0 {
    skip, err = strconv.ParseUint(skipStr, 10, 64)
    if err != nil {
      util.ResponseError(w, http.StatusBadRequest, "Invalid skip parameter")
      return
    }
  }
  if len(takeStr) != 0 {
    take, err = strconv.ParseUint(takeStr, 10, 64)
    if err != nil {
      util.ResponseError(w, http.StatusBadRequest, "Invalid take parameter")
      return
    }
  }
  // Search meows
  meows, err := search.SearchMeows(ctx, query, skip, take)
  if err != nil {
    log.Println(err)
    util.ResponseOk(w, []schema.Meow{})
    return
  }
  util.ResponseOk(w, meows)
}

Напишем обработчик listMeowsHandler, который будет возвращать все сообщения отсортированные по времени их создания.

func listMeowsHandler(w http.ResponseWriter, r *http.Request) {
  ctx := r.Context()
  var err error
  // Read parameters
  skip := uint64(0)
  skipStr := r.FormValue("skip")
  take := uint64(100)
  takeStr := r.FormValue("take")
  if len(skipStr) != 0 {
    skip, err = strconv.ParseUint(skipStr, 10, 64)
    if err != nil {
      util.ResponseError(w, http.StatusBadRequest, "Invalid skip parameter")
      return
    }
  }
  if len(takeStr) != 0 {
    take, err = strconv.ParseUint(takeStr, 10, 64)
    if err != nil {
      util.ResponseError(w, http.StatusBadRequest, "Invalid take parameter")
      return
    }
  }
  // Fetch meows
  meows, err := db.ListMeows(ctx, skip, take)
  if err != nil {
    log.Println(err)
    util.ResponseError(w, http.StatusInternalServerError, "Could not fetch meows")
    return
  }
  util.ResponseOk(w, meows)
}

Сервис Pusher

Создадим папку pusher-service.

Сообщения

Создадим файл pusher-service/messages.go и объявим сообщения, которые будут отправлены по протоколу WebSocket.

package main
import (
  "time"
)
const (
  KindMeowCreated = iota + 1
)
type MeowCreatedMessage struct {
  Kind      uint32    `json:"kind"`
  ID        string    `json:"id"`
  Body      string    `json:"body"`
  CreatedAt time.Time `json:"created_at"`
}
func newMeowCreatedMessage(id string, body string, createdAt time.Time) *MeowCreatedMessage {
  return &MeowCreatedMessage{
    Kind:      KindMeowCreated,
    ID:        id,
    Body:      body,
    CreatedAt: createdAt,
  }
}

Клиент

Создадим файл pusher-service/client.go и объявим структуру, описывающую подключенный клиент.

package main
import "github.com/gorilla/websocket"
type Client struct {
  hub      *Hub
  id       int
  socket   *websocket.Conn
  outbound chan []byte
}
func newClient(hub *Hub, socket *websocket.Conn) *Client {
  return &Client{
    hub:      hub,
    socket:   socket,
    outbound: make(chan []byte),
  }
}
func (client *Client) write() {
  for {
    select {
    case data, ok := <-client.outbound:
      if !ok {
        client.socket.WriteMessage(websocket.CloseMessage, []byte{})
        return
      }
      client.socket.WriteMessage(websocket.TextMessage, data)
    }
  }
}
func (client Client) close() {
  client.socket.Close()
  close(client.outbound)
}

Хаб

Создадим файл pusher-service/hub.go для структуры Hub, которая будет управлять всеми клиентами.

package main
import (
  "encoding/json"
  "log"
  "net/http"
  "sync"
  "github.com/gorilla/websocket"
)
var upgrader = websocket.Upgrader{
  CheckOrigin: func(r *http.Request) bool { return true },
}
type Hub struct {
  clients    []*Client
  nextID     int
  register   chan *Client
  unregister chan *Client
  mutex      *sync.Mutex
}
func newHub() *Hub {
  return &Hub{
    clients:    make([]*Client, 0),
    nextID:     0,
    register:   make(chan *Client),
    unregister: make(chan *Client),
    mutex:      &sync.Mutex{},
  }
}

Напишем функцию Run.

func (hub *Hub) run() {
  for {
    select {
    case client := <-hub.register:
      hub.onConnect(client)
    case client := <-hub.unregister:
      hub.onDisconnect(client)
    }
  }
}

Напишем функцию для отправки сообщений.

func (hub *Hub) broadcast(message interface{}, ignore *Client) {
  data, _ := json.Marshal(message)
  for _, c := range hub.clients {
    if c != ignore {
      c.outbound <- data
    }
  }
}
func (hub *Hub) send(message interface{}, client *Client) {
  data, _ := json.Marshal(message)
  client.outbound <- data
}

Напишем функцию для обновления HTTP-запросов к подключениям WebSocket.

func (hub *Hub) handleWebSocket(w http.ResponseWriter, r *http.Request) {
  socket, err := upgrader.Upgrade(w, r, nil)
  if err != nil {
    log.Println(err)
    http.Error(w, "could not upgrade", http.StatusInternalServerError)
    return
  }
  client := newClient(hub, socket)
  hub.register <- client
  go client.write()
}

Когда клиент подключится, добавим его в список.

func (hub *Hub) onConnect(client *Client) {
  log.Println("client connected: ", client.socket.RemoteAddr())
  // Make new client
  hub.mutex.Lock()
  defer hub.mutex.Unlock()
  client.id = hub.nextID
  hub.nextID++
  hub.clients = append(hub.clients, client)
}

Когда клиент отключится, удалим его из списка.

func (hub *Hub) onDisconnect(client *Client) {
  log.Println("client disconnected: ", client.socket.RemoteAddr())
  client.close()
  hub.mutex.Lock()
  defer hub.mutex.Unlock()
  // Find index of client
  i := -1
  for j, c := range hub.clients {
    if c.id == client.id {
      i = j
      break
    }
  }
  // Delete client from list
  copy(hub.clients[i:], hub.clients[i+1:])
  hub.clients[len(hub.clients)-1] = nil
  hub.clients = hub.clients[:len(hub.clients)-1]
}

Точка входа

Создадим файл pusher-service/main.go.

package main
import (
  "fmt"
  "log"
  "net/http"
  "time"
  "github.com/kelseyhightower/envconfig"
  "github.com/tinrab/meower/event"
  "github.com/tinrab/retry"
)
type Config struct {
  NatsAddress string `envconfig:"NATS_ADDRESS"`
}
func main() {
  var cfg Config
  err := envconfig.Process("", &cfg)
  if err != nil {
    log.Fatal(err)
  }
  // Connect to Nats
  hub := newHub()
  retry.ForeverSleep(2*time.Second, func(_ int) error {
    es, err := event.NewNats(fmt.Sprintf("nats://%s", cfg.NatsAddress))
    if err != nil {
      log.Println(err)
      return err
    }
    // Push messages to clients
    err = es.OnMeowCreated(func(m event.MeowCreatedMessage) {
      log.Printf("Meow received: %v\n", m)
      hub.broadcast(newMeowCreatedMessage(m.ID, m.Body, m.CreatedAt), nil)
    })
    if err != nil {
      log.Println(err)
      return err
    }
    event.SetEventStore(es)
    return nil
  })
  defer event.Close()
  // Run WebSocket server
  go hub.run()
  http.HandleFunc("/pusher", hub.handleWebSocket)
  err = http.ListenAndServe(":8080", nil)
  if err != nil {
    log.Fatal(err)
  }
}

Docker image

Укажем все сервисы и зависимости между ними в файле docker-compose.yaml.

services:
  meow:
    build: "."
    command: "meow-service"
    depends_on:
      - "postgres"
      - "nats"
    ports:
      - "8080"
    environment:
      POSTGRES_DB: "meower"
      POSTGRES_USER: "meower"
      POSTGRES_PASSWORD: "123456"
      NATS_ADDRESS: "nats:4222"
  query:
    build: "."
    command: "query-service"
    depends_on:
      - "postgres"
      - "nats"
    ports:
      - "8080"
    environment:
      POSTGRES_DB: "meower"
      POSTGRES_USER: "meower"
      POSTGRES_PASSWORD: "123456"
      NATS_ADDRESS: "nats:4222"
      ELASTICSEARCH_ADDRESS: "elasticsearch:9200"
  pusher:
    build: "."
    command: "pusher-service"
    depends_on:
      - "nats"
    ports:
      - "8080"
    environment:
      NATS_ADDRESS: "nats:4222"

Создадим Dockerfile в корневой директории проекта. Этот образ строится в два этапа и содержит исполняемые файлы всех сервисов. Смотрите Multi-Stage Docker Builds for Kubernetes для дополнительной информации.

FROM golang:1.10.2-alpine3.7 AS build
RUN apk --no-cache add gcc g++ make ca-certificates  
WORKDIR /go/src/github.com/tinrab/meower
COPY Gopkg.lock Gopkg.toml ./
COPY vendor vendor
COPY util util
COPY event event
COPY db db
COPY search search
COPY schema schema
COPY meow-service meow-service
COPY query-service query-service
COPY pusher-service pusher-service
RUN go install ./...
FROM alpine:3.7
WORKDIR /usr/bin
COPY --from=build /go/bin .  

Обратный прокси

Обратный прокси будет отправлять трафик на фронтенд и обратно.

Обновим файл docker-compose.yaml.

services:
  nginx:
    build: "./nginx"
    ports:
      - "8080:80"
    depends_on:
      - "meow"
      - "query"
      - "pusher"

Создадим папку nginx и файл nginx/Dockerfile.

FROM nginx:1.13.12
COPY nginx.conf /etc/nginx/nginx.conf
CMD ["nginx", "-g", "daemon off;"]

Опишем конфигурацию для NGINX в файле nginx/nginx.conf.

user nginx;
worker_processes 1;
events {
  worker_connections 1024;
}
http {
  upstream meows_POST {
    server meow:8080;
  }
  upstream meows_GET {
    server query:8080;
  }
  upstream search_GET {
    server query:8080;
  }
  upstream pusher {
    server pusher:8080;
  }
  server {
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    proxy_set_header Host $http_host;
    add_header Access-Control-Allow-Origin *;
    location /meows {
      limit_except GET POST OPTIONS {
        deny all;
      }
      proxy_pass http://meows_$request_method;
    }
    location /search {
      limit_except GET OPTIONS {
        deny all;
      }
      proxy_pass http://search_GET;
    }
    location /pusher {
      proxy_set_header Upgrade $http_upgrade;
      proxy_set_header Connection "upgrade";
      proxy_pass http://pusher;
    }
  }
}

Фронтенд

Создадим приложение vue с использованием vue-cli 3.0+. Укажем Vuex при выборе функций.

$ vue create frontend
$ cd frontend

Добавим все необходимые зависимости.

$ yarn add bootstrap timeago.js axios vue-native-websocket

Откроем main.js и импортируем файл Bootstrap SCSS.

import 'bootstrap/scss/bootstrap.scss';

Изменим store.js.

import Vue from 'vue';
import Vuex from 'vuex';
import axios from 'axios';
import VueNativeSock from 'vue-native-websocket';
const BACKEND_URL = 'http://localhost:8080';
const PUSHER_URL = 'ws://localhost:8080/pusher';
const SET_MEOWS = 'SET_MEOWS';
const CREATE_MEOW = 'CREATE_MEOW';
const SEARCH_SUCCESS = 'SEARCH_SUCCESS';
const SEARCH_ERROR = 'SEARCH_ERROR';
const MESSAGE_MEOW_CREATED = 1;
Vue.use(Vuex);
const store = new Vuex.Store({
  state: {
    meows: [],
    searchResults: [],
  },
  mutations: {
  },
  actions: {
  },
});
Vue.use(VueNativeSock, PUSHER_URL, { store, format: 'json' });
export default store;

Объявим мутации WebSocket.

mutations: {
  SOCKET_ONOPEN(state, event) {},
  SOCKET_ONCLOSE(state, event) {},
  SOCKET_ONERROR(state, event) {
    console.error(event);
  },
  SOCKET_ONMESSAGE(state, message) {
    switch (message.kind) {
      case MESSAGE_MEOW_CREATED:
        this.commit(CREATE_MEOW, { id: message.id, body: message.body });
    }
  },
  // ...
},

Объявим мутации для обновления сообщений.

mutations: {
  // ...
  [SET_MEOWS](state, meows) {
    state.meows = meows;
  },
  [CREATE_MEOW](state, meow) {
    state.meows = [meow, ...state.meows];
  },
  [SEARCH_SUCCESS](state, meows) {
    state.searchResults = meows;
  },
  [SEARCH_ERROR](state) {
    state.searchResults = [];
  },
},

Добавим действие для получения сообщений.

actions: {
  getMeows({ commit }) {
    axios
      .get(`${BACKEND_URL}/meows`)
      .then(({ data }) => {
        commit(SET_MEOWS, data);
      })
      .catch((err) => console.error(err));
  },
  // ...
},

Добавим действие для создания сообщений.

actions: {
  // ...
  async createMeow({ commit }, meow) {
    const { data } = await axios.post(`${BACKEND_URL}/meows`, null, {
      params: {
        body: meow.body,
      },
    });
  },
  // ...
},

Добавим действие для поиска сообщений.

actions: {
  // ...
  async searchMeows({ commit }, query) {
    if (query.length == 0) {
      commit(SEARCH_SUCCESS, []);
      return;
    }
    axios
      .get(`${BACKEND_URL}/search`, {
        params: { query },
      })
      .then(({ data }) => commit(SEARCH_SUCCESS, data))
      .catch((err) => {
        console.error(err);
        commit(SEARCH_ERROR);
      });
  },
},

В конце файла store.js выполним действие getMeows чтобы получить сообщения после загрузки страницы.

store.dispatch('getMeows');
export default store;

Компонент Meow

Создадим компонент в файле src/components/Meow.vue, который будет отображать одно сообщение.

<template>
  <div class="card">
    <div class="card-body">
      <p class="card-text" v-html="body"></p>
      <p class="card-text">
        <small class="text-muted">
          {{time}}
        </small>
      </p>
    </div>
  </div>
</template>
<script>
import timeago from 'timeago.js';
export default {
  props: ['meow'],
  computed: {
    body() {
      return this.meow.body;
    },
    time() {
      return timeago().format(Date.parse(this.meow.created_at));
    },
  },
};
</script>
<style lang="scss" scoped>
.card {
  margin-bottom: 1rem;
}
.card-body {
  padding: 0.5rem;
  p {
    margin-bottom: 0;
  }
}
</style>

Компонент Timeline

В файле src/components/Timeline.vue определим компонент для отображения списка сообщений и формы для публикации новых.

<template>
  <div>
    <form v-on:submit.prevent="createMeow">
      <div class="input-group">
        <input v-model.trim="meowBody" type="text" class="form-control" placeholder="What's happening?">
        <div class="input-group-append">
          <button class="btn btn-primary" type="submit">Meow</button>
        </div>
      </div>
    </form>
    <div class="mt-4">
      <Meow v-for="meow in meows" :key="meow.id" :meow="meow" />
    </div>
  </div>
</template>
<script>
import { mapState } from 'vuex';
import Meow from '@/components/Meow';
export default {
  data() {
    return {
      meowBody: '',
    };
  },
  computed: mapState({
    meows: (state) => state.meows,
  }),
  methods: {
    createMeow() {
      if (this.meowBody.length != 0) {
        this.$store.dispatch('createMeow', { body: this.meowBody });
        this.meowBody = '';
      }
    },
  },
  components: {
    Meow,
  },
};
</script>

Компонент для поиска опишем в src/components/Search.vue. Он похож на Timeline и будет осуществлять поиск при каждом изменении поля ввода.

<template>
  <div>
    <input @keyup="searchMeows" v-model.trim="query" type="text" class="form-control" placeholder="Search...">
    <div class="mt-4">
      <Meow v-for="meow in meows" :key="meow.id" :meow="meow" />
    </div>
  </div>
</template>
<script>
import { mapState } from 'vuex';
import Meow from '@/components/Meow';
export default {
  data() {
    return {
      query: '',
    };
  },
  computed: mapState({
    meows: (state) => state.searchResults,
  }),
  methods: {
    searchMeows() {
      if (this.query != this.lastQuery) {
        this.$store.dispatch('searchMeows', this.query);
        this.lastQuery = this.query;
      }
    },
  },
  components: {
    Meow,
  },
};
</script>

Макет приложения

Включим компоненты Search и Timeline в src/App.vue.

<template>
  <div class="container py-5">
    <div class="row mb-4">
      <h1 class="col-12">Meower</h1>
    </div>
    <div class="row">
      <Timeline class="col" />
      <Search class="col" />
    </div>
  </div>
</template>
<script>
import Timeline from '@/components/Timeline';
import Search from '@/components/Search';
export default {
  components: {
    Timeline,
    Search,
  },
};
</script>
<style lang="scss" scoped>
.container {
  max-width: 768px;
}
</style>

Завершение

На этом этапе все должно работать так, как и ожидалось. Для запуска приложения сначала соберем образ Docker с помощью Docker Compose, а затем запустим сервер для разработки Vue.

$ docker-compose up -d --build
$ cd frontend && yarn serve

Так выглядит Meower

Демонстрация работы приложения Meower

Весь исходный код доступен на GitHub.