Недавно в подкасте «Цинковый прод» мы с товарищами обсуждали паттерн CQRS/ES и некоторые особенности её реализации в Elixir. Т.к. я в работе использую Laravel, грех было не покопаться в интернетах и не найти как же можно потягать этот подход в экосистеме данного фреймворка.
Всех приглашаю под кат, постарался максимально тезисно описать тему.
Немножко определений
CQRS (Command Query Responsibility Segregation) — выделение в отдельные сущности операции чтения и записи. Например пишем в мастер, читаем из реплики. CQRS. Факты и заблуждения — поможет досконально познать дзен CQRS.
ES (Event Sourcing) — хранение всех изменений состояния какой-либо сущности или набора сущностей.
CQRS/ES — это архитектурный подход при котором мы сохраняем все события изменения состояния какой либо сущности в таблице событий и добавляем к этому агрегат и проектор.
Агрегат — хранит в памяти свойства, необходимые для принятия решений бизнес логики (для ускорения записи), принимает решения (бизнес логика) и публикует события.
Проектор — слушает события и пишет в отдельные таблицы или базы (для ускорения чтения).
В бой
Laravel event projector — библиотека CQRS/ES для Laravel
Larabank — репозиторий с реализованным CQRS/ES подходом. Его и возьмем на пробу.
Конфигурация библиотеки подскажет куда смотреть и расскажет, что это такое. Смотрим файл event-projector.php. Из необходимого для описания работы:
projectors
— регистрируем проекторы;reactors
— регистрируем реакторы. Реактор — в данной библиотеке добавляет сайд-эффекты в обработку событий, например в этом репозитории, если три раза попытаться превысить лимит снятия средств, то пишется событие MoreMoneyNeeded и отправляется письмо пользователю о его финансовых трудностях;replay_chunk_size
— размер чанка повтора. Одна из фич ES — возможность восстановить историю по событиям. Laravel event projector подготовился к утечке памяти во время такой операции с помощью данной настройки.
Обращаем внимание на миграции. Кроме стандартных Laravel таблиц имеем
stored_events
— основная ES таблица с несколькими колонками неструктурированных данных под мета данные событий, строкой храним типы событий. Важная колонкаaggregate_uuid
— хранит uuid агрегата, для получения всех событий относящихся к нему;accounts
— таблица проектора счетов пользователя, необходима для быстрой отдачи актуальных данных о состоянии баланса;transaction_counts
— таблица проектора количества транзакций пользователя, необходима для быстрой отдачи количества совершенных транзакций.
А теперь предлагаю отправиться в путь вместе с запросом на создание нового счета.
Создание счета
Стандартный resource
роутинг описывает AccountsController. Нас интересует метод store
public function store(Request $request)
{
$newUuid = Str::uuid();
// Обращаемся к агрегату, сообщаем ему uuid событий
// которые в него должны входить
AccountAggregateRoot::retrieve($newUuid)
// Добавляем в массив событий на отправку событие создания нового счета
->createAccount($request->name, auth()->user()->id)
// Отправляем массив событий на отправку в очередь на запись
->persist();
return back();
}
AccountAggregateRoot наследует библиотечный AggregateRoot. Посмторим на методы, которые вызывал контроллер.
// Берем uuid и получаем все его события
public static function retrieve(string $uuid): AggregateRoot
{
$aggregateRoot = (new static());
$aggregateRoot->aggregateUuid = $uuid;
return $aggregateRoot->reconstituteFromEvents();
}
public function createAccount(string $name, string $userId)
{
// Добавляем событие в массив событий на отправку
// у событий, отправляемых в recordThat, есть хуки, но о них позже,
// т.к. на создание счета их нет)
$this->recordThat(new AccountCreated($name, $userId));
return $this;
}
Метод persist
вызывает метод storeMany
у модели указанной в конфигурации event-projector.php как stored_event_model
в нашем случае StoredEvent
public static function storeMany(array $events, string $uuid = null): void
{
collect($events)
->map(function (ShouldBeStored $domainEvent) use ($uuid) {
$storedEvent = static::createForEvent($domainEvent, $uuid);
return [$domainEvent, $storedEvent];
})
->eachSpread(function (ShouldBeStored $event, StoredEvent $storedEvent) {
// Вызываем все проекторы, которые не реализуют интерфейс
// QueuedProjector*
Projectionist::handleWithSyncProjectors($storedEvent);
if (method_exists($event, 'tags')) {
$tags = $event->tags();
}
// Отправляем в очередь джобу обработки и записи события
$storedEventJob = call_user_func(
[config('event-projector.stored_event_job'), 'createForEvent'],
$storedEvent,
$tags ?? []
);
dispatch($storedEventJob->onQueue(config('event-projector.queue')));
});
}
Проекторы AccountProjector и TransactionCountProjector реализуют Projector
поэтому реагировать на события будут синхронно вместе с их записью.
Ок, счет создали. Предлагаю рассмотреть как же клиент будет его читать.
Отображение счета
// Идем в таблицу `accounts` и берем счет по id
public function index()
{
$accounts = Account::where('user_id', Auth::user()->id)->get();
return view('accounts.index', compact('accounts'));
}
Если проектор счетов реализует интерфейс QueuedProjector, то пользователь ничего не увидит пока событие не будет обработано по очереди.
Напоследок изучим, как работает пополнение и снятие денег со счета.
Пополнение и снятие
Снова смотрим в контроллер AccountsController:
// Получаем события с uuid агрегата
// в зависимости от запроса вызываем пополнение
// или снятие денег, затем отправляем на запись
public function update(Account $account, UpdateAccountRequest $request)
{
$aggregateRoot = AccountAggregateRoot::retrieve($account->uuid);
$request->adding()
? $aggregateRoot->addMoney($request->amount)
: $aggregateRoot->subtractMoney($request->amount);
$aggregateRoot->persist();
return back();
}
Рассмотрим AccountAggregateRoot
при пополнении счета:
public function addMoney(int $amount)
{
$this->recordThat(new MoneyAdded($amount));
return $this;
}
// Помните говорил о "хуке" в recordThat
// AggregateRoot*?
// В нем вызывается метод apply(ShouldBeStored $event),
// который в свою очередь вызывает метод 'apply' . EventClassName агрегата
// Хук, который срабатывает при обработке `MoneyAdded`
protected function applyMoneyAdded(MoneyAdded $event)
{
$this->accountLimitHitInARow = 0;
$this->balance += $event->amount;
}
при снятии средств:
public function subtractMoney(int $amount)
{
if (!$this->hasSufficientFundsToSubtractAmount($amount)) {
// Пишем событие о попытке снять больше лимита
$this->recordThat(new AccountLimitHit());
// Если слишком много попыток шлем событие, что
// нужно больше золота, на которое реагирует реактор
// и отправляет сообщение пользователю
if ($this->needsMoreMoney()) {
$this->recordThat(new MoreMoneyNeeded());
}
$this->persist();
throw CouldNotSubtractMoney::notEnoughFunds($amount);
}
$this->recordThat(new MoneySubtracted($amount));
}
protected function applyMoneySubtracted(MoneySubtracted $event)
{
$this->balance -= $event->amount;
$this->accountLimitHitInARow = 0;
}
Заключение
Постарался максимально без воды описать процесс «онбординга» в CQRS/ES на Laravel. Концепция очень интересная, но не без особенностей. Прежде, чем внедрять помните о:
- eventual consistency;
- желательно использовать в отдельных доменах DDD, не стоит делать большую систему полностью на этом паттерне;
- изменения в схеме таблицы событий могут быть очень болезненны;
- ответственно стоит подойти к выбору гранулярности событий, чем больше будет конкретных событий, тем больше их будет в таблице и большее количество ресурсов будет необходимо на работу с ними.