Работа с топиками

В этой статье приведены примеры использования YDB SDK для работы с топиками.

Перед выполнением примеров создайте топик и добавьте читателя.

Примеры работы с топиками

Инициализация соединения с топиками

Для работы с топиками создаются экземпляры драйвера YDB и клиента.

Драйвер YDB отвечает за взаимодействие приложения и YDB на транспортном уровне. Драйвер должен существовать на всем протяжении жизненного цикла работы с топиками и должен быть инициализирован перед созданием клиента.

Клиент сервиса топиков (исходный код) работает поверх драйвера YDB и отвечает за управляющие операции с топиками, а также создание сессий чтения и записи.

Фрагмент кода приложения для инициализации драйвера YDB:

// Create driver instance.
auto driverConfig = TDriverConfig()
    .SetEndpoint(opts.Endpoint)
    .SetDatabase(opts.Database)
    .SetAuthToken(GetEnv("YDB_TOKEN"));

TDriver driver(driverConfig);

В этом примере используется аутентификационный токен, сохранённый в переменной окружения YDB_TOKEN. Подробнее про соединение с БД и аутентификацию.

Фрагмент кода приложения для создания клиента:

TTopicClient topicClient(driver);

Для работы с топиками создаются экземпляры транспорта YDB и клиента.

Транспорт YDB отвечает за взаимодействие приложения и YDB на транспортном уровне. Он должен существовать на всем протяжении жизненного цикла работы с топиками и должен быть инициализирован перед созданием клиента.

Фрагмент кода приложения для инициализации транспорта YDB:

try (GrpcTransport transport = GrpcTransport.forConnectionString(connString)
        .withAuthProvider(CloudAuthHelper.getAuthProviderFromEnviron())
        .build()) {
    // Use YDB transport
}

В этом примере используется вспомогательный метод CloudAuthHelper.getAuthProviderFromEnviron(), получающий токен из переменных окружения.
Например, YDB_ACCESS_TOKEN_CREDENTIALS.
Подробнее про соединение с БД и аутентификацию.

Клиент сервиса топиков (исходный код) работает поверх транспорта YDB и отвечает как за управляющие операции с топиками, так и за создание писателей и читателей.

Фрагмент кода приложения для создания клиента:

try (TopicClient topicClient = TopicClient.newClient(transport)
              .setCompressionExecutor(compressionExecutor)
              .build()) {
  // Use topic client
}

В обоих примерах кода выше используется блок (try-with-resources).
Это позволяет автоматически закрывать клиент и транспорт при выходе из этого блока, т.к. оба являются наследниками AutoCloseable.

Управление топиками

Создание топика

Единственный обязательный параметр для создания топика - это его путь, остальные параметры опциональны.

Полный список настроек можно посмотреть в заголовочном файле.

Пример создания топика c тремя партициями и поддержкой кодека ZSTD:

auto settings = NYdb::NTopic::TCreateTopicSettings()
    .PartitioningSettings(3, 3)
    .AppendSupportedCodecs(NYdb::NTopic::ECodec::ZSTD);

auto status = topicClient
    .CreateTopic("my-topic", settings)  // returns TFuture<TStatus>
    .GetValueSync();

Полный список поддерживаемых параметров можно посмотреть в документации SDK.

Пример создания топика со списком поддерживаемых кодеков и минимальным количеством партиций

err := db.Topic().Create(ctx, "topic-path",
  // optional
  topicoptions.CreateWithSupportedCodecs(topictypes.CodecRaw, topictypes.CodecGzip),

  // optional
  topicoptions.CreateWithMinActivePartitions(3),
)

Пример создания топика со списком поддерживаемых кодеков и минимальным количеством партиций

driver.topic_client.create_topic(topic_path,
    supported_codecs=[ydb.TopicCodec.RAW, ydb.TopicCodec.GZIP], # optional
    min_active_partitions=3,                                    # optional
)

Полный список настроек можно посмотреть в коде SDK.

Пример создания топика со списком поддерживаемых кодеков и минимальным количеством партиций

topicClient.createTopic(topicPath, CreateTopicSettings.newBuilder()
                // Optional
                .setSupportedCodecs(SupportedCodecs.newBuilder()
                        .addCodec(Codec.RAW)
                        .addCodec(Codec.GZIP)
                        .build())
                // Optional
                .setPartitioningSettings(PartitioningSettings.newBuilder()
                        .setMinActivePartitions(3)
                        .build())
                .build());

Изменение топика

При изменении топика в параметрах метода AlterTopic нужно указать путь топика и параметры, которые будут изменяться. Изменяемые параметры представлены структурой TAlterTopicSettings.

Полный список настроек можно посмотреть в заголовочном файле.

Пример добавления важного читателя к топику и установки времени хранения сообщений для топика в два дня:

auto alterSettings = NYdb::NTopic::TAlterTopicSettings()
    .BeginAddConsumer("my-consumer")
        .Important(true)
    .EndAddConsumer()
    .SetRetentionPeriod(TDuration::Days(2));

auto status = topicClient
    .AlterTopic("my-topic", alterSettings)  // returns TFuture<TStatus>
    .GetValueSync();

При изменении топика в параметрах нужно указать путь топика и те параметры, которые будут изменяться.

Полный список поддерживаемых параметров можно посмотреть в документации SDK.

Пример добавления читателя к топику

err := db.Topic().Alter(ctx, "topic-path",
  topicoptions.AlterWithAddConsumers(topictypes.Consumer{
    Name:            "new-consumer",
    SupportedCodecs: []topictypes.Codec{topictypes.CodecRaw, topictypes.CodecGzip}, // optional
  }),
)

Функциональность находится в разработке.

При изменении топика в параметрах метода alterTopic нужно указать путь топика и параметры, которые будут изменяться.

Полный список настроек можно посмотреть в коде SDK.

topicClient.alterTopic(topicPath, AlterTopicSettings.newBuilder()
                .addAddConsumer(Consumer.newBuilder()
                        .setName("new-consumer")
                        .setSupportedCodecs(SupportedCodecs.newBuilder()
                                .addCodec(Codec.RAW)
                                .addCodec(Codec.GZIP)
                                .build())
                        .build())
                .build());

Получение информации о топике

Для получения информации о топике используется метод DescribeTopic.

Описание топика представлено структурой TTopicDescription.

Полный список полей описания смотри в заголовочном файле.

Получить доступ к этому описанию можно так:

auto result = topicClient.DescribeTopic("my-topic").GetValueSync();
if (result.IsSuccess()) {
    const auto& description = result.GetTopicDescription();
    std::cout << "Topic description: " << GetProto(description) << std::endl;
}

Существует отдельный метод для получения информации о читателе - DescribeConsumer.

  descResult, err := db.Topic().Describe(ctx, "topic-path")
if err != nil {
  log.Fatalf("failed describe topic: %v", err)
  return
}
fmt.Printf("describe: %#v\n", descResult)
info = driver.topic_client.describe_topic(topic_path)
print(info)

Для получения информации о топике используется метод describeTopic.

Полный список полей описания можно посмотреть в коде SDK.

Result<TopicDescription> topicDescriptionResult = topicClient.describeTopic(topicPath)
        .join();
TopicDescription description = topicDescriptionResult.getValue();

Удаление топика

Для удаления топика достаточно указать путь к нему.

auto status = topicClient.DropTopic("my-topic").GetValueSync();
  err := db.Topic().Drop(ctx, "topic-path")
driver.topic_client.drop_topic(topic_path)
topicClient.dropTopic(topicPath);

Запись сообщений

Подключение к топику для записи сообщений

На данный момент поддерживается подключение только с совпадающими идентификаторами источника и группы сообщений (producer_id и message_group_id), в будущем это ограничение будет снято.

Подключение к топику на запись представлено объектом сессии записи с интерфейсом IWriteSession или ISimpleBlockingWriteSession (вариант для простой записи по одному сообщению без подтверждения, блокирующейся при превышении числа inflight записей или размера буфера SDK). Настройки сессии записи представлены структурой TWriteSessionSettings, для варианта ISimpleBlockingWriteSession часть настроек не поддерживается.

Полный список настроек смотри в заголовочном файле.

Пример создания сессии записи с интерфейсом IWriteSession.

TString producerAndGroupID = "group-id";
auto settings = TWriteSessionSettings()
    .Path("my-topic")
    .ProducerId(producerAndGroupID)
    .MessageGroupId(producerAndGroupID);

auto session = topicClient.CreateWriteSession(settings);
producerAndGroupID := "group-id"
writer, err := db.Topic().StartWriter(producerAndGroupID, "topicName",
  topicoptions.WithMessageGroupID(producerAndGroupID),
)
if err != nil {
    return err
}
writer = driver.topic_client.writer(topic_path)

Инициализация настроек писателя:

String producerAndGroupID = "group-id";
WriterSettings settings = WriterSettings.newBuilder()
      .setTopicPath(topicPath)
      .setProducerId(producerAndGroupID)
      .setMessageGroupId(producerAndGroupID)
      .build();

Создание синхронного писателя:

SyncWriter writer = topicClient.createSyncWriter(settings);

После создания писателя его необходимо инициализировать. Для этого есть два метода:

  • init(): неблокирующий, запускает процесс инициализации в фоне и не ждёт его завершения.
    writer.init();
    
  • initAndWait(): блокирующий, запускает процесс инициализации и ждёт его завершения. Если в процессе инициализации возникла ошибка, будет брошено исключение.
    try {
        writer.initAndWait();
        logger.info("Init finished succsessfully");
    } catch (Exception exception) {
        logger.error("Exception while initializing writer: ", exception);
        return;
    }
    

Инициализация настроек писателя:

String producerAndGroupID = "group-id";
WriterSettings settings = WriterSettings.newBuilder()
      .setTopicPath(topicPath)
      .setProducerId(producerAndGroupID)
      .setMessageGroupId(producerAndGroupID)
      .build();

Создание и инициализация асинхронного писателя:

AsyncWriter writer = topicClient.createAsyncWriter(settings);

// Init in background
writer.init()
        .thenRun(() -> logger.info("Init finished successfully"))
        .exceptionally(ex -> {
            logger.error("Init failed with ex: ", ex);
            return null;
        });

Запись сообщений

Асинхронная запись возможна через интерфейс IWriteSession.

Работа пользователя с объектом IWriteSession в общем устроена как обработка цикла событий с тремя типами событий: TReadyToAcceptEvent, TAcksEvent и TSessionClosedEvent.

Для каждого из типов событий можно установить обработчик этого события, а также можно установить общий обработчик. Обработчики устанавливаются в настройках сессии записи перед её созданием.

Если обработчик для некоторого события не установлен, его необходимо получить и обработать в методах GetEvent / GetEvents. Для неблокирующего ожидания очередного события есть метод WaitEvent с интерфейсом TFuture<void>().

Для записи каждого сообщения пользователь должен "потратить" move-only объект TContinuationToken, который выдаёт SDK с событием TReadyToAcceptEvent. При записи сообщения можно установить пользовательские seqNo и временную метку создания, но по умолчанию их проставляет SDK автоматически.

По умолчанию Write выполняется асинхронно - данные из сообщений вычитываются и сохраняются во внутренний буфер, отправка происходит в фоне в соответствии с настройками MaxMemoryUsage, MaxInflightCount, BatchFlushInterval, BatchFlushSizeBytes. Сессия сама переподключается к YDB при обрывах связи и повторяет отправку сообщений пока это возможно, в соответствии с настройкой RetryPolicy. При получении ошибки, которую невозможно повторить, сессия чтения отправляет пользователю TSessionClosedEvent с диагностической информацией.

Так может выглядеть запись нескольких сообщений в цикле событий без использования обработчиков:

// Event loop
while (true) {
    // Get event
    // May block for a while if write session is busy
    TMaybe<TWriteSessionEvent::TEvent> event = session->GetEvent(/*block=*/true);

    if (auto* readyEvent = std::get_if<TWriteSessionEvent::TReadyToAcceptEvent>(&*event)) {
        session->Write(std::move(event.ContinuationToken), "This is yet another message.");

    } else if (auto* ackEvent = std::get_if<TWriteSessionEvent::TAcksEvent>(&*event)) {
        std::cout << ackEvent->DebugString() << std::endl;

    } else if (auto* closeSessionEvent = std::get_if<TSessionClosedEvent>(&*event)) {
        break;
    }
}

Для отправки сообщения - достаточно в поле Data сохранить Reader, из которого можно будет прочитать данные. Можно рассчитывать на то что данные каждого сообщения читаются один раз (или до первой ошибки), к моменту возврата из Write данные будут уже прочитаны и сохранены во внутренний буфер.

SeqNo и дата создания сообщений по умолчанию проставляются автоматически.

По умолчанию Write выполняется асинхронно - данные из сообщений вычитываются и сохраняются во внутренний буфер, отправка происходит в фоне. Writer сам переподключается к YDB при обрывах связи и повторяет отправку сообщений пока это возможно. При получении ошибки, которую невозможно повторить Writer останавливается и следующие вызовы Write будут завершаться с ошибкой.

err := writer.Write(ctx,
  topicwriter.Message{Data: strings.NewReader("1")},
  topicwriter.Message{Data: bytes.NewReader([]byte{1,2,3})},
  topicwriter.Message{Data: strings.NewReader("3")},
)
if err == nil {
  return err
}

Для отправки сообщений можно передавать как просто содержимое сообщения (bytes, str), так и вручную задавать некоторые свойства. Объекты можно передавать по одному или сразу в массиве (list). Метод write выполняется асинхронно. Возврат из метода происходит сразу после того как сообщения будут положены во внутренний буфер клиента, обычно это происходит быстро. Ожидание может возникнуть, если внутренний буфер уже заполнен и нужно подождать, пока часть данных будет отправлена на сервер.

# Простая отправка сообщений, без явного указания метаданных.
# Удобно начинать, удобно использовать пока важно только содержимое сообщения.
writer = driver.topic_client.writer(topic_path)
writer.write("mess")  # Строки будут переданы в кодировке utf-8, так удобно отправлять
                      # текстовые сообщения.
writer.write(bytes([1, 2, 3]))  # Эти байты будут отправлены "как есть", так удобно отправлять
                                # бинарные данные.
writer.write(["mess-1", "mess-2"])  # Здесь за один вызов отправляется несколько сообщений —
                                    # так снижаются накладные расходы на внутренние процессы SDK,
                                    # имеет смысл при большом потоке сообщений.

# Полная форма, используется, когда кроме содержимого сообщения нужно вручную задать и его свойства.
writer = driver.topic_client.writer(topic="topic-path", auto_seqno=False, auto_created_at=False)

writer.write(ydb.TopicWriterMessage("asd", seqno=123, created_at=datetime.datetime.now()))
writer.write(ydb.TopicWriterMessage(bytes([1, 2, 3]), seqno=124, created_at=datetime.datetime.now()))

# В полной форме так же можно отправлять несколько сообщений за один вызов функции.
# Это имеет смысл при большом потоке отправляемых сообщений — для снижения
# накладных расходов на внутренние вызовы SDK.
writer.write([
  ydb.TopicWriterMessage("asd", seqno=123, created_at=datetime.datetime.now()),
  ydb.TopicWriterMessage(bytes([1, 2, 3]), seqno=124, created_at=datetime.datetime.now(),
  ])

Метод send блокирует управление, пока сообщение не будет помещено в очередь отправки.
Попадание сообщения в эту очередь означает, что писатель сделает всё возможное для доставки сообщения.
Например, если сессия записи по какой-то причине оборвётся, писатель переустановит соединение и попробует отправить это сообщение на новой сессии.
Но попадание сообщения в очередь отправки не гарантирует того, что сообщение в итоге будет записано.
Например, могут возникать ошибки, приводящие к завершению работы писателя до того, как сообщения из очереди будут отправлены.
Если нужно подтверждение успешной записи для каждого сообщения, используйте асинхронного писателя и проверяйте статус, возвращаемый методом send.

writer.send(Message.of("11".getBytes()));

long timeoutSeconds = 5; // How long should we wait for a message to be put into sending buffer
try {
    writer.send(
            Message.newBuilder()
                    .setData("22".getBytes())
                    .setCreateTimestamp(Instant.now().minusSeconds(5))
                    .build(),
            timeoutSeconds,
            TimeUnit.SECONDS
    );
} catch (TimeoutException exception) {
    logger.error("Send queue is full. Couldn't put message into sending queue within {} seconds", timeoutSeconds);
} catch (InterruptedException | ExecutionException exception) {
    logger.error("Couldn't put the message into sending queue due to exception: ", exception);
}

Метод send в асинхронном клиенте неблокирующий. Помещает сообщение в очередь отправки.
Метод возвращает CompletableFuture<WriteAck>, позволяющую проверить, действительно ли сообщение было записано.
В случае, если очередь переполнена, будет брошено исключение QueueOverflowException.
Это способ сигнализировать пользователю о том, что поток записи следует притормозить.
В таком случае стоит или пропускать сообщения, или выполнять повторные попытки записи через exponential backoff.
Также можно увеличить размер клиентского буфера (setMaxSendBufferMemorySize), чтобы обрабатывать больший объем сообщений перед тем, как он заполнится.

try {
    // Non-blocking. Throws QueueOverflowException if send queue is full
    writer.send(Message.of("33".getBytes()));
} catch (QueueOverflowException exception) {
    // Send queue is full. Need to retry with backoff or skip
}

Запись сообщений с подтверждением о сохранении на сервере

Получение подтверждений от сервера возможно через интерфейс IWriteSession.

Ответы о записи сообщений на сервере приходят клиенту SDK в виде событий TAcksEvent. В одном событии могут содержаться ответы о нескольких отправленных ранее сообщениях. Варианты ответа: запись подтверждена (EES_WRITTEN), запись отброшена как дубликат ранее записанного сообщения (EES_ALREADY_WRITTEN) или запись отброшена по причине сбоя (EES_DISCARDED).

Пример установки обработчика TAcksEvent для сессии записи:

auto settings = TWriteSessionSettings()
  // other settings are set here
  .EventHandlers(
    TWriteSessionSettings::TEventHandlers()
      .AcksHandler(
        [&](TWriteSessionEvent::TAcksEvent& event) {
          for (const auto& ack : event.Acks) {
            if (ack.State == TWriteAck::EEventState::EES_WRITTEN) {
              ackedSeqNo.insert(ack.SeqNo);
              std::cout << "Acknowledged message with seqNo " << ack.SeqNo << std::endl;
            }
          }
        }
      )
  );

auto session = topicClient.CreateWriteSession(settings);

В такой сессии записи события TAcksEvent не будут приходить пользователю в GetEvent / GetEvents, вместо этого SDK при получении подтверждений от сервера будет вызывать переданный обработчик. Аналогично можно настраивать обработчики на остальные типы событий.

При подключении можно указать опцию синхронной записи сообщений - topicoptions.WithSyncWrite(true). Тогда Write будет возвращаться только после того как получит подтверждение с сервера о сохранении всех, сообщений переданных в вызове. При этом SDK так же как и обычно будет при необходимости переподключаться и повторять отправку сообщений. В этом режиме контекст управляет только временем ожидания ответа из SDK, т.е. даже после отмены контекста SDK продолжит попытки отправить сообщения.


producerAndGroupID := "group-id"
writer, _ := db.Topic().StartWriter(producerAndGroupID, "topicName",
  topicoptions.WithMessageGroupID(producerAndGroupID),
  topicoptions.WithSyncWrite(true),
)

err = writer.Write(ctx,
  topicwriter.Message{Data: strings.NewReader("1")},
  topicwriter.Message{Data: bytes.NewReader([]byte{1,2,3})},
  topicwriter.Message{Data: strings.NewReader("3")},
)
if err == nil {
  return err
}

Есть два способа получить подтверждение о записи сообщений на сервере:

  • flush() — дожидается подтверждения для всех сообщений, записанных ранее во внутренний буфер.
  • write_with_ack(...) — отправляет сообщение и ждет подтверждение его доставки от сервера. При отправке нескольких сообщений подряд это способ работает медленно.
# Положить несколько сообщений во внутренний буфер, затем дождаться,
# пока все они будут доставлены до сервера.
for mess in messages:
    writer.write(mess)

writer.flush()

# Можно отправить несколько сообщений и дождаться подтверждения на всю группу.
writer.write_with_ack(["mess-1", "mess-2"])

# Ожидание при отправке каждого сообщения — этот метод вернет результат только после получения
# подтверждения от сервера.
# Это самый медленный вариант отправки сообщений, используйте его только если такой режим
# действительно нужен.
writer.write_with_ack("message")

Метод send возвращает CompletableFuture<WriteAck>. Её успешное завершение означает подтверждение записи сервером.
В структуре WriteAck содержится информация о seqNo, offset и статусе записи:

writer.send(Message.of(message))
        .whenComplete((result, ex) -> {
            if (ex != null) {
                logger.error("Exception on writing message message: ", ex);
            } else {
                switch (result.getState()) {
                    case WRITTEN:
                        WriteAck.Details details = result.getDetails();
                        StringBuilder str = new StringBuilder("Message was written successfully");
                        if (details != null) {
                            str.append(", offset: ").append(details.getOffset());
                        }
                        logger.debug(str.toString());
                        break;
                    case ALREADY_WRITTEN:
                        logger.warn("Message has already been written");
                        break;
                    default:
                        break;
                }
            }
        });

Выбор кодека для сжатия сообщений

Подробнее о сжатии данных в топиках.

Сжатие, которое используется при отправке сообщений методом Write, задаётся при создании сессии записи настройками Codec и CompressionLevel. По умолчанию выбирается кодек GZIP.
Пример создания сессии записи без сжатия сообщений:

auto settings = TWriteSessionSettings()
  // other settings are set here
  .Codec(ECodec::RAW);

auto session = topicClient.CreateWriteSession(settings);

Если необходимо в рамках сессии записи отправить сообщение, сжатое другим кодеком, можно использовать метод WriteEncoded с указанием кодека и размера расжатого сообщения. Для успешной записи этим способом используемый кодек должен быть разрешён в настройках топика.

По умолчанию SDK выбирает кодек автоматически (с учетом настроек топика). В автоматическом режиме SDK сначала отправляет по одной группе сообщений каждым из разрешенных кодеков, затем иногда будет пробовать сжать сообщения всеми доступными кодеками и выбирать кодек, дающий наименьший размер сообщения. Если для топика список разрешенных кодеков пуст, то автовыбор производится между Raw и Gzip-кодеками.

При необходимости можно задать фиксированный кодек в опциях подключения. Тогда будет использоваться именно он и замеры проводиться не будут.

producerAndGroupID := "group-id"
writer, _ := db.Topic().StartWriter(producerAndGroupID, "topicName",
  topicoptions.WithMessageGroupID(producerAndGroupID),
  topicoptions.WithCodec(topictypes.CodecGzip),
)

По умолчанию SDK выбирает кодек автоматически (с учетом настроек топика). В автоматическом режиме SDK сначала отправляет по одной группе сообщений каждым из разрешенных кодеков, затем иногда будет пробовать сжать сообщения всеми доступными кодеками и выбирать кодек, дающий наименьший размер сообщения. Если для топика список разрешенных кодеков пуст, то автовыбор производится между Raw и Gzip-кодеками.

При необходимости можно задать фиксированный кодек в опциях подключения. Тогда будет использоваться именно он и замеры проводиться не будут.

writer = driver.topic_client.writer(topic_path,
    codec=ydb.TopicCodec.GZIP,
)
String producerAndGroupID = "group-id";
WriterSettings settings = WriterSettings.newBuilder()
        .setTopicPath(topicPath)
        .setProducerId(producerAndGroupID)
        .setMessageGroupId(producerAndGroupID)
        .setCodec(Codec.ZSTD)
        .build();

Запись сообщений без дедупликации

Подробнее о записи без дедупликации — в соответствующем разделе концепций.

Если в настройках сессии записи не указывается опция ProducerId, будет создана сессия записи без дедупликации.
Пример создания такой сессии записи:

auto settings = TWriteSessionSettings()
    .Path(myTopicPath);

auto session = topicClient.CreateWriteSession(settings);

Для включения дедупликации нужно в настройках сессии записи указать опцию ProducerId или явно включить дедупликацию, вызвав метод EnableDeduplication(), например, как в секции "Подключение к топику".

Запись метаданных на уровне сообщения

При записи сообщения можно дополнительно указать метаданные как список пар "ключ-значение". Эти данные будут доступны при вычитывании сообщения.
Ограничение на размер метаданных — не более 1000 ключей.

Воспользоваться функцией записи метаданных можно с помощью метода Write(), принимающего TWriteMessage объект:

auto settings = TWriteSessionSettings()
    .Path(myTopicPath)
//set all oter settings;
;

auto session = topicClient.CreateWriteSession(settings);

TMaybe<TWriteSessionEvent::TEvent> event = session->GetEvent(/*block=*/true);
TWriteMessage message("This is yet another message").MessageMeta({
    {"meta-key", "meta-value"},
    {"another-key", "value"}
});

if (auto* readyEvent = std::get_if<TWriteSessionEvent::TReadyToAcceptEvent>(&*event)) {
    session->Write(std::move(event.ContinuationToken), std::move(message));
}

При конструировании сообщения для записи с помощью Builder'а, ему можно передать объекты типа MetadataItem с парой ключ типа String + значение типа byte[].

Можно передать сразу List таких объектов:

List<MetadataItem> metadataItems = Arrays.asList(
        new MetadataItem("meta-key", "meta-value".getBytes()),
        new MetadataItem("another-key", "value".getBytes())
);
writer.send(
        Message.newBuilder()
                .setMetadataItems(metadataItems)
                .build()
);

Или добавлять каждый MetadataItem отдельно:

writer.send(
        Message.newBuilder()
                .addMetadataItem(new MetadataItem("meta-key", "meta-value".getBytes()))
                .addMetadataItem(new MetadataItem("another-key", "value".getBytes()))
                .build()
);

При чтении эти метаданные сообщения получить, вызвав на нём метод getMetadataItems():

Message message = reader.receive();
List<MetadataItem> metadata = message.getMetadataItems();

Запись в транзакции

Пример на GitHub

В настройках SendSettings метода send можно указать транзакцию.
Тогда сообщение будет записано вместе с коммитом этой транзакцией.

// creating a session in the table service
Result<Session> sessionResult = tableClient.createSession(Duration.ofSeconds(10)).join();
if (!sessionResult.isSuccess()) {
    logger.error("Couldn't get a session from the pool: {}", sessionResult);
    return; // retry or shutdown
}
Session session = sessionResult.getValue();
// creating a transaction in the table service
// this transaction is not yet active and has no id
TableTransaction transaction = session.createNewTransaction(TxMode.SERIALIZABLE_RW);

// get message text within the transaction
Result<DataQueryResult> dataQueryResult = transaction.executeDataQuery("SELECT \"Hello, world!\";")
        .join();
if (!dataQueryResult.isSuccess()) {
    logger.error("Couldn't execute DataQuery: {}", dataQueryResult);
    return; // retry or shutdown
}
// now the transaction is active and has an id

ResultSetReader rsReader = dataQueryResult.getValue().getResultSet(0);
byte[] message;
if (rsReader.next()) {
    message = rsReader.getColumn(0).getBytes();
} else {
    return; // retry or shutdown
}

writer.send(
        Message.of(message),
        SendSettings.newBuilder()
                .setTransaction(transaction)
                .build()
);

// flush to wait until all messages reach server before commit
writer.flush();

Status commitStatus = transaction.commit().join();
analyzeCommitStatus(commitStatus);

Примечание

Требования к транзакции:

  • Это должна быть активная (имеющая идентификатор) транзакция в одном из сервисов YDB. Например, Table или Query.

  • В Topic Service поддерживается только уровень изоляции транзакций SERIALIZABLE_RW.

Пример на GitHub

В настройках SendSettings метода send можно указать транзакцию.
Тогда сообщение будет записано вместе с коммитом этой транзакцией.

// creating a session in the table service
Result<Session> sessionResult = tableClient.createSession(Duration.ofSeconds(10)).join();
if (!sessionResult.isSuccess()) {
    logger.error("Couldn't get a session from the pool: {}", sessionResult);
    return; // retry or shutdown
}
Session session = sessionResult.getValue();
// creating a transaction in the table service
// this transaction is not yet active and has no id
TableTransaction transaction = session.createNewTransaction(TxMode.SERIALIZABLE_RW);

// get message text within the transaction
Result<DataQueryResult> dataQueryResult = transaction.executeDataQuery("SELECT \"Hello, world!\";")
        .join();
if (!dataQueryResult.isSuccess()) {
    logger.error("Couldn't execute DataQuery: {}", dataQueryResult);
    return; // retry or shutdown
}
// now the transaction is active and has an id

ResultSetReader rsReader = dataQueryResult.getValue().getResultSet(0);
byte[] message;
if (rsReader.next()) {
    message = rsReader.getColumn(0).getBytes();
} else {
    return; // retry or shutdown
}

try {
    writer.send(Message.newBuilder()
                            .setData(message)
                            .build(),
                    SendSettings.newBuilder()
                            .setTransaction(transaction)
                            .build())
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    logger.error("Exception while sending a message: ", ex);
                } else {
                    switch (result.getState()) {
                        case WRITTEN:
                            WriteAck.Details details = result.getDetails();
                            logger.info("Message was written successfully, offset: " + details.getOffset());
                            break;
                        case ALREADY_WRITTEN:
                            logger.info("Message has already been written");
                            break;
                        default:
                            break;
                    }
                }
            })
            // Waiting for the message to reach the server before committing the transaction
            .join();

    Status commitStatus = transaction.commit().join();
    analyzeCommitStatus(commitStatus);
} catch (QueueOverflowException exception) {
    logger.error("Queue overflow exception while sending a message{}: ", index, exception);
    // Send queue is full. Need to retry with backoff or skip
}

Примечание

Требования к транзакции:

  • Это должна быть активная (имеющая идентификатор) транзакция в одном из сервисов YDB. Например, Table или Query.

  • В Topic Service поддерживается только уровень изоляции транзакций SERIALIZABLE_RW.

Чтение сообщений

Подключение к топику для чтения сообщений

Для чтения сообщений из топика необходимо наличие заранее созданного Consumer, связанного с этим топиком.
Создать Consumer можно при создании или изменении топика.
У топика может быть несколько Consumer'ов и для каждого из них сервер хранит свой прогресс чтения.

Подключение для чтения из одного или нескольких топиков представлено объектом сессии чтения с интерфейсом IReadSession. Настройки сессии чтения представлены структурой TReadSessionSettings.

Полный список настроек смотри в заголовочном файле.

Чтобы создать подключение к существующему топику my-topic через добавленного ранее читателя my-consumer, используйте следующий код:

auto settings = TReadSessionSettings()
    .ConsumerName("my-consumer")
    .AppendTopics("my-topic");

auto session = topicClient.CreateReadSession(settings);

Чтобы создать подключение к существующему топику my-topic через добавленного ранее читателя my-consumer, используйте следующий код:

reader, err := db.Topic().StartReader("my-consumer", topicoptions.ReadTopic("my-topic"))
if err != nil {
    return err
}

Чтобы создать подключение к существующему топику my-topic через добавленного ранее читателя my-consumer, используйте следующий код:

reader = driver.topic_client.reader(topic="topic-path", consumer="consumer_name")

Инициализация настроек читателя

ReaderSettings settings = ReaderSettings.newBuilder()
        .setConsumerName(consumerName)
        .addTopic(TopicReadSettings.newBuilder()
                .setPath(topicPath)
                .setReadFrom(Instant.now().minus(Duration.ofHours(24))) // Optional
                .setMaxLag(Duration.ofMinutes(30)) // Optional
                .build())
        .build();

Создание синхронного читателя

SyncReader reader = topicClient.createSyncReader(settings);

После создания синхронного читателя необходимо инициализировать. Для этого следует воспользоваться одним их двух методов:

  • init(): неблокирующий, запускает процесс инициализации в фоне и не ждёт его завершения.
    reader.init();
    
  • initAndWait(): блокирующий, запускает процесс инициализации и ждёт его завершения. Если в процессе инициализации возникла ошибка, будет брошено исключение.
    try {
        reader.initAndWait();
        logger.info("Init finished succsessfully");
    } catch (Exception exception) {
        logger.error("Exception while initializing reader: ", exception);
        return;
    }
    

Инициализация настроек читателя

ReaderSettings settings = ReaderSettings.newBuilder()
        .setConsumerName(consumerName)
        .addTopic(TopicReadSettings.newBuilder()
                .setPath(topicPath)
                .setReadFrom(Instant.now().minus(Duration.ofHours(24))) // Optional
                .setMaxLag(Duration.ofMinutes(30)) // Optional
                .build())
        .build();

Для асинхронного читателя, помимо общих настроек чтения ReaderSettings, понадобятся настройки обработчика событий ReadEventHandlersSettings, в которых необходимо передать экземпляр наследника ReadEventHandler.
Он будет описывать, как должна происходить обработка различных событий, происходящих во время чтения.

ReadEventHandlersSettings handlerSettings = ReadEventHandlersSettings.newBuilder()
        .setEventHandler(new Handler())
        .build();

Опционально, в ReadEventHandlersSettings можно указать executor'а, на котором будет происходить обработка сообщений.
Для реализации объекта-наследника ReadEventHandler можно воспользоваться дефолтным абстрактным классом AbstractReadEventHandler.
Достаточно переопределить метод onMessages, отвечающий за обработку самих сообщений. Пример реализации:

private class Handler extends AbstractReadEventHandler {
    @Override
    public void onMessages(DataReceivedEvent event) {
        for (Message message : event.getMessages()) {
            StringBuilder str = new StringBuilder();
            logger.info("Message received. SeqNo={}, offset={}", message.getSeqNo(), message.getOffset());

            process(message);

            message.commit().thenRun(() -> {
                logger.info("Message committed");
            });
        }
    }
}

Создание и инициализация асинхронного читателя:

AsyncReader reader = topicClient.createAsyncReader(readerSettings, handlerSettings);
// Init in background
reader.init()
        .thenRun(() -> logger.info("Init finished successfully"))
        .exceptionally(ex -> {
            logger.error("Init failed with ex: ", ex);
            return null;
        });

Вы также можете использовать расширенный вариант создания подключения, чтобы указать несколько топиков и задать параметры чтения. Следующий код создаст подключение к топикам my-topic и my-specific-topic через читателя my-consumer:

auto settings = TReadSessionSettings()
    .ConsumerName("my-consumer")
    .AppendTopics("my-topic")
    .AppendTopics(
        TTopicReadSettings("my-specific-topic")
            .ReadFromTimestamp(someTimestamp)
    );

auto session = topicClient.CreateReadSession(settings);
reader, err := db.Topic().StartReader("my-consumer", []topicoptions.ReadSelector{
    {
        Path: "my-topic",
    },
    {
        Path:       "my-specific-topic",
        ReadFrom:   time.Date(2022, 7, 1, 10, 15, 0, 0, time.UTC),
    },
    },
)
if err != nil {
    return err
}

Также в примере выше задаётся время, с которого следует начинать читать сообщения.

Функциональность находится в разработке.

ReaderSettings settings = ReaderSettings.newBuilder()
        .setConsumerName(consumerName)
        .addTopic(TopicReadSettings.newBuilder()
                .setPath("my-topic")
                .build())
        .addTopic(TopicReadSettings.newBuilder()
                .setPath("my-specific-topic")
                .setReadFrom(Instant.now().minus(Duration.ofHours(24))) // Optional
                .setMaxLag(Duration.ofMinutes(30)) // Optional
                .build())
        .build();

Чтение сообщений

Сервер хранит позицию чтения сообщений. После вычитывания очередного сообщения клиент может отправить на сервер подтверждение обработки. Позиция чтения изменится, а при новом подключении будут вычитаны только неподтвержденные сообщения.

Читать сообщения можно и без подтверждения обработки. В этом случае при новом подключении будут прочитаны все неподтвержденные сообщения, в том числе и уже обработанные.

Информацию о том, какие сообщения уже обработаны, можно сохранять на клиентской стороне, передавая на сервер стартовую позицию чтения при создании подключения. При этом позиция чтения сообщений на сервере не изменяется.

Можно использовать транзакции. В этом случае позиция чтения изменится при подтверждении транзакции. При новом подключении будут прочитаны все неподтверждённые сообщения.

Работа пользователя с объектом IReadSession в общем устроена как обработка цикла событий со следующими типами событий: TDataReceivedEvent, TCommitOffsetAcknowledgementEvent, TStartPartitionSessionEvent, TStopPartitionSessionEvent, TPartitionSessionStatusEvent, TPartitionSessionClosedEvent и TSessionClosedEvent.

Для каждого из типов событий можно установить обработчик этого события, а также можно установить общий обработчик. Обработчики устанавливаются в настройках сессии записи перед её созданием.

Если обработчик для некоторого события не установлен, его необходимо получить и обработать в методах GetEvent / GetEvents. Для неблокирующего ожидания очередного события есть метод WaitEvent с сигнатурой TFuture<void>().

SDK получает данные с сервера партиями и буферизирует их. В зависимости от задач клиентский код может читать сообщения из буфера по одному или пакетами.

SDK получает данные с сервера партиями и буферизирует их. В зависимости от задач клиентский код может читать сообщения из буфера по одному или пакетами.

SDK получает данные с сервера партиями и буферизирует их. В зависимости от задач клиентский код может читать сообщения из буфера по одному или пакетами.

Чтение без подтверждения обработки сообщений

Чтение сообщений по одному

Чтение сообщений по одному в C++ SDK не предусмотрено. Событие TDataReceivedEvent содержит пакет сообщений.

func SimpleReadMessages(ctx context.Context, r *topicreader.Reader) error {
    for {
        mess, err := r.ReadMessage(ctx)
        if err != nil {
            return err
        }
        processMessage(mess)
    }
}
while True:
    message = reader.receive_message()
    process(message)

Чтобы читать сообщения без подтверждения обработки, по одному, используйте следующий код:

while(true) {
    Message message = reader.receive();
    process(message);
}

В асинхронном клиенте нет возможности читать сообщения по одному.

Чтение сообщений пакетом

При установке сессии чтения с настройкой SimpleDataHandlers достаточно передать обработчик для сообщений с данными. SDK будет вызывать этот обработчик на каждый принятый от сервера пакет сообщений. Подтверждения чтения по умолчанию отправляться не будут.

auto settings = TReadSessionSettings()
    .EventHandlers_.SimpleDataHandlers(
        [](TReadSessionEvent::TDataReceivedEvent& event) {
            std::cout << "Get data event " << DebugString(event);
        }
    );

auto session = topicClient.CreateReadSession(settings);

// Wait SessionClosed event.
ReadSession->GetEvent(/* block = */true);

В этом примере после создания сессии основной поток дожидается завершения сессии со стороны сервера в методе GetEvent, другие типы событий приходить не будут.

func SimpleReadBatches(ctx context.Context, r *topicreader.Reader) error {
    for {
        batch, err := r.ReadMessageBatch(ctx)
        if err != nil {
            return err
        }
        processBatch(batch)
    }
}
while True:
  batch = reader.receive_batch()
  process(batch)

В синхронном клиенте нет возможности прочитать сразу пакет сообщений.

Чтобы прочитать пакет сообщений без подтверждения обработки, используйте следующий код:

private class Handler extends AbstractReadEventHandler {
    @Override
    public void onMessages(DataReceivedEvent event) {
        for (Message message : event.getMessages()) {
            process(message);
        }
    }
}

Чтение с подтверждением обработки сообщений

Подтверждение обработки сообщения (коммит) - сообщает серверу, что сообщение из топика обработано получателем и больше его отправлять не нужно. При использовании чтения с подтверждением нужно подтверждать все полученные сообщения без пропуска. Коммит сообщений на сервере происходит после подтверждения очередного интервала сообщений «без дырок», сами подтверждения при этом можно отправлять в любом порядке.

Например с сервера пришли сообщения 1, 2, 3. Программа обрабатывает их параллельно и отправляет подтверждения в таком порядке: 1, 3, 2. В этом случае сначала будет закоммичено сообщение 1, а сообщения 2 и 3 будут закоммичены только после того как сервер получит подтверждение об обработке сообщения 2.

В случае ошибки на коммите сообщения можно написать эту ошибку в лог и продолжить работу. Состояние сообщения в этой точке неизвестно. Сообщение могло закоммититься, а потом возникла сетевая ошибка и клиент не получил подтверждения. Если сообщение не закоммитилось, то оно будет прочитано ещё раз и снова поступит в обработку (может быть на другом читателе). Ретраить именно коммит смысла нет, т.к. сессия чтения этого сообщения уже потеряна.

Чтение сообщений по одному с подтверждением

Чтение сообщений по одному в C++ SDK не предусмотрено. Событие TDataReceivedEvent содержит пакет сообщений.

func SimpleReadMessages(ctx context.Context, r *topicreader.Reader) error {
    for {
      mess, err := r.ReadMessage(ctx)
      if err != nil {
          return err
      }
      processMessage(mess)
      r.Commit(mess.Context(), mess)
    }
}
while True:
    message = reader.receive_message()
    process(message)
    reader.commit(message)

Для подтверждения обработки сообщения достаточно вызвать у сообщения метод commit.
Актуально как для синхронного, так и для асинхронного читателя.
В асинхронном читателе, при обработке пакета сообщений, можно вызвать commit или у всего пакета сразу, или у каждого сообщения отдельно.
Этот метод возвращает CompletableFuture<Void>, успешное выполнение которой означает подтверждение обработки сервером.
В случае ошибки коммита не следует пытаться его ретраить. Скорее всего, ошибка вызвана закрытием сессии.
Читатель (необязательно этот же) сам создаст новую сессию для этой партиции и сообщение будет прочитано снова.

message.commit()
       .whenComplete((result, ex) -> {
           if (ex != null) {
               // Read session was probably closed, there is nothing we can do here.
               // Do not retry this commit on the same event.
               logger.error("exception while committing message: ", ex);
           } else {
               logger.info("message committed successfully");
           }
       });

Чтение сообщений пакетом с подтверждением

Аналогично примеру выше, при установке сессии чтения с настройкой SimpleDataHandlers достаточно передать обработчик для сообщений с данными. SDK будет вызывать этот обработчик на каждый принятый от сервера пакет сообщений. Передача параметра commitDataAfterProcessing = true означает, что SDK будет отправлять на сервер подтверждения чтения всех сообщений после выполнения обработчика.

auto settings = TReadSessionSettings()
    .EventHandlers_.SimpleDataHandlers(
        [](TReadSessionEvent::TDataReceivedEvent& event) {
            std::cout << "Get data event " << DebugString(event);
        }
        , /* commitDataAfterProcessing = */true
    );

auto session = topicClient.CreateReadSession(settings);

// Wait SessionClosed event.
ReadSession->GetEvent(/* block = */true);
func SimpleReadMessageBatch(ctx context.Context, r *topicreader.Reader) error {
    for {
      batch, err := r.ReadMessageBatch(ctx)
      if err != nil {
          return err
      }
      processBatch(batch)
      r.Commit(batch.Context(), batch)
    }
}
while True:
  batch = reader.receive_batch()
  process(batch)
  reader.commit(batch)

Неактуально, т.к. в синхронном читателе нет возможности читать сообщения пакетами.

В обработчике onMessage можно закоммитить весь пакет сообщений, вызвав commit на событии.

@Override
public void onMessages(DataReceivedEvent event) {
    for (Message message : event.getMessages()) {
        process(message);
    }
    event.commit()
           .whenComplete((result, ex) -> {
               if (ex != null) {
                   // Read session was probably closed, there is nothing we can do here.
                   // Do not retry this commit on the same message.
                   logger.error("exception while committing message batch: ", ex);
               } else {
                   logger.info("message batch committed successfully");
               }
           });
}

Чтение с хранением позиции на клиентской стороне

Вместо коммитов сообщений на сервер можно хранить прогресс чтения самостоятельно. В этом случае нужно передать в SDK обработчик, который будет вызываться при старте чтения каждой партиции. В этом обработчике нужно будет
указать позицию, с которой нужно начинать чтение этой партиции.

При обработке событий TStartPartitionSessionEvent можно при ответе серверу задать позицию, с которой следует начинать чтение.
Для этого в метод Confirm следует передать параметр readOffset.
Допольнительно можно передать параметр commitOffset, который укажет позицию, сообщения до которой следует считать закоммиченными.

Пример установки обработчика:

settings.EventHandlers_.StartPartitionSessionHandler(
    [](TReadSessionEvent::TStartPartitionSessionEvent& event) {
        auto readFromOffset = GetOffsetToReadFrom(event.GetPartitionId());
        event.Confirm(readFromOffset);
    }
);

Здесь GetOffsetToReadFrom - это часть примера, а не SDK. Используйте свой способ определить требуемую стартовую позицию чтения для партиции с данным partition id.

Также в TReadSessionSettings поддерживается настройка ReadFromTimestamp для чтения событий с отметками времени записи не меньше данной. Эта настройка предполагается не для точного позиционирования старта, а для пропуска объёма данных за большой интервал времени. Несколько первых полученных сообщений могут иметь отметки времени записи меньше указанной.

func ReadWithExplicitPartitionStartStopHandlerAndOwnReadProgressStorage(ctx context.Context, db ydb.Connection) error {
    readContext, stopReader := context.WithCancel(context.Background())
    defer stopReader()

    readStartPosition := func(
        ctx context.Context,
        req topicoptions.GetPartitionStartOffsetRequest,
    ) (res topicoptions.GetPartitionStartOffsetResponse, err error) {
        offset, err := readLastOffsetFromDB(ctx, req.Topic, req.PartitionID)
        res.StartFrom(offset)

        // Reader will stop if return err != nil
        return res, err
    }

    r, err := db.Topic().StartReader("my-consumer", topicoptions.ReadTopic("my-topic"),
        topicoptions.WithGetPartitionStartOffset(readStartPosition),
    )
    if err != nil {
        return err
    }

    go func() {
        <-readContext.Done()
        _ = r.Close(ctx)
    }()

    for {
        batch, err := r.ReadMessageBatch(readContext)
        if err != nil {
            return err
        }

        processBatch(batch)
        _ = externalSystemCommit(batch.Context(), batch.Topic(), batch.PartitionID(), batch.EndOffset())
    }
}

Функциональность находится в разработке.

Чтение с заданного оффсета в Java возможно только в асинхронном читателе.
В обработчике событий StartPartitionSessionEvent можно при ответе серверу задать позицию, с которой следует начинать чтение.
Для этого в метод confirm следует передать настройки StartPartitionSessionSettings с указанным оффсетом через setReadOffset.
Также вызовом setCommitOffset можно указать оффсет, который следует считать закоммиченным.

@Override
public void onStartPartitionSession(StartPartitionSessionEvent event) {
    event.confirm(StartPartitionSessionSettings.newBuilder()
            .setReadOffset(lastReadOffset) // Long
            .setCommitOffset(lastCommitOffset) // Long
            .build());
}

Также поддерживается настройка читателя setReadFrom для чтения событий с отметками времени записи не меньше данной.

Чтение без указания Consumer'а

Обычно прогресс чтения топика сохраняется на сервере в каждом Consumerе. Но можно не хранить такой прогресс на сервере и при создании читателя явно указать, что чтение будет происходить без Consumerа.

Для чтения без Consumerа следует в настройках читателя ReaderSettings это явно указать, вызвав withoutConsumer():

ReaderSettings settings = ReaderSettings.newBuilder()
        .withoutConsumer()
        .addTopic(TopicReadSettings.newBuilder()
                .setPath(TOPIC_NAME)
                .build())
        .build();

В таком случае нужно учитывать, что при переустановке соединения прогресс на сервере будет сброшен. Поэтому, чтобы не начинать чтение сначала, в SDK следует передавать offset начала чтения при каждом старте сессии чтения партиции:

@Override
public void onStartPartitionSession(StartPartitionSessionEvent event) {
    event.confirm(StartPartitionSessionSettings.newBuilder()
            .setReadOffset(lastReadOffset) // the last offset read by this client, Long
            .build());
}

Чтение в транзакции

Перед чтением из топика клиентский код должен передать в настройки получения событий из сессии ссылку на объект транзакции.

    ReadSession->WaitEvent().Wait(TDuration::Seconds(1));

    auto tableSettings = NYdb::NTable::TTxSettings::SerializableRW();
    auto transactionResult = TableSession->BeginTransaction(tableSettings).GetValueSync();
    auto Transaction = transactionResult.GetTransaction();

    NYdb::NTopic::TReadSessionGetEventSettings topicSettings;
    topicSettings.Block(false);
    topicSettings.Tx(Transaction);

    auto events = ReadSession->GetEvents(topicSettings);

    for (auto& event : events) {
        // обработать событие и записать результаты в таблицу
    }

    NYdb::NTable::TCommitTxSettings commitSettings;
    auto commitResult = Transaction.Commit(commitSettings).GetValueSync();

Важно

При обработке событий events не нужно явно подтверждать обработку для событий типа TDataReceivedEvent.

Подтверждение обработки события TStopPartitionSessionEvent надо делать после вызова Commit.

    std::optional<TStopPartitionSessionEvent> stopPartitionSession;

    auto events = ReadSession->GetEvents(topicSettings);

    for (auto& event : events) {
        if (auto* e = std::get_if<TStopPartitionSessionEvent>(&event) {
            stopPartitionSessionEvent = std::move(*e);
        } else {
            // обработать событие и записать результаты в таблицу
        }
    }

    NYdb::NTable::TCommitTxSettings commitSettings;
    auto commitResult = Transaction.Commit(commitSettings).GetValueSync();

    if (stopPartitionSessionEvent) {
        stopPartitionSessionEvent->Commit();
    }

Пример на GitHub

В настройках ReceiveSettings метода receive можно указать транзакцию:

Message message = reader.receive(ReceiveSettings.newBuilder()
        .setTransaction(transaction)
        .build());

Тогда полученное сообщение будет закоммичено вместе с транзакцией. Коммитить его отдельно не нужно.
Метод receive свяжет на сервере оффсеты сообщения с транзакцией вызовом sendUpdateOffsetsInTransaction и вернёт управление, когда получит ответ на него.

Примечание

Требования к транзакции:

  • Это должна быть активная (имеющая идентификатор) транзакция в одном из сервисов YDB. Например, Table или Query.

  • В Topic Service поддерживается только уровень изоляции транзакций SERIALIZABLE_RW.

Пример на GitHub

После получения сообщения в обработчике onMessages можно связать одно или несколько сообщений с транзакцией.
Для этого нужно вызвать отдельный метод reader.updateOffsetsInTransaction и дождаться его выполнения на сервере.
Этот метод принимает параметром список оффсетов. Для удобства у Message и DataReceivedEvent есть метод getPartitionOffsets(), возвращающий такой список.

@Override
public void onMessages(DataReceivedEvent event) {
    for (Message message : event.getMessages()) {
        // creating a session in the table service
        Result<Session> sessionResult = tableClient.createSession(Duration.ofSeconds(10)).join();
        if (!sessionResult.isSuccess()) {
            logger.error("Couldn't get a session from the pool: {}", sessionResult);
            return; // retry or shutdown
        }
        Session session = sessionResult.getValue();
        // creating a transaction in the table service
        // this transaction is not yet active and has no id
        TableTransaction transaction = session.createNewTransaction(TxMode.SERIALIZABLE_RW);

        // do something else in the transaction
        transaction.executeDataQuery("SELECT 1").join();
        // now the transaction is active and has an id
        // analyzeQueryResultIfNeeded();

        Status updateStatus = reader.updateOffsetsInTransaction(transaction,
                        message.getPartitionOffsets(), new UpdateOffsetsInTransactionSettings.Builder().build())
                // Do not commit a transaction without waiting for updateOffsetsInTransaction result to avoid a race condition
                .join();
        if (!updateStatus.isSuccess()) {
            logger.error("Couldn't update offsets in a transaction: {}", updateStatus);
            return; // retry or shutdown
        }

        Status commitStatus = transaction.commit().join();
        analyzeCommitStatus(commitStatus);
    }
}

Примечание

Требования к транзакции:

  • Это должна быть активная (имеющая идентификатор) транзакция в одном из сервисов YDB. Например, Table или Query.

  • В Topic Service поддерживается только уровень изоляции транзакций SERIALIZABLE_RW.

Обработка серверного прерывания чтения

В YDB используется серверная балансировка партиций между клиентами. Это означает, что сервер может прерывать чтение сообщений из произвольных партиций.

При мягком прерывании клиент получает уведомление, что сервер уже закончил отправку сообщений из партиции и больше сообщения читаться не будут. Клиент может завершить обработку сообщений и отправить подтверждение на сервер.

В случае жесткого прерывания клиент получает уведомление, что работать с сообщениями партиции больше нельзя. Клиент должен прекратить обработку прочитанных сообщений. Неподтвержденные сообщения будут переданы другому читателю.

Мягкое прерывание чтения

Мягкое прерывание приходит в виде события TStopPartitionSessionEvent с методом Confirm. Клиент может завершить обработку сообщений и отправить подтверждение на сервер.

Фрагмент цикла событий может выглядеть так:

auto event = ReadSession->GetEvent(/*block=*/true);
if (auto* stopPartitionSessionEvent = std::get_if<TReadSessionEvent::TStopPartitionSessionEvent>(&*event)) {
    stopPartitionSessionEvent->Confirm();
} else {
  // other event types
}

Клиентский код сразу получает все имеющиеся в буфере (на стороне SDK) сообщения, даже если их не достаточно для формирования пакета при групповой обработке.

r, _ := db.Topic().StartReader("my-consumer", nil,
    topicoptions.WithBatchReadMinCount(1000),
)

for {
    batch, _ := r.ReadMessageBatch(ctx) // <- if partition soft stop batch can be less, then 1000
    processBatch(batch)
    _ = r.Commit(batch.Context(), batch)
}

Специальной обработки не требуется.

while True:
  batch = reader.receive_batch()
  process(batch)
  reader.commit(batch)

Неактуально, т.к. в синхронном читателе нет возможности настраивать обработку подобных событий.
Клиент сразу ответит серверу подтверждением остановки.

Для возможности реагировать на такое событие следует переопределить метод onStopPartitionSession(StopPartitionSessionEvent event) в объекте-наследнике ReadEventHandler (см Подключение к топику для чтения сообщений).
event.confirm() обязательно должен быть вызван, т.к. сервер ожидает этого ответа для продолжения остановки.

@Override
public void onStopPartitionSession(StopPartitionSessionEvent event) {
    logger.info("Partition session {} stopped. Committed offset: {}", event.getPartitionSessionId(),
            event.getCommittedOffset());
    // This event means that no more messages will be received by server
    // Received messages still can be read from ReaderBuffer
    // Messages still can be committed, until confirm() method is called

    // Confirm that session can be closed
    event.confirm();
}

Жесткое прерывание чтения

Жёсткое прерывание приходит в виде события TPartitionSessionClosedEvent либо в ответ на подтверждение мягкого прерывания, либо при потере соединения с партицией. Узнать причину можно, вызвав метод GetReason.

Фрагмент цикла событий может выглядеть так:

auto event = ReadSession->GetEvent(/*block=*/true);
if (auto* partitionSessionClosedEvent = std::get_if<TReadSessionEvent::TPartitionSessionClosedEvent>(&*event)) {
    if (partitionSessionClosedEvent->GetReason() == TPartitionSessionClosedEvent::EReason::ConnectionLost) {
        std::cout << "Connection with partition was lost" << std::endl;
    }
} else {
  // other event types
}

При прерывании чтения контекст сообщения или пакета сообщений будет отменен.

ctx := batch.Context() // batch.Context() will cancel if partition revoke by server or connection broke
if len(batch.Messages) == 0 {
    return
}

buf := &bytes.Buffer{}
for _, mess := range batch.Messages {
    buf.Reset()
    _, _ = buf.ReadFrom(mess)
    _, _ = io.Copy(buf, mess)
    writeMessagesToDB(ctx, buf.Bytes())
}

В этом примере обработка сообщений в батче остановится, если в процессе работы партиция будет отобрана. Такая оптимизация требует дополнительного кода на клиенте. В простых случаях, когда обработка отобранных партиций не является проблемой, ее можно не применять.

def process_batch(batch):
    for message in batch.messages:
        if not batch.alive:
            return False
        process(message)
    return True

batch = reader.receive_batch()
if process_batch(batch):
    reader.commit(batch)

Неактуально, т.к. в синхронном читателе нет возможности настраивать обработку подобных событий.

@Override
public void onPartitionSessionClosed(PartitionSessionClosedEvent event) {
    logger.info("Partition session {} is closed.", event.getPartitionSession().getPartitionId());
}