Семантика exactly-once в Apache Kafka

Наконец, свершилось то, что сообщество Kafka так долго ждало: в Apache Kafka версии 0.11 появилась семантика exactly-once («строго однократная доставка»). В этом посте я расскажу вам о следующих моментах:

– что представляет собой семантика exactly-once в Apache Kafka;
– почему эта проблема сложна;
– как новые свойства идемпотентности и транзакций позволяют корректно выполнять потоковую exactly-once-обработку с помощью Kafka Streams API.

 

Exactly-once – действительно сложная проблема

Я догадываюсь, о чём каждый из вас сейчас подумал: доставка exactly-once невозможна. Для практического применения это слишком дорого (или поправьте меня, если я ошибаюсь). Вы не одиноки в таких мыслях. Некоторые из моих коллег признают, что доставка exactly-once – одна из самых сложных проблем в сфере распределённых приложений.

 

 

Итак, кое-кто недвусмысленно дал понять, что считает доставку exactly-once с большой вероятностью невозможной!

 

 

Я не отрицаю, что семантика доставки exactly-once (и поддержка потоковой обработки в этом же режиме) – действительно трудноразрешимая задача. Но также я больше года была свидетелем того, как талантливые инженеры Confluent совместно с open-source-сообществом усердно работали над решением этой проблемы в Apache Kafka. Так что давайте перейдём к обзору семантики передачи сообщений.

 

Обзор семантики передачи сообщений

 

Компьютеры, образующие распределённую систему, всегда могут выйти из строя вне зависимости от других участников системы. В случае с Kafka из строя может выйти отдельный брокер, или может произойти сетевая ошибка, в то время как продюсер отправляет сообщение в топик. И в зависимости от действий продюсера по обработке сбоя, можно получить различные семантики.

 

Семантика at least once («хотя бы один раз»).

 

Если продюсер получает подтверждение от брокера Kafka, и при этом acks=all, это означает, что сообщение было записано в топик Kafka строго однократно. Но если продюсер не получает подтверждение по истечении тайм-аута или получает ошибку, то он может попробовать снова отправить сообщение, считая, что оно не было записано в топик Kafka. Если брокер дал сбой непосредственно перед отправкой подтверждения, но после того, как сообщение было успешно записано в топик Kafka, эта повторная попытка отправки приведёт к тому, что сообщение будет записано и отправлено конечному потребителю дважды. Все будут довольны неутомимостью раздающего, но такой подход приводит к дублированию работы и некорректным результатам.

 

Семантика at most once («не более одного раза»).

 

Если продюсер не производит повторную отправку сообщения по истечении тайм-аута или получения ошибки, то сообщение может не записаться в топик Kafka, и, следовательно, оно не будет доставлено потребителю. В большинстве случаев сообщения будут доставляться, но, чтобы избежать вероятности дублирования, мы допускаем, что иногда сообщения не доходят.

 

Семантика exactly once («строго однократная доставка»).

 

Даже при повторной попытке продюсера отправить сообщение, сообщение доставляется строго один раз. Семантика exactly once – наиболее желаемая гарантия, но при этом наименее понимаемая. Причина в том, что она требует взаимодействия между самой системой передачи сообщений и приложением, генерирующим и получающим сообщения. Например, если после удачного получения сообщения вы перемотаете Kafka-потребитель на предыдущее положение, то снова получите оттуда все до последнего сообщения. Это наглядно показывает, почему система обмена сообщениями и клиентское приложение должны взаимодействовать друг с другом, чтобы работала семантика exactly-once.

 

Сбои, которые нужно обрабатывать

 

Описание трудностей поддержки семантики exactly-once начнём с простого примера.

 

Допустим, однопоточное приложение-продюсер отправляет сообщение “Hello, Kafka” в состоящий из одного раздела топик Kafka с именем “EoS” Далее, предположим, что единственный экземпляр приложения-потребителя на другом конце берёт данные из топика и выводит сообщение. Если повезёт и сбоев не будет, то всё сработает отлично, и сообщение “Hello, Kafka” будет однократно записано в раздел топика “EoS”. Потребитель получает сообщение, обрабатывает его и фиксирует его положение, тем самым сообщая о завершении обработки – и приложение-потребитель уже не получит это сообщение повторно даже в случае своего сбоя и перезагрузки.

 

Но всем нам хорошо известно, что нельзя всегда рассчитывать на удачу. По мере масштабирования периодически происходят даже самые маловероятные сценарии сбоев.

 

  1. Сбой брокера. Kafka – это высокодоступная, устойчивая и надёжная система, в которой каждое сообщение, записанное в раздел, сохраняется и реплицируется n-ное кличество раз. Поэтому Kafka может выдержать n-1 сбоев брокера, а значит, раздел доступен до тех пор, пока доступен хотя бы один брокер. Протокол репликации Kafka гарантирует, что, если сообщение однажды было успешно записано в главную реплику, оно будет растиражировано по всем доступным репликам.
  2. Сбой RPC «продюсер-брокер». Устойчивость Kafka зависит от продюсера, получающего от брокера подтверждение. Сбой при получении подтверждения не обязательно означает сбой самого запроса. Брокер может «упасть» уже после записи сообщения, но ещё до отправки подтверждения продюсеру. Также он может «упасть» ещё до записи сообщения в топик. Поскольку продюсеру неоткуда узнать причину сбоя, он вынужден предположить, что сообщение не было успешно записано, и сделает ещё одну попытку. В некоторых случаях это приведёт к дублированию сообщения в логе раздела Kafka, а конечный потребитель получит это сообщение более одного раза.
  3. Сбой клиента. При доставке exactly-once нужно учитывать и вероятность сбоя клиента. Но откуда мы можем узнать, что это сбой клиента, а не просто временное отделение от брокера или пауза приложения? Необходимо достоверно различать постоянный сбой от временного. По идее, брокер должен отбрасывать сообщения от зомби-продюсера. То же касается и потребителя: как только появляется новый экземпляр клиента, он должен иметь возможность восстановиться из любого состояния, в котором был сбойный экземпляр, и начать обработку с нужного места. Это означает, что получаемые положения всегда должны быть синхронизированы с генерируемыми данными.

 

Объяснение семантики exactly-once в Apache Kafka

 

До версии 0.11.x Apache Kafka поддерживал для каждого раздела семантику доставки at least once и доставку с сохранением порядка. Как видно из вышеприведённого примера, это означает, что повторные попытки отправки сообщения продюсером могли привести к его дублированию. В новой семантике exactly-once мы усилили семантику обработки данных Kafka тремя различными, но взаимосвязанными способами.

 

Идемпотентность: семантика exactly-once с сохранением порядка доставки для каждого раздела

 

Идемпотентная операция – это операция, которая при многократном выполнении даёт тот же результат, что и при однократном. Операция отправки продюсером теперь является идемпотентной. В случае ошибки, заставляющей продюсера повторить попытку, сообщение, которое многократно отправлялось продюсером, будет однократно записано в логе брокера Kafka. Применительно к одиночному разделу идемпотентные операции отправки продюсером избавляют нас от вероятности дублирования сообщений вследствие ошибок продюсера или брокера.

 

Для включения этой функции и получения семантики exactly-once для каждого раздела (то есть никакого дублирования, никакой потери данных и сохранение порядка доставки) просто укажите в настройках продюсера enable.idempotence=true.

 

Так как же работает эта функция? «Под капотом» она работает аналогично ТСР: каждый пакет сообщений, отправленный в Kafka, будет содержать порядковый номер, при помощи которого брокер сможет устранить дублирование данных. Но в отличие от ТСР, который гарантированно выполняет дедупликацию только в условиях временного соединения в памяти, порядковый номер сохраняется в реплицированный лог. Поэтому даже в случае сбоя главной реплики любой брокер, берущий на себя эту роль, также распознаёт, являются ли дублем заново отправленные данные.

 

Накладные расходы при таком подходе относительно невелики: всего лишь несколько дополнительных числовых полей для каждого пакета сообщений. Дальше вы увидите, что эта функция очень незначительно снижает производительность по сравнению с неидемпотентным продюсером.

 

Транзакции: атомарные записи в нескольких разделах

 

Теперь Kafka поддерживает атомарную запись в нескольких разделах при помощи новых транзакционных API. Это позволяет продюсеру отправлять пакеты сообщений в несколько разделов так, что либо все сообщения из пакета будут видны любому потребителю, либо ни одно из них не будет видно никому. Данная функция также позволяет осуществлять смещение потребителя в одной транзакции с данными, которые вы обработали, а, следовательно, делает возможной сквозную семантику exactly-once. Вот сниппет с примером кода, который демонстрирует использование транзакционного API:

 

producer.initTransactions();
try {
 producer.beginTransaction();
 producer.send(record1);
 producer.send(record2);
 producer.commitTransaction();
} catch(ProducerFencedException e) {
 producer.close();
} catch(KafkaException e) {
 producer.abortTransaction();
}

 

Этот пример показывает, как можно использовать новый Producer API для атомарной отправки сообщений в набор разделов топика. Стоит отметить, что раздел топика Kafka может одновременно содержать сообщения, как являющиеся частью транзакции, так и не являющиеся.

 

Поэтому с точки зрения потребителя есть два способа чтения транзакционных сообщений, выраженных через isolation level — настройку потребителя:

 

  1. read_committed: вдобавок к чтению сообщений, не являющихся частью транзакции, есть возможность считывать и те, что являются частью транзакции, после коммита транзакции.
  2. read_uncommitted: считываются все сообщения в порядке смещения без ожидания коммита транзакции; эта опция аналогична существующей семантике Kafka-потребителя.

 

Чтобы использовать транзакцию, необходимо сконфигурировать потребителя для задания нужного isolation level, использовать новые Producer API и задать в конфигурации продюсера параметр Transactional ID как некий уникальный ID (он нужен для обеспечения непрерывности транзакционного состояния при перезапусках приложения).

 

То, что нужно: потоковая Exactly-once-обработка в Apache Kafka

 

Благодаря Streams API в Apache Kafka теперь доступна потоковая exactly-once- обработка на основании идемпотентности и атомарности. Чтобы ваше потоковое приложение могло использовать эту семантику, достаточно указать в конфигурации processing.guarantee=exactly_once. В результате вся обработка будет выполняться строго однократно. Это относится как к обработке, так и к воссозданному состоянию, созданному заданием на обработку данных и записанному обратно в Kafka.

 

«Вот почему гарантии exactly-once-обработки, предоставляемые Streams API в Kafka, являются сильнейшими среди всех гарантий, предлагаемых сегодня системами потоковой обработки. Он обеспечивает сквозные exactly-once-гарантии для приложения потоковой обработки, начиная с чтения данных из Kafka, с любого состояния, воссозданного в Kafka потоковым приложением, до записи в Kafka финального результата. Системы потоковой обработки, в которых поддержка воссозданных состояний опирается лишь на внешние системы данных, ослабляют гарантии потоковой exactly-once-обработки. Даже когда они используют Kafka в качестве источника для обработки и должны восстановиться после сбоя, то могут лишь переключить свои Kafka-положения для перепотребления и переобработки сообщений. Но они не могут откатить ассоциированное состояние во внешней системе, что приводит к некорректным результатам в случае неидемпотентности обновления состояния».

 

Позволю себе добавить ещё немного подробностей. Обычно нас волнует, правильный ли ответ получает приложение потоковой обработки, даже если во время обработки падает один из инстансов. Правильное решение при восстановлении вышедшего из строя инстанса – продолжение обработки в том состоянии, которое было до сбоя.

 

Итак, потоковая обработка – это простая операция «чтение – обработка – запись» в топике Kafka. Потребитель считывает из топика сообщения, обрабатывающая логика преобразует их или меняет состояние, поддерживаемое обработчиком, а продюсер записывает результат в другой топик Kafka. Потоковая exactly-once-обработка – это просто возможность строго однократно выполнять операцию «чтение – обработка – запись». При таком раскладе получить правильный ответ означает лишь не пропустить какое-нибудь входящее сообщение или не продублировать выходные данные. И это как раз тот режим работы, который потребители ждут от потокового exactly-once-обработчика.

 

Помимо рассмотренного простейшего сценария сбоя есть и много других.
Потоковый обработчик может брать входные данные из нескольких исходных топиков, порядок которых отличается при многократных прогонах. Поэтому если вы перезапустите свой потоковый обработчик, который взял данные из нескольких исходных топиков, то можете получить другие результаты.
Потоковый обработчик может передавать выходные данные в несколько целевых топиков. Если продюсер не может выполнять атомарную запись в несколько топиков, то его выходные данные могут оказаться неверными в случае сбоя записи в некоторые (не во все) разделы.

 

Потоковый обработчик может агрегировать или объединять несколько входных данных при помощи средств управления состоянием, предоставляемых Streams API. Если один из экземпляров потокового обработчика даёт сбой, то у вас должна быть возможность откатиться к состоянию, воссозданному этим экземпляром потокового обработчика. При перезапуске инстанса вам также нужно иметь возможность продолжить обработку и воссоздать его состояние.

 

Потоковый обработчик может искать более полную информацию во внешней базе данных или посредством обращения к сервису, обновляемому извне. Зависимость от внешнего сервиса делает потоковый обработчик совершенно недетерминированным. Если внешний сервис изменит своё внутреннее состояние между двумя запусками, это приведёт к некорректной передаче результатов. Но при правильной обработке это не должно стать причиной получения полностью неправильных результатов – просто выходные данные потокового процессора будут относиться к допустимым выходным данным.

 

Сбой и перезапуск, особенно в сочетании с недетерминированными операциями и изменениями персистентного состояния, рассчитанного приложением, могут привести не только к дублированию, но и к неверным результатам. Например, если на одной стадии обработки рассчитывается количество увиденных событий, то дублирование может привести к неверному подсчёту на следующей стадии. Поэтому нам нужно определиться со смыслом фразы «потоковая exactly-once-обработка».

 

Это относится и к потреблению из топика, и к воссозданию промежуточного состояния в топике Kafka и отправке в него. Не все возможные вычисления в сообщении выполняются с помощью Streams API, некоторые из них (например, зависящие от внешнего сервиса или потребления из нескольких топиков-источников) в основе своей являются недетерминированными.

 

«Применительно к гарантиям потоковой exactly-once-обработки для детерминированных операций нужно убедиться, что выходные данные операции «чтение – обработка – запись» будут такими же, как если бы потоковый обработчик видел каждое сообщение строго однократно – как будто при условии бесперебойной работы».

 

Стоп, так чем же всё-таки является потоковая exactly-once-обработка для недетерминированных операций?

 

Всё это имеет смысл для детерминированных операций, но что означает потоковая exactly-once обработка, когда сама логика обработки является недетерминированной? Допустим, тот же потоковый обработчик, подсчитывающий количество входящих событий, будет модифицирован для подсчёта только тех событий, которые удовлетворяют условиям, диктуемым внешним сервисом. Эта операция – недетерминированная по своей природе, поскольку внешние условия могут меняться от запуска к запуску потокового обработчика, что потенциально ведёт к различным результатам. Так как же нам понимать гарантии потоковой exactly-once-обработки применительно к таким операциям?

 

«Применительно к гарантиям потоковой exactly-once-обработки для недетерминированных операций нужно убедиться, что выходные данные операции «чтение – обработка – запись» принадлежат к подмножеству допустимых выходных данных, генерируемых комбинацией допустимых значений недетерминированных входных данных».

 

Итак, в нашем примере потокового обработчика при текущем значении счётчика 31 и значении входящего события 2 верными выходными данными в случае сбоя могут быть только 31 или 33: 31 – при условии, что входное событие отклонено, как указано внешними условиями, а 33 – если оно не отклонено.

 

Это статья лишь поверхностно касается вопросов потоковой exactly-once-обработки в Streams API. В следующем посте на эту тему будет подробнее рассказано о гарантиях, а также проведено сравнение гарантий exactly-once в других системах потоковой обработки.

 

Гарантии exactly-once в Kafka: действительно ли они работают?

 

Любая крупная работа вроде этой всегда вызывает вопрос «А работает ли эта фича так, как обещано?» Чтобы ответить на него, давайте рассмотрим её правильность (как мы спроектировали, построили и протестировали эту фичу) и производительность.

 

Скрупулезный дизайн и процесс проверки

 

Корректность и производительность начинаются с надёжной архитектуры. Работу над ней и прототипами мы начали около трёх лет назад в LinkedIn. Также мы занимались этим более года в Confluent, пытаясь найти элегантный способ свести идемпотентность и транзакционные требования в целостный пакет. Мы составили 60-страничное описание, охватывающее все аспекты архитектуры, начиная с высокоуровневой передачи сообщений до рутинных подробностей реализации каждой структуры данных и RPC. Этот процесс шёл целых девять месяцев под пристальным вниманием общественности, за это время архитектура значительно улучшилась благодаря обратной связи сообщества.

 

Например, благодаря open-source-дискуссии мы заменили буферизацию транзакционных чтений со стороны потребителя на более умную фильтрацию в серверной части, избежав тем самым возможных больших проблем с производительностью. Также мы улучшили взаимодействие транзакций со сжатыми топиками и расширили возможности по обеспечению безопасности.

 

В результате мы пришли к простой архитектуре, которая во многом опирается на надёжные примитивы Kafka.

 

  1. Наш транзакционный лог – это топик Kafka с соответствующими гарантиями устойчивости.
  2. Наш новый координатор транзакций (который управляет состоянием транзакций для продюсера) работает в брокере и, естественно, использует алгоритм выбора лидера для обработки сбоев.
  3. Для приложений потоковой обработки, созданных с помощью Kafka Streams API, мы воспользовались тем, что топики Kafka являются «источником истины» для хранения состояний и входящих положений. Поэтому мы смогли «прозрачно» положить эти данные в транзакции, которые атомарно пишутся в несколько разделов, а значит, в рамках операций «чтение – обработка – запись» мы обеспечили гарантии exactly-once для потоков.

 

Эти простота, упор на эффективное использование и внимание к деталям дали нашей архитектуре большие шансы превратиться в хорошо работающую реализацию.

 

Итеративный процесс разработки

 

Мы разрабатывали фичу с открытым исходным кодом, чтобы каждый запрос на добавление кода подвергался интенсивной проверке. В результате отдельные запросы в течение месяцев проходили через несколько десятков итераций. Это позволило выявить некоторые недостатки архитектуры и бесчисленные тупиковые ситуации, о которых мы и не догадывались.

 

Для тестов мы написали более 15 000 строк кода, включая распределённое тестирование работы под нагрузкой с настоящими сбоями, и проводили их каждую ночь в течение нескольких недель в поисках багов. Тесты выявили самые разные проблемы: от базовых ошибок в коде до эзотерических проблем с NTP-синхронизацией в нашей тестовой обвязке. Также мы прогоняли распределённые случайные тесты: брали полный кластер Kafka с несколькими транзакционными клиентами, транзакционно создавали сообщения, считывали их в многопоточном режиме и жёстко «убивали» клиенты и серверы во время этого процесса, чтобы убедиться, что данные не теряются и не дублируются.

 

В результате получилась простая и надёжная архитектура с тщательно протестированной, высококачественной кодовой базой, которая и легла в основу нашего решения.

 

Хорошая новость: Kafka остался таким же быстрым!

 

При создании этой фичи мы сосредоточились на производительности; мы хотели, чтобы наши пользователи могли использовать доставку exactly-once и семантику обработки не только в каких-то определённых случаях, но и имели возможность включать их по умолчанию. Мы отказались от множества более простых архитектурных решений, подразумевающих снижение производительности. После долгих раздумий мы остановились на схеме, имеющей минимальные накладные расходы на каждую транзакцию (~1 процедура записи на раздел и несколько дополнительных записей в центральном транзакционном логе). Это видно из результатов измерения производительности фичи. Для однокилобайтных сообщений и транзакций, длящихся 100 мс, пропускная способность продюсера снижается на:

 

3% по сравнению с его работой, когда он сконфигурирован на контролируемую (in-order) доставку at least once (acks=all, max.in.flight.requests.per.connection=1),
на 20% по сравнению с его работой, когда он сконфигурирован на at most once без соблюдения порядка сообщений (acks=1, max.in.flight.requests.per.connection=5), используется по умолчанию.

 

С момента выхода первого релиза семантики exactly-once появились планы по дальнейшему улучшению производительности. Например, как только мы решим задачу KAFKA-5494, улучшающую конвейеризацию в продюсере, то надеемся значительно снизить накладные расходы в производительности транзакционного продюсера даже по сравнению с продюсером, поддерживающим доставку at most once без соблюдения порядка сообщений. Мы также обнаружили, что идемпотентность оказывает ничтожно малое влияние на производительность продюсера. Если вам интересно, вот результаты наших бенчмарков, тестовая конфигурация и методика тестирования.

 

Помимо обеспечения низких накладных расходов при работе новых фич, нам хотелось не допустить регрессию производительности приложений, не использующих фичи exactly-once. Мы не только добавили поля в заголовки сообщений Kafka для внедрения exactly-once, но и доработали формат сообщений Kafka, чтобы ещё эффективнее сжимать их при передаче по сети или на диске. В частности, мы перенесли часть метаданных в заголовки пакетов и внедрили кодирование длин переменных для каждой записи в пакете. Это позволило значительно уменьшить размер сообщения. Например, пакет с семью записями по 10 байтов каждая в новом формате будет весить на 35% меньше. Это привело к улучшению чистой производительности Kafka в приложениях, использующих ввод/ вывод: при обработке маленьких сообщений до 20% ускоряется работа продюсера и до 50% – потребителя. Такой рост производительности доступен любому пользователю Kafka 0.11, даже если он не пользуется фичами exactly-once.

 

Мы также пересмотрели накладные расходы на потоковую exactly-once обработку с помощью Streams API. При коротком интервале коммита в 100 мс (что необходимо для поддержания низкой сквозной задержки) порисходит падение производительности от 15% (однокилобайтные сообщения) до 30% (стобайтные сообщения). Однако при большом интервале коммита в 30 с вообще не наблюдается повышения накладных расходов для сообщений размером 1 Кб и больше. В следующем релизе мы планируем внедрить спекулятивное исполнение, что позволит поддерживать низкую сквозную задержку даже при большом интервале коммита. То есть мы хотим добиться нулевых накладных расходов на транзакцию.

 

Наконец, кардинально переработав некоторые ключевые структуры данных, мы освободили пространство для построения идемпотентности и транзакционных фич с минимальным снижением производительности, чтобы Kafka стал ещё быстрее. Годы напряжённой работы – и мы невероятно рады выпустить фичи exactly-once для всего сообщества Apache Kafka. Благодаря усилиям, вложенным в создание семантики exactly-once, по мере её широкого распространения в сообществе будут внедряться и новые улучшения. С нетерпением ждём обратной связи и работаем над преобразованиями для предстоящих релизов Apache Kafka.

 

Это что, волшебная пыль, которую я могу распылить на своё приложение?

 

Нет, не совсем. Обработка данных exactly-once – это сквозная гарантия, и приложение должно быть спроектировано так, чтобы не нарушать это свойство. Если вы пользуетесь потребительским API, это значит, что вы фиксируете изменения состояния вашего приложения в соответствии со своими смещениями, как описано в этой статье.

 

Ситуация с потоковой обработкой немного лучше. Поскольку потоковая обработка – закрытая система, где входные и выходные данные, а также изменения состояний смоделированы в одной операции, то это действительно во многом похоже на волшебную пыль. Единственное изменение в конфиге обеспечит вам сквозные гарантии. Хотя вам всё ещё нужно получить данные из Kafka. Вы получите возможность потоковой обработки в сочетании с exactly-once коннектором.

 

Дополнительные ресурсы

 

Эта статья в основном посвящена описанию характера ориентированных на пользователей гарантий, обеспечиваемых грядущей exactly-once-фичей в Apache Kafka 0.11, а также тому, как вы можете её использовать.

 

Если вы хотите подробнее изучить гарантии exactly-once, рекомендую пройтись по KIP-98, чтобы узнать о свойствах транзакции, и KIP-129, чтобы узнать о потоковой exactly-once обработке. Если хотите ещё глубже узнать об архитектуре этих фич, вам в помощь этот дизайн-документ.