Топик

Топик в YDB — это сущность для хранения неструктурированных сообщений, предназначенная для их доставки множеству подписчиков. Фактически, топик — это именованный набор сообщений.

Приложение-писатель записывает сообщения в топик. Приложения-читатели независимы друг от друга, они получают, "считывают", сообщения из топика в том порядке, в котором они были записаны. С помощью топика реализуется архитектурный шаблон издатель-подписчик.

Топик YDB обладает следующими свойствами:

  • Гарантиями at-least-once при чтении сообщений подписчиками.
  • Гарантиями exactly-once при публикации сообщений (дедупликация сообщений).
  • Гарантиями последовательной обработки сообщений (FIFO), опубликованных с одинаковым идентификатором источника.
  • Масштабирование пропускной способности передачи сообщений, публикуемых с разными идентификаторами последовательности.

Сообщения

Данные передаются в виде потоков сообщений. Сообщение — это минимальная неделимая единица пользовательской информации. Сообщения состоят из тела и атрибутов, а также из дополнительных системных свойств. Содержимое сообщений представляет собой набор байт, которое никак не интерпретируется YDB.

Сообщения могут содержать пользовательские атрибуты в формате "ключ-значение". Они возвращаются вместе с телом сообщения при чтении. Пользовательские атрибуты позволяют читателю принять решение о необходимости обработки сообщения, не распаковывая тело сообщения.Атрибуты сообщения задаются при инициализации сессии записи. Это означает, что все сообщения, записанные в рамках одной сессии записи, будут обладать одинаковыми атрибутами при чтении.

Партиционирование

Для горизонтального масштабирования топик разделяется на отдельные элементы, партиции, являющиеся единицами параллелизма. Каждая партиция имеет ограниченную пропускную способность, рекомендуемая скорость записи до 1 МБ/с.

Примечание

В настоящий момент уменьшение количества партиций в топике поддерживается только при помощи удаления и повторного создания топика с меньшим количеством партиций.

Партиции бывают двух видов:

  • Активные. Все партиции по умолчанию, в них возможны как запись, так и чтение.
  • Неактивные. Из них можно только читать. Неактивные партиции появляются после разделения партиции при включенном автопартиционировании. Неактивные партиции удаляются автоматически, когда все сообщения из такой партиции удалены по истечении времени хранения.

Смещение (offset)

Все сообщения внутри партиции имеют уникальный порядковый номер, называемый смещением (offset). Смещение монотонно возрастает при записи новых сообщений.

Автопартиционирование

Количество партиций топика и их пропускная способность задаются при создании топика и определяют общую пропускную способность топика на запись. Если максимальная требуемая скорость записи в топик неизвестна при его создании или будет меняться со временем, можно воспользоваться автопартиционированием для динамического масштабирования топика. Если на топике включено автопартиционирование вверх, то количество партиций в таком топике автоматически увеличивается при возрастании скорости записи (подробнее см. Режимы автопартиционирования).

Гарантии

  1. SDK и сервер обеспечивают гарантии записи exactly-once в случае разделения партиций. Это означает, что любое сообщение будет записано либо в родительскую партицию, либо в одну из дочерних. Сообщение не может быть записано одновременно и в родительскую, и в дочернюю партиции. Более того, сообщение не может быть записано в одну партицию несколько раз.
  2. SDK и сервер обеспечивают порядок чтения. Сначала данные будут вычитаны из родительских партиций, и только затем — из дочерних.
  3. Таким образом, гарантии exactly-once записи и порядка чтения продолжают выполняться для конкретного идентификатора источника (producer-id).

Режимы автопартиционирования

Для любого топика возможны следующие режимы автопартиционирования.

Выключено (DISABLED)

Автопартиционирование отключено. В этом случае число партиций остаётся неизменным, и автоматическое масштабирование не происходит.

Исходное количество партиции указывается при создании топика. При ручном изменении числа партиций в этом режиме добавляются новые партиции. Все ранее существовавшие партиции остаются активными.

Увеличение (UP)

На топике включено автопартиционирование «вверх», то есть при увеличении скорости записи количество партиций увеличивается. При уменьшении скорости записи количество партиций остаётся неизменным.

Алгоритм увеличения числа партиций: если в течение заданного времени скорость записи в какую-то партицию превышает указанный порог (в % от максимальной скорости записи в партицию), эта партиция делится на две. Исходная партиция становится неактивной, и из неё можно только читать данные. Когда истечёт срок хранения сообщений в этой партиции и все сообщения будут удалены, сама партиция также будет удалена. Две новые дочерние партиции становятся активными, и в них возможны как чтение, так и запись.

Пауза (PAUSED)

Автопартиционирование на топике приостановлено. Автоматическое увеличение числа партиций не происходит. При необходимости можно снова включить режим увеличения числа партиций.

Примеры YQL-запросов для перевода топика в различные режимы автопартиционирования можно посмотреть здесь.

Ограничения

При использовании автопартиционирования действуют следующие ограничения:

  1. Если на топике включено автопартиционирование, его нельзя отключить, можно только приостановить.
  2. Если на топике включено автопартиционирование, то запись или чтение в такой топик по протоколу Kafka API невозможны.
  3. Автопартиционирование не может быть включено на топике с режимом хранения по месту.

Источники и группы сообщений

Идентификатор источника, producer_id, и идентификатор группы сообщений, message_group_id, — это способ упорядочить набор сообщений. Порядок записанных сообщений сохраняется в пределах пар: <producer_id, message_group_id>.

При первом использовании пара <идентификатор источника, группа сообщений> привязывается к партиции топика по алгоритму round-robin и все сообщения с этой парой попадают в одну и ту же партицию. Привязка удаляется при отсутствии новых сообщений, использующих этот идентификатор источника, в течение 14 дней.

Важно

Рекомендуемое максимальное количество пар <producer_id, message_group_id> - до 100'000 на одну партицию за последние 14 дней.

Когда важен порядок обработки сообщений

Рассмотрим финансовое приложение, задача которого вычислять остаток на счете пользователя и разрешать либо запрещать списание средств.

Для решения подобных задач можно использовать очередь сообщений. При пополнении счета, снятии средств или совершении покупки в очередь записывается сообщение с идентификатором счета, суммой и типом операции. Приложение обрабатывает поступающие сообщения и вычисляет баланс.

Для правильного вычисления баланса важен порядок обработки сообщений. Если пользователь сначала пополняет счет, а затем совершает покупку, то и сообщения с информацией об этих операциях должны быть обработаны приложением в такой же последовательности. Иначе может произойти ошибка бизнес-логики, и, например, приложение отклонит покупку из-за нехватки средств. В очередях сообщений есть механизмы гарантированного порядка доставки, но они не могут обеспечить порядок сообщений внутри одной очереди на произвольных объемах данных.

Когда сообщения из потока читают несколько экземпляров приложения, сообщение о пополнении счета может получить один, а о списании — другой. В этом случае не существует экземпляра, который гарантированно содержит верную информацию о балансе. Для решения этой проблемы можно сохранять данные в СУБД, обмениваться информацией между экземплярами приложения, строить распределенный кеш и пр.

В YDB можно так записывать данные, чтобы сообщения от одного источника (например, сообщения о транзакциях по одному счету) приходили в один и тот же экземпляр приложения. Источник сообщения определяется идентификатором источника (source_id), а для защиты от дублей используется порядковый номер сообщения от источника. YDB так настраивает потоки данных, чтобы сообщения от одного источника попадают в одну и ту же партицию. Таким образом сообщения о транзакциях по заданному счету будут попадать всегда в одну и ту же партицию и обрабатываться экземпляром приложения, который связан с этим сегментом. Каждый из экземпляров обслуживает свое подмножество партиций и задача синхронизации между экземплярами не возникает.

Ниже приведен пример, когда все транзакции по счетам с четными идентификаторами передаются в первый экземпляр приложения, а с нечетными — во второй.

Если порядок обработки не важен

Для некоторых задач порядок обработки сообщений не критичен. Например, иногда важно просто доставить данные, а упорядочение выполнит система хранения.

Для таких случаев можно использовать упрощенный режим записи, называемый "записью без дедупликации". В этом режиме не нужно указывать идентификаторы источника сообщений ( producer_id или source_id ), и порядковые номера сообщений — sequence number. Запись без дедупликации работает быстрее и потребляет меньше ресурсов на сервере, но упорядочение и дедупликация сообщений на сервере не происходит. Это значит, что если отправить одно и то же сообщение повторно (например, при падении и последующем перезапуске пишущего процесса), оно может быть записано больше одного раза.

Важно

Настоятельно не рекомендуем использовать случайные или псевдослучайные идентификаторы источников. Рекомендуем использовать не более 100000 различных идентификаторов источников на одну партицию.

Идентификатор источника

Идентификатор источника является произвольной строкой длины до 2048 символов, обычно в качестве идентификатора источника используют идентификатор сервера с файлами или другой идентификатор.

Примеры идентификаторов источников

Тип Идентификатор Описание
Файл Идентификатор сервера Для хранения журналов работы приложений используют файлы. В этом случае удобно в качестве идентификатора источника использовать идентификатор сервера.
Действия пользователя Идентификатор класса действий пользователя: "просмотр страницы", "покупка" и тд. Важно обрабатывать действия пользователя в порядке их выполнения пользователем, при этом нет необходимости обрабатывать в одном приложении абсолютно все действия пользователя. В таком случае удобно действия пользователя группировать по классам.

Идентификатор группы сообщений

Идентификатор группы сообщений является произвольной строкой длины до 2048 символов, обычно в качестве идентификатора группы сообщений используют имя файла или идентификатор пользователя.

Примеры идентификаторов групп сообщений

Тип Идентификатор Описание
Файл Полный путь к файлу Все данные с сервера и файла на нем будут отправлены в одну партицию.
Действия пользователя Идентификатор пользователя Важно обрабатывать действия пользователя в порядке их выполнения, в этом случае в качестве идентификатора источника удобно использоваться идентификатор самого пользователя.

Порядковые номера сообщений

Все сообщения от одного источника имеют порядковый номер, sequence number, используемый для дедупликации. Порядковый номер сообщения должен монотонно возрастать в рамках пары топик, источник. При получении сервером сообщения с порядковым номером, меньшим или равным максимальному записанному по паре топик, источник, сообщение будет пропущено как дубликат. При этом допускается наличие пропусков в последовательности порядковых номеров сообщений. Порядковые номера сообщений должны быть уникальны только в пределах пары топик, источник.

Не используются, если выбран режим записи без дедупликации.

Примеры порядковых номеров сообщений

Тип Пример Описание
Файл Смещение передаваемых данных от начала в файле Нельзя удалять строки из начала файла, так как это приведет или пропуску части данных, как к дублям, либо к потере части данных.
Таблица базы данных Автоинкрементный идентификатор записи

Время хранения сообщений

Для каждого топика определено время хранения сообщений. После истечения времени хранения сообщения автоматически удаляются. Исключение составляют данные, которые еще не были прочитаны "важным" читателем — они будут храниться до тех пор, пока читатель их не прочитает.

Сжатие данных

При передаче приложение-писатель указывает, что сообщение может быть сжато одним из поддерживаемых кодеков. Название кодека передается при записи и сохраняется вместе с сообщением, а также возвращается на чтении. Сжатие сообщений происходит по каждому сообщению в отдельности, сжатие пакета сообщений не поддерживается. Операции сжатия-разжатия данных производятся на стороне приложений-читателей и -писателей.

Список поддерживаемых кодеков явно указывается в каждом топике. При попытке записи данных в топик с неподдерживаемым кодеком, это приведет к ошибке записи.

Кодек Описание
raw Без сжатия.
gzip Сжатие алгоритмом gzip.
lzop Сжатие алгоритмом lzop.
zstd Сжатие алгоритмом zstd.

Читатель

Читатель — это именованная сущность для чтения данных из топика. Читатель содержит позиции чтения, подтвержденные читателем по каждому топику, читаемого от его имени.

Позиция чтения

Позиция чтения — это сохраненное смещение читателя по каждой партиции топика. Позиция чтения сохраняется читателем после отправки подтверждения прочитанных данных. При установке новой сессии чтения сообщения поступают читателю начиная с сохраненной позиции чтения. Это позволяет пользователям не хранить позицию чтения на своей стороне.

Важный читатель

Читатель может обладать признаком "важный". Наличие этого признака означает, что сообщения в топике не будут удаляться до тех пор, пока читатель не прочитает и не подтвердит сообщения. Этот признак можно устанавливать для самых критичных читателей, которые должны обработать все данные даже при длительном простое.

Важно

Так как длительный простой важного читателя может привести к использованию всего свободного места хранения данных непрочитанными сообщениями, необходимо следить за отставанием чтения важных читателей

Протоколы для работы с топиками

Для работы с топиками используется YDB SDK (см. Работа с топиками).

Также ограниченно поддержан протокол Kafka API версии 3.4.0. (см. Работа с Kafka API).

Транзакции с участием топиков

YDB поддерживает работу с топиками в рамках транзакций.

Транзакционное чтение из топика

Данные в топиках не изменяются при чтении из топика. Поэтому при чтении в транзакции из топика непосредственно транзакционной операцией является только изменение смещения (offset). При транзакционном чтении через SDK не происходит коммита смещений. Отложенный коммит смещений происходит автоматически при коммите транзакции, SDK скрывает это от пользователя.

Транзакционная запись в топик

При транзакционной записи в топик данные до коммита сохраняются вне партиции, а затем публикуются (становятся видимыми) в момент коммита транзакции. При этом данные будут добавлены в конец партиции в последовательные смещения (offset). Видимости собственных изменений в топиках в транзакциях с участием топиков не предусмотрено.

Ограничения при работе с топиками в транзакции

Транзакции не накладывают дополнительных ограничений на работу с топиками. Внутри транзакции можно записывать большие объёмы данных в топик, писать в несколько партиций и читать несколькими консьюмерами.

Тем не менее рекомендуется выбирать режим работы с транзакциями, учитывая особенности транзакционной работы с топиками: данные публикуются в момент коммита транзакции. То есть если транзакция длительная, данные станут видимыми только спустя значительное время.