Гарантии доставки данных

Гарантии доставки определяют, сколько раз каждое событие из входного топика будет обработано потоковым запросом. Понимание гарантий системы критически важно при проектировании конвейеров обработки данных.

Примечание

Мы постоянно работаем над развитием механизмов потоковой обработки. В будущих версиях предоставляемые гарантии будут улучшены.

Гарантии обработки данных (dataplane):

  • at-least-once — для всех типов запросов каждое событие обрабатывается минимум один раз.

Аномалии при модификации запросов (control plane):

Чекпоинты и восстановление

YDB периодически сохраняет чекпоинт — снимок состояния запроса, содержащий:

  • смещения во входных топиках — позиции, до которых события были прочитаны и обработаны;
  • состояния агрегаций — промежуточные результаты операций, например накопленные значения в GROUP BY HOP.

YDB хранит смещения чтения в собственных чекпоинтах, а не полагается на смещения потребителя (consumer) во внешней системе.

При восстановлении запрос откатывается к последнему чекпоинту: возобновляет чтение с сохранённых смещений и восстанавливает состояния агрегаций. События, поступившие между чекпоинтом и сбоем, будут обработаны повторно. Подробнее о механизме чекпоинтов — в разделе Чекпоинты.

Гарантии обработки данных (dataplane) — at-least-once

Если в процессе обработки потока происходит сбой (перезапуск вычислительного узла, сетевой разрыв, таймаут), YDB автоматически восстанавливает запрос из последнего чекпоинта. Гарантия at-least-once обеспечивается для всех типов потоковых запросов — каждое событие будет обработано как минимум один раз. Запрос возобновляет чтение с сохранённого смещения и отправляет результаты обработки повторно. Это относится ко всем видам запросов: к запросам без агрегации (фильтрация, обогащение, трансформация), и к запросам с оконной агрегацией.

При записи результата в таблицу через UPSERT повторная обработка не приводит к дублированию: UPSERT обновляет существующую строку по первичному ключу. Данные не теряются, дубли не накапливаются.

При записи результата в выходной топик повторная обработка приводит к появлению дубликатов: одни и те же события будут записаны в топик более одного раза. Потребитель выходного топика должен учитывать это и при необходимости выполнять дедупликацию самостоятельно.

Гарантии при модификации запроса (control plane)

В настоящий момент изменение текста запроса без его остановки не поддерживается. Для обновления запроса используется сочетание команд DROP + CREATE, в этом случае гарантия at-least-once не выполняется: часть событий может быть пропущена. Ниже описаны сценарии, в которых это происходит.

Частичные результаты первого окна при старте запроса

Временные окна (GROUP BY HOP) рассчитывают свои границы по абсолютному (wall-clock) времени. Границы окон выравниваются по кратным интервалам от начала эпохи: например, при окне в 1 минуту границы всегда проходят в 12:00:00, 12:01:00, 12:02:00 и т.д., независимо от того, когда запрос был запущен. Если запрос стартует в 12:00:30, он попадает в уже идущее окно [12:00:00 .. 12:01:00], но данные начинают поступать только с 12:00:30. В результате агрегат первого окна вычисляется по данным за 30 секунд вместо полной минуты.

Это ожидаемое поведение при первом запуске — все последующие окна получат данные за полный интервал, который важно учитывать при пересоздании запроса.

Потеря событий при пересоздании запроса

Для изменения текста запроса используется сочетание команд DROP + CREATE. При DROP чекпоинт удаляется вместе с запросом, так как YDB использует внутреннее хранение смещений чтения из источника, то эти смещения удаляются вместе с запросом. Новый запрос не имеет сохранённой позиции и начинает чтение с конца топика. Все события, поступившие в топик между удалением старого запроса и стартом нового, не будут прочитаны.

Аналогичная ситуация возникает, если данные, на которые указывает смещение в чекпоинте, уже удалены из топика по TTL.

Для запросов с оконной агрегацией первые окна после пересоздания будут содержать пропуски данных и заниженные агрегаты.

Неполные агрегаты из-за отсутствия watermarks

В системах потоковой обработки watermarks — это системные метки, определяющие момент, после которого все данные для заданного временного интервала гарантированно получены. YDB в данный момент не поддерживает этот механизм.

Примечание

Watermarks будут поддержаны в версии 26.1.

Без watermarks YDB закрывает временное окно по wall-clock времени (системным часам), а не по полноте данных. Если топик имеет несколько партиций и данные из одной партиции поступают с задержкой, часть событий может прийти после закрытия окна и не попасть в агрегат.

Когда это проявляется:

  • топик содержит несколько партиций с неравномерной нагрузкой;
  • продюсер записывает данные в партиции с разной задержкой;
  • сеть или один из продюсеров временно замедляется.

Агрегаты в окнах могут быть занижены на долю событий из «медленных» партиций. Чем больше разброс задержек между партициями, тем заметнее эффект.

См. также