Как быстро попробовать CQRS/ES в Laravel или пишем банк на PHP

Недавно в подкасте «Цинковый прод» мы с товарищами обсуждали паттерн CQRS/ES и некоторые особенности её реализации в Elixir. Т.к. я в работе использую Laravel, грех было не покопаться в интернетах и не найти как же можно потягать этот подход в экосистеме данного фреймворка.

Всех приглашаю под кат, постарался максимально тезисно описать тему.

Немножко определений

CQRS (Command Query Responsibility Segregation) — выделение в отдельные сущности операции чтения и записи. Например пишем в мастер, читаем из реплики. CQRS. Факты и заблуждения — поможет досконально познать дзен CQRS.
ES (Event Sourcing) — хранение всех изменений состояния какой-либо сущности или набора сущностей.
CQRS/ES — это архитектурный подход при котором мы сохраняем все события изменения состояния какой либо сущности в таблице событий и добавляем к этому агрегат и проектор.
Агрегат — хранит в памяти свойства, необходимые для принятия решений бизнес логики (для ускорения записи), принимает решения (бизнес логика) и публикует события.
Проектор — слушает события и пишет в отдельные таблицы или базы (для ускорения чтения).

В бой

Laravel event projector — библиотека CQRS/ES для Laravel
Larabank — репозиторий с реализованным CQRS/ES подходом. Его и возьмем на пробу.

Конфигурация библиотеки подскажет куда смотреть и расскажет, что это такое. Смотрим файл event-projector.php. Из необходимого для описания работы:

  • projectors — регистрируем проекторы;
  • reactors — регистрируем реакторы. Реактор — в данной библиотеке добавляет сайд-эффекты в обработку событий, например в этом репозитории, если три раза попытаться превысить лимит снятия средств, то пишется событие MoreMoneyNeeded и отправляется письмо пользователю о его финансовых трудностях;
  • replay_chunk_size — размер чанка повтора. Одна из фич ES — возможность восстановить историю по событиям. Laravel event projector подготовился к утечке памяти во время такой операции с помощью данной настройки.

Обращаем внимание на миграции. Кроме стандартных Laravel таблиц имеем

  • stored_events — основная ES таблица с несколькими колонками неструктурированных данных под мета данные событий, строкой храним типы событий. Важная колонка aggregate_uuid — хранит uuid агрегата, для получения всех событий относящихся к нему;
  • accounts — таблица проектора счетов пользователя, необходима для быстрой отдачи актуальных данных о состоянии баланса;
  • transaction_counts — таблица проектора количества транзакций пользователя, необходима для быстрой отдачи количества совершенных транзакций.

А теперь предлагаю отправиться в путь вместе с запросом на создание нового счета.

Создание счета

Стандартный resource роутинг описывает AccountsController. Нас интересует метод store

public function store(Request $request)
{
    $newUuid = Str::uuid();
    // Обращаемся к агрегату, сообщаем ему uuid событий
    // которые в него должны входить
    AccountAggregateRoot::retrieve($newUuid)
        // Добавляем в массив событий на отправку событие создания нового счета
        ->createAccount($request->name, auth()->user()->id)
        // Отправляем массив событий на отправку в очередь на запись
        ->persist();
    return back();
}

AccountAggregateRoot наследует библиотечный AggregateRoot. Посмторим на методы, которые вызывал контроллер.

// Берем uuid и получаем все его события
public static function retrieve(string $uuid): AggregateRoot
{
    $aggregateRoot = (new static());
    $aggregateRoot->aggregateUuid = $uuid;
    return $aggregateRoot->reconstituteFromEvents();
}
public function createAccount(string $name, string $userId)
{
    // Добавляем событие в массив событий на отправку
    // у событий, отправляемых в recordThat, есть хуки, но о них позже,
    // т.к. на создание счета их нет)
    $this->recordThat(new AccountCreated($name, $userId));
    return $this;
}

Метод persist вызывает метод storeMany у модели указанной в конфигурации event-projector.php как stored_event_model в нашем случае StoredEvent

public static function storeMany(array $events, string $uuid = null): void
{
    collect($events)
        ->map(function (ShouldBeStored $domainEvent) use ($uuid) {
            $storedEvent = static::createForEvent($domainEvent, $uuid);
            return [$domainEvent, $storedEvent];
        })
        ->eachSpread(function (ShouldBeStored $event, StoredEvent $storedEvent) {
            // Вызываем все проекторы, которые не реализуют интерфейс
            // QueuedProjector*
            Projectionist::handleWithSyncProjectors($storedEvent);
            if (method_exists($event, 'tags')) {
                $tags = $event->tags();
            }
            // Отправляем в очередь джобу обработки и записи события
            $storedEventJob = call_user_func(
                [config('event-projector.stored_event_job'), 'createForEvent'],
                $storedEvent,
                $tags ?? []
            );
            dispatch($storedEventJob->onQueue(config('event-projector.queue')));
        });
}

*QueuedProjector

Проекторы AccountProjector и TransactionCountProjector реализуют Projector поэтому реагировать на события будут синхронно вместе с их записью.

Ок, счет создали. Предлагаю рассмотреть как же клиент будет его читать.

Отображение счета
// Идем в таблицу `accounts` и берем счет по id
public function index()
{
    $accounts = Account::where('user_id', Auth::user()->id)->get();
    return view('accounts.index', compact('accounts'));
}

Если проектор счетов реализует интерфейс QueuedProjector, то пользователь ничего не увидит пока событие не будет обработано по очереди.

Напоследок изучим, как работает пополнение и снятие денег со счета.

Пополнение и снятие

Снова смотрим в контроллер AccountsController:

// Получаем события с uuid агрегата
// в зависимости от запроса вызываем пополнение
// или снятие денег, затем отправляем на запись
public function update(Account $account, UpdateAccountRequest $request)
{
    $aggregateRoot = AccountAggregateRoot::retrieve($account->uuid);
    $request->adding()
        ? $aggregateRoot->addMoney($request->amount)
        : $aggregateRoot->subtractMoney($request->amount);
    $aggregateRoot->persist();
    return back();
}

Рассмотрим AccountAggregateRoot

при пополнении счета:

public function addMoney(int $amount)
{
    $this->recordThat(new MoneyAdded($amount));
    return $this;
}
// Помните говорил о "хуке" в recordThat
// AggregateRoot*?
// В нем вызывается метод apply(ShouldBeStored $event),
// который в свою очередь вызывает метод 'apply' . EventClassName агрегата
// Хук, который срабатывает при обработке `MoneyAdded`
protected function applyMoneyAdded(MoneyAdded $event)
{
    $this->accountLimitHitInARow = 0;
    $this->balance += $event->amount;
}

*AggregateRoot

при снятии средств:

public function subtractMoney(int $amount)
{
    if (!$this->hasSufficientFundsToSubtractAmount($amount)) {
        // Пишем событие о попытке снять больше лимита
        $this->recordThat(new AccountLimitHit());
        // Если слишком много попыток шлем событие, что
        // нужно больше золота, на которое реагирует реактор
        // и отправляет сообщение пользователю
        if ($this->needsMoreMoney()) {
            $this->recordThat(new MoreMoneyNeeded());
        }
        $this->persist();
        throw CouldNotSubtractMoney::notEnoughFunds($amount);
    }
    $this->recordThat(new MoneySubtracted($amount));
}
protected function applyMoneySubtracted(MoneySubtracted $event)
{
    $this->balance -= $event->amount;
    $this->accountLimitHitInARow = 0;
}

Заключение

Постарался максимально без воды описать процесс «онбординга» в CQRS/ES на Laravel. Концепция очень интересная, но не без особенностей. Прежде, чем внедрять помните о:

  • eventual consistency;
  • желательно использовать в отдельных доменах DDD, не стоит делать большую систему полностью на этом паттерне;
  • изменения в схеме таблицы событий могут быть очень болезненны;
  • ответственно стоит подойти к выбору гранулярности событий, чем больше будет конкретных событий, тем больше их будет в таблице и большее количество ресурсов будет необходимо на работу с ними.