Работа с топиками
- Примеры работы с топиками
- Инициализация соединения с топиками
- Управление топиками
- Запись сообщений
- Чтение сообщений
- Подключение к топику для чтения сообщений
- Чтение сообщений
- Чтение без подтверждения обработки сообщений
- Чтение с подтверждением обработки сообщений
- Чтение с хранением позиции на клиентской стороне
- Чтение без указания Consumer'а
- Чтение в транзакции
- Обработка серверного прерывания чтения
- Поддержка автомасштабирования топиков
В этой статье приведены примеры использования YDB SDK для работы с топиками.
Перед выполнением примеров создайте топик и добавьте читателя.
Примеры работы с топиками
Инициализация соединения с топиками
Для работы с топиками создаются экземпляры драйвера YDB и клиента.
Драйвер YDB отвечает за взаимодействие приложения и YDB на транспортном уровне. Драйвер должен существовать на всем протяжении жизненного цикла работы с топиками и должен быть инициализирован перед созданием клиента.
Клиент сервиса топиков (исходный код) работает поверх драйвера YDB и отвечает за управляющие операции с топиками, а также создание сессий чтения и записи.
Фрагмент кода приложения для инициализации драйвера YDB:
// Create driver instance.
auto driverConfig = TDriverConfig()
.SetEndpoint(opts.Endpoint)
.SetDatabase(opts.Database)
.SetAuthToken(GetEnv("YDB_TOKEN"));
TDriver driver(driverConfig);
В этом примере используется аутентификационный токен, сохранённый в переменной окружения YDB_TOKEN
. Подробнее про соединение с БД и аутентификацию.
Фрагмент кода приложения для создания клиента:
TTopicClient topicClient(driver);
Для работы с топиками создаются экземпляры транспорта YDB и клиента.
Транспорт YDB отвечает за взаимодействие приложения и YDB на транспортном уровне. Он должен существовать на всем протяжении жизненного цикла работы с топиками и должен быть инициализирован перед созданием клиента.
Фрагмент кода приложения для инициализации транспорта YDB:
try (GrpcTransport transport = GrpcTransport.forConnectionString(connString)
.withAuthProvider(CloudAuthHelper.getAuthProviderFromEnviron())
.build()) {
// Use YDB transport
}
В этом примере используется вспомогательный метод CloudAuthHelper.getAuthProviderFromEnviron()
, получающий токен из переменных окружения.
Например, YDB_ACCESS_TOKEN_CREDENTIALS
.
Подробнее про соединение с БД и аутентификацию.
Клиент сервиса топиков (исходный код) работает поверх транспорта YDB и отвечает как за управляющие операции с топиками, так и за создание писателей и читателей.
Фрагмент кода приложения для создания клиента:
try (TopicClient topicClient = TopicClient.newClient(transport)
.setCompressionExecutor(compressionExecutor)
.build()) {
// Use topic client
}
В обоих примерах кода выше используется блок (try-with-resources).
Это позволяет автоматически закрывать клиент и транспорт при выходе из этого блока, т.к. оба являются наследниками AutoCloseable
.
Управление топиками
Создание топика
Единственный обязательный параметр для создания топика - это его путь, остальные параметры опциональны.
Полный список настроек можно посмотреть в заголовочном файле.
Пример создания топика c тремя партициями и поддержкой кодека ZSTD:
auto settings = NYdb::NTopic::TCreateTopicSettings()
.PartitioningSettings(3, 3)
.AppendSupportedCodecs(NYdb::NTopic::ECodec::ZSTD);
auto status = topicClient
.CreateTopic("my-topic", settings) // returns TFuture<TStatus>
.GetValueSync();
Полный список поддерживаемых параметров можно посмотреть в документации SDK.
Пример создания топика со списком поддерживаемых кодеков и минимальным количеством партиций
err := db.Topic().Create(ctx, "topic-path",
// optional
topicoptions.CreateWithSupportedCodecs(topictypes.CodecRaw, topictypes.CodecGzip),
// optional
topicoptions.CreateWithMinActivePartitions(3),
)
Пример создания топика со списком поддерживаемых кодеков и минимальным количеством партиций
driver.topic_client.create_topic(topic_path,
supported_codecs=[ydb.TopicCodec.RAW, ydb.TopicCodec.GZIP], # optional
min_active_partitions=3, # optional
)
Полный список настроек можно посмотреть в коде SDK.
Пример создания топика со списком поддерживаемых кодеков и минимальным количеством партиций
topicClient.createTopic(topicPath, CreateTopicSettings.newBuilder()
// Optional
.setSupportedCodecs(SupportedCodecs.newBuilder()
.addCodec(Codec.RAW)
.addCodec(Codec.GZIP)
.build())
// Optional
.setPartitioningSettings(PartitioningSettings.newBuilder()
.setMinActivePartitions(3)
.build())
.build());
Изменение топика
При изменении топика в параметрах метода AlterTopic
нужно указать путь топика и параметры, которые будут изменяться. Изменяемые параметры представлены структурой TAlterTopicSettings
.
Полный список настроек можно посмотреть в заголовочном файле.
Пример добавления важного читателя к топику и установки времени хранения сообщений для топика в два дня:
auto alterSettings = NYdb::NTopic::TAlterTopicSettings()
.BeginAddConsumer("my-consumer")
.Important(true)
.EndAddConsumer()
.SetRetentionPeriod(TDuration::Days(2));
auto status = topicClient
.AlterTopic("my-topic", alterSettings) // returns TFuture<TStatus>
.GetValueSync();
При изменении топика в параметрах нужно указать путь топика и те параметры, которые будут изменяться.
Полный список поддерживаемых параметров можно посмотреть в документации SDK.
Пример добавления читателя к топику
err := db.Topic().Alter(ctx, "topic-path",
topicoptions.AlterWithAddConsumers(topictypes.Consumer{
Name: "new-consumer",
SupportedCodecs: []topictypes.Codec{topictypes.CodecRaw, topictypes.CodecGzip}, // optional
}),
)
Пример изменения списка поддерживаемых кодеков и минимального количества партиций у топика
driver.topic_client.alter_topic(topic_path,
set_supported_codecs=[ydb.TopicCodec.RAW, ydb.TopicCodec.GZIP], # optional
set_min_active_partitions=3, # optional
)
При изменении топика в параметрах метода alterTopic
нужно указать путь топика и параметры, которые будут изменяться.
Полный список настроек можно посмотреть в коде SDK.
topicClient.alterTopic(topicPath, AlterTopicSettings.newBuilder()
.addAddConsumer(Consumer.newBuilder()
.setName("new-consumer")
.setSupportedCodecs(SupportedCodecs.newBuilder()
.addCodec(Codec.RAW)
.addCodec(Codec.GZIP)
.build())
.build())
.build());
Получение информации о топике
Для получения информации о топике используется метод DescribeTopic
.
Описание топика представлено структурой TTopicDescription
.
Полный список полей описания смотри в заголовочном файле.
Получить доступ к этому описанию можно так:
auto result = topicClient.DescribeTopic("my-topic").GetValueSync();
if (result.IsSuccess()) {
const auto& description = result.GetTopicDescription();
std::cout << "Topic description: " << GetProto(description) << std::endl;
}
Существует отдельный метод для получения информации о читателе - DescribeConsumer
.
descResult, err := db.Topic().Describe(ctx, "topic-path")
if err != nil {
log.Fatalf("failed describe topic: %v", err)
return
}
fmt.Printf("describe: %#v\n", descResult)
info = driver.topic_client.describe_topic(topic_path)
print(info)
Для получения информации о топике используется метод describeTopic
.
Полный список полей описания можно посмотреть в коде SDK.
Result<TopicDescription> topicDescriptionResult = topicClient.describeTopic(topicPath)
.join();
TopicDescription description = topicDescriptionResult.getValue();
Удаление топика
Для удаления топика достаточно указать путь к нему.
auto status = topicClient.DropTopic("my-topic").GetValueSync();
err := db.Topic().Drop(ctx, "topic-path")
driver.topic_client.drop_topic(topic_path)
topicClient.dropTopic(topicPath);
Запись сообщений
Подключение к топику для записи сообщений
На данный момент поддерживается подключение только с совпадающими идентификаторами источника и группы сообщений (producer_id
и message_group_id
), в будущем это ограничение будет снято.
Подключение к топику на запись представлено объектом сессии записи с интерфейсом IWriteSession
или ISimpleBlockingWriteSession
(вариант для простой записи по одному сообщению без подтверждения, блокирующейся при превышении числа inflight записей или размера буфера SDK). Настройки сессии записи представлены структурой TWriteSessionSettings
, для варианта ISimpleBlockingWriteSession
часть настроек не поддерживается.
Полный список настроек смотри в заголовочном файле.
Пример создания сессии записи с интерфейсом IWriteSession
.
TString producerAndGroupID = "group-id";
auto settings = TWriteSessionSettings()
.Path("my-topic")
.ProducerId(producerAndGroupID)
.MessageGroupId(producerAndGroupID);
auto session = topicClient.CreateWriteSession(settings);
producerAndGroupID := "group-id"
writer, err := db.Topic().StartWriter(producerAndGroupID, "topicName",
topicoptions.WithMessageGroupID(producerAndGroupID),
)
if err != nil {
return err
}
writer = driver.topic_client.writer(topic_path)
Инициализация настроек писателя:
String producerAndGroupID = "group-id";
WriterSettings settings = WriterSettings.newBuilder()
.setTopicPath(topicPath)
.setProducerId(producerAndGroupID)
.setMessageGroupId(producerAndGroupID)
.build();
Создание синхронного писателя:
SyncWriter writer = topicClient.createSyncWriter(settings);
После создания писателя его необходимо инициализировать. Для этого есть два метода:
-
init()
: неблокирующий, запускает процесс инициализации в фоне и не ждёт его завершения.writer.init();
-
initAndWait()
: блокирующий, запускает процесс инициализации и ждёт его завершения. Если в процессе инициализации возникла ошибка, будет брошено исключение.try { writer.initAndWait(); logger.info("Init finished succsessfully"); } catch (Exception exception) { logger.error("Exception while initializing writer: ", exception); return; }
Инициализация настроек писателя:
String producerAndGroupID = "group-id";
WriterSettings settings = WriterSettings.newBuilder()
.setTopicPath(topicPath)
.setProducerId(producerAndGroupID)
.setMessageGroupId(producerAndGroupID)
.build();
Создание и инициализация асинхронного писателя:
AsyncWriter writer = topicClient.createAsyncWriter(settings);
// Init in background
writer.init()
.thenRun(() -> logger.info("Init finished successfully"))
.exceptionally(ex -> {
logger.error("Init failed with ex: ", ex);
return null;
});
Запись сообщений
Асинхронная запись возможна через интерфейс IWriteSession
.
Работа пользователя с объектом IWriteSession
в общем устроена как обработка цикла событий с тремя типами событий: TReadyToAcceptEvent
, TAcksEvent
и TSessionClosedEvent
.
Для каждого из типов событий можно установить обработчик этого события, а также можно установить общий обработчик. Обработчики устанавливаются в настройках сессии записи перед её созданием.
Если обработчик для некоторого события не установлен, его необходимо получить и обработать в методах GetEvent
/ GetEvents
. Для неблокирующего ожидания очередного события есть метод WaitEvent
с интерфейсом TFuture<void>()
.
Для записи каждого сообщения пользователь должен "потратить" move-only объект TContinuationToken
, который выдаёт SDK с событием TReadyToAcceptEvent
. При записи сообщения можно установить пользовательские seqNo и временную метку создания, но по умолчанию их проставляет SDK автоматически.
По умолчанию Write
выполняется асинхронно - данные из сообщений вычитываются и сохраняются во внутренний буфер, отправка происходит в фоне в соответствии с настройками MaxMemoryUsage
, MaxInflightCount
, BatchFlushInterval
, BatchFlushSizeBytes
. Сессия сама переподключается к YDB при обрывах связи и повторяет отправку сообщений пока это возможно, в соответствии с настройкой RetryPolicy
. При получении ошибки, которую невозможно повторить, сессия чтения отправляет пользователю TSessionClosedEvent
с диагностической информацией.
Так может выглядеть запись нескольких сообщений в цикле событий без использования обработчиков:
// Event loop
while (true) {
// Get event
// May block for a while if write session is busy
TMaybe<TWriteSessionEvent::TEvent> event = session->GetEvent(/*block=*/true);
if (auto* readyEvent = std::get_if<TWriteSessionEvent::TReadyToAcceptEvent>(&*event)) {
session->Write(std::move(event.ContinuationToken), "This is yet another message.");
} else if (auto* ackEvent = std::get_if<TWriteSessionEvent::TAcksEvent>(&*event)) {
std::cout << ackEvent->DebugString() << std::endl;
} else if (auto* closeSessionEvent = std::get_if<TSessionClosedEvent>(&*event)) {
break;
}
}
Для отправки сообщения - достаточно в поле Data сохранить Reader, из которого можно будет прочитать данные. Можно рассчитывать на то что данные каждого сообщения читаются один раз (или до первой ошибки), к моменту возврата из Write данные будут уже прочитаны и сохранены во внутренний буфер.
SeqNo и дата создания сообщений по умолчанию проставляются автоматически.
По умолчанию Write выполняется асинхронно - данные из сообщений вычитываются и сохраняются во внутренний буфер, отправка происходит в фоне. Writer сам переподключается к YDB при обрывах связи и повторяет отправку сообщений пока это возможно. При получении ошибки, которую невозможно повторить Writer останавливается и следующие вызовы Write будут завершаться с ошибкой.
err := writer.Write(ctx,
topicwriter.Message{Data: strings.NewReader("1")},
topicwriter.Message{Data: bytes.NewReader([]byte{1,2,3})},
topicwriter.Message{Data: strings.NewReader("3")},
)
if err != nil {
return err
}
Для отправки сообщений можно передавать как просто содержимое сообщения (bytes, str), так и вручную задавать некоторые свойства. Объекты можно передавать по одному или сразу в массиве (list). Метод write
выполняется асинхронно. Возврат из метода происходит сразу после того как сообщения будут положены во внутренний буфер клиента, обычно это происходит быстро. Ожидание может возникнуть, если внутренний буфер уже заполнен и нужно подождать, пока часть данных будет отправлена на сервер.
# Простая отправка сообщений, без явного указания метаданных.
# Удобно начинать, удобно использовать пока важно только содержимое сообщения.
writer = driver.topic_client.writer(topic_path)
writer.write("mess") # Строки будут переданы в кодировке utf-8, так удобно отправлять
# текстовые сообщения.
writer.write(bytes([1, 2, 3])) # Эти байты будут отправлены "как есть", так удобно отправлять
# бинарные данные.
writer.write(["mess-1", "mess-2"]) # Здесь за один вызов отправляется несколько сообщений —
# так снижаются накладные расходы на внутренние процессы SDK,
# имеет смысл при большом потоке сообщений.
# Полная форма, используется, когда кроме содержимого сообщения нужно вручную задать и его свойства.
writer = driver.topic_client.writer(topic="topic-path", auto_seqno=False, auto_created_at=False)
writer.write(ydb.TopicWriterMessage("asd", seqno=123, created_at=datetime.datetime.now()))
writer.write(ydb.TopicWriterMessage(bytes([1, 2, 3]), seqno=124, created_at=datetime.datetime.now()))
# В полной форме так же можно отправлять несколько сообщений за один вызов функции.
# Это имеет смысл при большом потоке отправляемых сообщений — для снижения
# накладных расходов на внутренние вызовы SDK.
writer.write([
ydb.TopicWriterMessage("asd", seqno=123, created_at=datetime.datetime.now()),
ydb.TopicWriterMessage(bytes([1, 2, 3]), seqno=124, created_at=datetime.datetime.now(),
])
Метод send
блокирует управление, пока сообщение не будет помещено в очередь отправки.
Попадание сообщения в эту очередь означает, что писатель сделает всё возможное для доставки сообщения.
Например, если сессия записи по какой-то причине оборвётся, писатель переустановит соединение и попробует отправить это сообщение на новой сессии.
Но попадание сообщения в очередь отправки не гарантирует того, что сообщение в итоге будет записано.
Например, могут возникать ошибки, приводящие к завершению работы писателя до того, как сообщения из очереди будут отправлены.
Если нужно подтверждение успешной записи для каждого сообщения, используйте асинхронного писателя и проверяйте статус, возвращаемый методом send
.
writer.send(Message.of("11".getBytes()));
long timeoutSeconds = 5; // How long should we wait for a message to be put into sending buffer
try {
writer.send(
Message.newBuilder()
.setData("22".getBytes())
.setCreateTimestamp(Instant.now().minusSeconds(5))
.build(),
timeoutSeconds,
TimeUnit.SECONDS
);
} catch (TimeoutException exception) {
logger.error("Send queue is full. Couldn't put message into sending queue within {} seconds", timeoutSeconds);
} catch (InterruptedException | ExecutionException exception) {
logger.error("Couldn't put the message into sending queue due to exception: ", exception);
}
Метод send
в асинхронном клиенте неблокирующий. Помещает сообщение в очередь отправки.
Метод возвращает CompletableFuture<WriteAck>
, позволяющую проверить, действительно ли сообщение было записано.
В случае, если очередь переполнена, будет брошено исключение QueueOverflowException.
Это способ сигнализировать пользователю о том, что поток записи следует притормозить.
В таком случае стоит или пропускать сообщения, или выполнять повторные попытки записи через exponential backoff.
Также можно увеличить размер клиентского буфера (setMaxSendBufferMemorySize
), чтобы обрабатывать больший объем сообщений перед тем, как он заполнится.
try {
// Non-blocking. Throws QueueOverflowException if send queue is full
writer.send(Message.of("33".getBytes()));
} catch (QueueOverflowException exception) {
// Send queue is full. Need to retry with backoff or skip
}
Запись сообщений с подтверждением о сохранении на сервере
Получение подтверждений от сервера возможно через интерфейс IWriteSession
.
Ответы о записи сообщений на сервере приходят клиенту SDK в виде событий TAcksEvent
. В одном событии могут содержаться ответы о нескольких отправленных ранее сообщениях. Варианты ответа: запись подтверждена (EES_WRITTEN
), запись отброшена как дубликат ранее записанного сообщения (EES_ALREADY_WRITTEN
) или запись отброшена по причине сбоя (EES_DISCARDED
).
Пример установки обработчика TAcksEvent для сессии записи:
auto settings = TWriteSessionSettings()
// other settings are set here
.EventHandlers(
TWriteSessionSettings::TEventHandlers()
.AcksHandler(
[&](TWriteSessionEvent::TAcksEvent& event) {
for (const auto& ack : event.Acks) {
if (ack.State == TWriteAck::EEventState::EES_WRITTEN) {
ackedSeqNo.insert(ack.SeqNo);
std::cout << "Acknowledged message with seqNo " << ack.SeqNo << std::endl;
}
}
}
)
);
auto session = topicClient.CreateWriteSession(settings);
В такой сессии записи события TAcksEvent
не будут приходить пользователю в GetEvent
/ GetEvents
, вместо этого SDK при получении подтверждений от сервера будет вызывать переданный обработчик. Аналогично можно настраивать обработчики на остальные типы событий.
При подключении можно указать опцию синхронной записи сообщений - topicoptions.WithSyncWrite(true). Тогда Write будет возвращаться только после того как получит подтверждение с сервера о сохранении всех, сообщений переданных в вызове. При этом SDK так же как и обычно будет при необходимости переподключаться и повторять отправку сообщений. В этом режиме контекст управляет только временем ожидания ответа из SDK, т.е. даже после отмены контекста SDK продолжит попытки отправить сообщения.
producerAndGroupID := "group-id"
writer, _ := db.Topic().StartWriter(producerAndGroupID, "topicName",
topicoptions.WithMessageGroupID(producerAndGroupID),
topicoptions.WithSyncWrite(true),
)
err = writer.Write(ctx,
topicwriter.Message{Data: strings.NewReader("1")},
topicwriter.Message{Data: bytes.NewReader([]byte{1,2,3})},
topicwriter.Message{Data: strings.NewReader("3")},
)
if err != nil {
return err
}
Есть два способа получить подтверждение о записи сообщений на сервере:
flush()
— дожидается подтверждения для всех сообщений, записанных ранее во внутренний буфер.write_with_ack(...)
— отправляет сообщение и ждет подтверждение его доставки от сервера. При отправке нескольких сообщений подряд это способ работает медленно.
# Положить несколько сообщений во внутренний буфер, затем дождаться,
# пока все они будут доставлены до сервера.
for mess in messages:
writer.write(mess)
writer.flush()
# Можно отправить несколько сообщений и дождаться подтверждения на всю группу.
writer.write_with_ack(["mess-1", "mess-2"])
# Ожидание при отправке каждого сообщения — этот метод вернет результат только после получения
# подтверждения от сервера.
# Это самый медленный вариант отправки сообщений, используйте его только если такой режим
# действительно нужен.
writer.write_with_ack("message")
Метод send
возвращает CompletableFuture<WriteAck>
. Её успешное завершение означает подтверждение записи сервером.
В структуре WriteAck
содержится информация о seqNo, offset и статусе записи:
writer.send(Message.of(message))
.whenComplete((result, ex) -> {
if (ex != null) {
logger.error("Exception on writing message message: ", ex);
} else {
switch (result.getState()) {
case WRITTEN:
WriteAck.Details details = result.getDetails();
StringBuilder str = new StringBuilder("Message was written successfully");
if (details != null) {
str.append(", offset: ").append(details.getOffset());
}
logger.debug(str.toString());
break;
case ALREADY_WRITTEN:
logger.warn("Message has already been written");
break;
default:
break;
}
}
});
Выбор кодека для сжатия сообщений
Подробнее о сжатии данных в топиках.
Сжатие, которое используется при отправке сообщений методом Write
, задаётся при создании сессии записи настройками Codec
и CompressionLevel
. По умолчанию выбирается кодек GZIP.
Пример создания сессии записи без сжатия сообщений:
auto settings = TWriteSessionSettings()
// other settings are set here
.Codec(ECodec::RAW);
auto session = topicClient.CreateWriteSession(settings);
Если необходимо в рамках сессии записи отправить сообщение, сжатое другим кодеком, можно использовать метод WriteEncoded
с указанием кодека и размера расжатого сообщения. Для успешной записи этим способом используемый кодек должен быть разрешён в настройках топика.
По умолчанию SDK выбирает кодек автоматически (с учетом настроек топика). В автоматическом режиме SDK сначала отправляет по одной группе сообщений каждым из разрешенных кодеков, затем иногда будет пробовать сжать сообщения всеми доступными кодеками и выбирать кодек, дающий наименьший размер сообщения. Если для топика список разрешенных кодеков пуст, то автовыбор производится между Raw и Gzip-кодеками.
При необходимости можно задать фиксированный кодек в опциях подключения. Тогда будет использоваться именно он и замеры проводиться не будут.
producerAndGroupID := "group-id"
writer, _ := db.Topic().StartWriter(producerAndGroupID, "topicName",
topicoptions.WithMessageGroupID(producerAndGroupID),
topicoptions.WithCodec(topictypes.CodecGzip),
)
По умолчанию SDK выбирает кодек автоматически (с учетом настроек топика). В автоматическом режиме SDK сначала отправляет по одной группе сообщений каждым из разрешенных кодеков, затем иногда будет пробовать сжать сообщения всеми доступными кодеками и выбирать кодек, дающий наименьший размер сообщения. Если для топика список разрешенных кодеков пуст, то автовыбор производится между Raw и Gzip-кодеками.
При необходимости можно задать фиксированный кодек в опциях подключения. Тогда будет использоваться именно он и замеры проводиться не будут.
writer = driver.topic_client.writer(topic_path,
codec=ydb.TopicCodec.GZIP,
)
String producerAndGroupID = "group-id";
WriterSettings settings = WriterSettings.newBuilder()
.setTopicPath(topicPath)
.setProducerId(producerAndGroupID)
.setMessageGroupId(producerAndGroupID)
.setCodec(Codec.ZSTD)
.build();
Запись сообщений без дедупликации
Подробнее о записи без дедупликации — в соответствующем разделе концепций.
Если в настройках сессии записи не указывается опция ProducerId
, будет создана сессия записи без дедупликации.
Пример создания такой сессии записи:
auto settings = TWriteSessionSettings()
.Path(myTopicPath);
auto session = topicClient.CreateWriteSession(settings);
Для включения дедупликации нужно в настройках сессии записи указать опцию ProducerId
или явно включить дедупликацию, вызвав метод DeduplicationEnabled()
, например, как в секции "Подключение к топику".
Запись метаданных на уровне сообщения
При записи сообщения можно дополнительно указать метаданные как список пар "ключ-значение". Эти данные будут доступны при вычитывании сообщения.
Ограничение на размер метаданных — не более 1000 ключей.
Воспользоваться функцией записи метаданных можно с помощью метода Write()
, принимающего TWriteMessage
объект:
auto settings = TWriteSessionSettings()
.Path(myTopicPath)
//set all oter settings;
;
auto session = topicClient.CreateWriteSession(settings);
TMaybe<TWriteSessionEvent::TEvent> event = session->GetEvent(/*block=*/true);
TWriteMessage message("This is yet another message").MessageMeta({
{"meta-key", "meta-value"},
{"another-key", "value"}
});
if (auto* readyEvent = std::get_if<TWriteSessionEvent::TReadyToAcceptEvent>(&*event)) {
session->Write(std::move(event.ContinuationToken), std::move(message));
}
При конструировании сообщения для записи с помощью Builder'а, ему можно передать объекты типа MetadataItem
с парой ключ типа String
+ значение типа byte[]
.
Можно передать сразу List
таких объектов:
List<MetadataItem> metadataItems = Arrays.asList(
new MetadataItem("meta-key", "meta-value".getBytes()),
new MetadataItem("another-key", "value".getBytes())
);
writer.send(
Message.newBuilder()
.setMetadataItems(metadataItems)
.build()
);
Или добавлять каждый MetadataItem
отдельно:
writer.send(
Message.newBuilder()
.addMetadataItem(new MetadataItem("meta-key", "meta-value".getBytes()))
.addMetadataItem(new MetadataItem("another-key", "value".getBytes()))
.build()
);
При чтении эти метаданные сообщения получить, вызвав на нём метод getMetadataItems()
:
Message message = reader.receive();
List<MetadataItem> metadata = message.getMetadataItems();
Запись в транзакции
Для записи в топик в транзакции необходимо передать ссылку на объект транзакции в метод Write
сессии записи:
auto tableSession = tableClient.GetSession().GetValueSync().GetSession();
auto transaction = tableSession.BeginTransaction().GetValueSync().GetTransaction();
NYdb::NTopic::TWriteMessage writeMessage("message");
topicSession->Write(std::move(writeMessage), transaction);
transaction.Commit().GetValueSync();
Для записи в топик в транзакции необходимо создать транзакционного писателя через вызов TopicClient.StartTransactionalWriter. После этого можно отправлять сообщения, как обычно. Закрывать транзакционного писателя не требуется — это происходит автоматически при завершении транзакции.
err := db.Query().DoTx(ctx, func(ctx context.Context, tx query.TxActor) error {
writer, err := db.Topic().StartTransactionalWriter(tx, topicName)
if err != nil {
return err
}
return writer.Write(ctx, topicwriter.Message{Data: strings.NewReader("asd")})
})
В настройках SendSettings
метода send
можно указать транзакцию.
Тогда сообщение будет записано вместе с коммитом этой транзакцией.
// creating a session in the table service
Result<Session> sessionResult = tableClient.createSession(Duration.ofSeconds(10)).join();
if (!sessionResult.isSuccess()) {
logger.error("Couldn't get a session from the pool: {}", sessionResult);
return; // retry or shutdown
}
Session session = sessionResult.getValue();
// creating a transaction in the table service
// this transaction is not yet active and has no id
TableTransaction transaction = session.createNewTransaction(TxMode.SERIALIZABLE_RW);
// get message text within the transaction
Result<DataQueryResult> dataQueryResult = transaction.executeDataQuery("SELECT \"Hello, world!\";")
.join();
if (!dataQueryResult.isSuccess()) {
logger.error("Couldn't execute DataQuery: {}", dataQueryResult);
return; // retry or shutdown
}
// now the transaction is active and has an id
ResultSetReader rsReader = dataQueryResult.getValue().getResultSet(0);
byte[] message;
if (rsReader.next()) {
message = rsReader.getColumn(0).getBytes();
} else {
return; // retry or shutdown
}
writer.send(
Message.of(message),
SendSettings.newBuilder()
.setTransaction(transaction)
.build()
);
// flush to wait until all messages reach server before commit
writer.flush();
Status commitStatus = transaction.commit().join();
analyzeCommitStatus(commitStatus);
В настройках SendSettings
метода send
можно указать транзакцию.
Тогда сообщение будет записано вместе с коммитом этой транзакцией.
// creating a session in the table service
Result<Session> sessionResult = tableClient.createSession(Duration.ofSeconds(10)).join();
if (!sessionResult.isSuccess()) {
logger.error("Couldn't get a session from the pool: {}", sessionResult);
return; // retry or shutdown
}
Session session = sessionResult.getValue();
// creating a transaction in the table service
// this transaction is not yet active and has no id
TableTransaction transaction = session.createNewTransaction(TxMode.SERIALIZABLE_RW);
// get message text within the transaction
Result<DataQueryResult> dataQueryResult = transaction.executeDataQuery("SELECT \"Hello, world!\";")
.join();
if (!dataQueryResult.isSuccess()) {
logger.error("Couldn't execute DataQuery: {}", dataQueryResult);
return; // retry or shutdown
}
// now the transaction is active and has an id
ResultSetReader rsReader = dataQueryResult.getValue().getResultSet(0);
byte[] message;
if (rsReader.next()) {
message = rsReader.getColumn(0).getBytes();
} else {
return; // retry or shutdown
}
try {
writer.send(Message.newBuilder()
.setData(message)
.build(),
SendSettings.newBuilder()
.setTransaction(transaction)
.build())
.whenComplete((result, ex) -> {
if (ex != null) {
logger.error("Exception while sending a message: ", ex);
} else {
switch (result.getState()) {
case WRITTEN:
WriteAck.Details details = result.getDetails();
logger.info("Message was written successfully, offset: " + details.getOffset());
break;
case ALREADY_WRITTEN:
logger.info("Message has already been written");
break;
default:
break;
}
}
})
// Waiting for the message to reach the server before committing the transaction
.join();
Status commitStatus = transaction.commit().join();
analyzeCommitStatus(commitStatus);
} catch (QueueOverflowException exception) {
logger.error("Queue overflow exception while sending a message{}: ", index, exception);
// Send queue is full. Need to retry with backoff or skip
}
Чтение сообщений
Подключение к топику для чтения сообщений
Для чтения сообщений из топика необходимо наличие заранее созданного Consumer, связанного с этим топиком.
Создать Consumer можно при создании или изменении топика.
У топика может быть несколько Consumer'ов и для каждого из них сервер хранит свой прогресс чтения.
Подключение для чтения из одного или нескольких топиков представлено объектом сессии чтения с интерфейсом IReadSession
. Настройки сессии чтения представлены структурой TReadSessionSettings
.
Полный список настроек смотри в заголовочном файле.
Чтобы создать подключение к существующему топику my-topic
через добавленного ранее читателя my-consumer
, используйте следующий код:
auto settings = TReadSessionSettings()
.ConsumerName("my-consumer")
.AppendTopics("my-topic");
auto session = topicClient.CreateReadSession(settings);
Чтобы создать подключение к существующему топику my-topic
через добавленного ранее читателя my-consumer
, используйте следующий код:
reader, err := db.Topic().StartReader("my-consumer", topicoptions.ReadTopic("my-topic"))
if err != nil {
return err
}
Чтобы создать подключение к существующему топику my-topic
через добавленного ранее читателя my-consumer
, используйте следующий код:
reader = driver.topic_client.reader(topic="topic-path", consumer="consumer_name")
Инициализация настроек читателя
ReaderSettings settings = ReaderSettings.newBuilder()
.setConsumerName(consumerName)
.addTopic(TopicReadSettings.newBuilder()
.setPath(topicPath)
.setReadFrom(Instant.now().minus(Duration.ofHours(24))) // Optional
.setMaxLag(Duration.ofMinutes(30)) // Optional
.build())
.build();
Создание синхронного читателя
SyncReader reader = topicClient.createSyncReader(settings);
После создания синхронного читателя необходимо инициализировать. Для этого следует воспользоваться одним их двух методов:
-
init()
: неблокирующий, запускает процесс инициализации в фоне и не ждёт его завершения.reader.init();
-
initAndWait()
: блокирующий, запускает процесс инициализации и ждёт его завершения. Если в процессе инициализации возникла ошибка, будет брошено исключение.try { reader.initAndWait(); logger.info("Init finished succsessfully"); } catch (Exception exception) { logger.error("Exception while initializing reader: ", exception); return; }
Инициализация настроек читателя
ReaderSettings settings = ReaderSettings.newBuilder()
.setConsumerName(consumerName)
.addTopic(TopicReadSettings.newBuilder()
.setPath(topicPath)
.setReadFrom(Instant.now().minus(Duration.ofHours(24))) // Optional
.setMaxLag(Duration.ofMinutes(30)) // Optional
.build())
.build();
Для асинхронного читателя, помимо общих настроек чтения ReaderSettings
, понадобятся настройки обработчика событий ReadEventHandlersSettings
, в которых необходимо передать экземпляр наследника ReadEventHandler
.
Он будет описывать, как должна происходить обработка различных событий, происходящих во время чтения.
ReadEventHandlersSettings handlerSettings = ReadEventHandlersSettings.newBuilder()
.setEventHandler(new Handler())
.build();
Опционально, в ReadEventHandlersSettings
можно указать executor'а, на котором будет происходить обработка сообщений.
Для реализации объекта-наследника ReadEventHandler можно воспользоваться дефолтным абстрактным классом AbstractReadEventHandler
.
Достаточно переопределить метод onMessages, отвечающий за обработку самих сообщений. Пример реализации:
private class Handler extends AbstractReadEventHandler {
@Override
public void onMessages(DataReceivedEvent event) {
for (Message message : event.getMessages()) {
StringBuilder str = new StringBuilder();
logger.info("Message received. SeqNo={}, offset={}", message.getSeqNo(), message.getOffset());
process(message);
message.commit().thenRun(() -> {
logger.info("Message committed");
});
}
}
}
Создание и инициализация асинхронного читателя:
AsyncReader reader = topicClient.createAsyncReader(readerSettings, handlerSettings);
// Init in background
reader.init()
.thenRun(() -> logger.info("Init finished successfully"))
.exceptionally(ex -> {
logger.error("Init failed with ex: ", ex);
return null;
});
Вы также можете использовать расширенный вариант создания подключения, чтобы указать несколько топиков и задать параметры чтения. Следующий код создаст подключение к топикам my-topic
и my-specific-topic
через читателя my-consumer
:
auto settings = TReadSessionSettings()
.ConsumerName("my-consumer")
.AppendTopics("my-topic")
.AppendTopics(
TTopicReadSettings("my-specific-topic")
.ReadFromTimestamp(someTimestamp)
);
auto session = topicClient.CreateReadSession(settings);
reader, err := db.Topic().StartReader("my-consumer", []topicoptions.ReadSelector{
{
Path: "my-topic",
},
{
Path: "my-specific-topic",
ReadFrom: time.Date(2022, 7, 1, 10, 15, 0, 0, time.UTC),
},
},
)
if err != nil {
return err
}
Также в примере выше задаётся время, с которого следует начинать читать сообщения.
Функциональность находится в разработке.
ReaderSettings settings = ReaderSettings.newBuilder()
.setConsumerName(consumerName)
.addTopic(TopicReadSettings.newBuilder()
.setPath("my-topic")
.build())
.addTopic(TopicReadSettings.newBuilder()
.setPath("my-specific-topic")
.setReadFrom(Instant.now().minus(Duration.ofHours(24))) // Optional
.setMaxLag(Duration.ofMinutes(30)) // Optional
.build())
.build();
Чтение сообщений
Сервер хранит позицию чтения сообщений. После вычитывания очередного сообщения клиент может отправить на сервер подтверждение обработки. Позиция чтения изменится, а при новом подключении будут вычитаны только неподтвержденные сообщения.
Читать сообщения можно и без подтверждения обработки. В этом случае при новом подключении будут прочитаны все неподтвержденные сообщения, в том числе и уже обработанные.
Информацию о том, какие сообщения уже обработаны, можно сохранять на клиентской стороне, передавая на сервер стартовую позицию чтения при создании подключения. При этом позиция чтения сообщений на сервере не изменяется.
Можно использовать транзакции. В этом случае позиция чтения изменится при подтверждении транзакции. При новом подключении будут прочитаны все неподтверждённые сообщения.
Работа пользователя с объектом IReadSession
в общем устроена как обработка цикла событий со следующими типами событий: TDataReceivedEvent
, TCommitOffsetAcknowledgementEvent
, TStartPartitionSessionEvent
, TEndPartitionSessionEvent
, TStopPartitionSessionEvent
, TPartitionSessionStatusEvent
, TPartitionSessionClosedEvent
и TSessionClosedEvent
.
Для каждого из типов событий можно установить обработчик этого события, а также можно установить общий обработчик. Обработчики устанавливаются в настройках сессии записи перед её созданием.
Если обработчик для некоторого события не установлен, его необходимо получить и обработать в методах GetEvent
/ GetEvents
. Для неблокирующего ожидания очередного события есть метод WaitEvent
с сигнатурой TFuture<void>()
.
SDK получает данные с сервера партиями и буферизирует их. В зависимости от задач клиентский код может читать сообщения из буфера по одному или пакетами.
SDK получает данные с сервера партиями и буферизирует их. В зависимости от задач клиентский код может читать сообщения из буфера по одному или пакетами.
SDK получает данные с сервера партиями и буферизирует их. В зависимости от задач клиентский код может читать сообщения из буфера по одному или пакетами.
Чтение без подтверждения обработки сообщений
Чтение сообщений по одному
Чтение сообщений по одному в C++ SDK не предусмотрено. Событие TDataReceivedEvent
содержит пакет сообщений.
func SimpleReadMessages(ctx context.Context, r *topicreader.Reader) error {
for {
mess, err := r.ReadMessage(ctx)
if err != nil {
return err
}
processMessage(mess)
}
}
while True:
message = reader.receive_message()
process(message)
Чтобы читать сообщения без подтверждения обработки, по одному, используйте следующий код:
while(true) {
Message message = reader.receive();
process(message);
}
В асинхронном клиенте нет возможности читать сообщения по одному.
Чтение сообщений пакетом
При установке сессии чтения с настройкой SimpleDataHandlers
достаточно передать обработчик для сообщений с данными. SDK будет вызывать этот обработчик на каждый принятый от сервера пакет сообщений. Подтверждения чтения по умолчанию отправляться не будут.
auto settings = TReadSessionSettings()
.EventHandlers_.SimpleDataHandlers(
[](TReadSessionEvent::TDataReceivedEvent& event) {
std::cout << "Get data event " << DebugString(event);
}
);
auto session = topicClient.CreateReadSession(settings);
// Wait SessionClosed event.
ReadSession->GetEvent(/* block = */true);
В этом примере после создания сессии основной поток дожидается завершения сессии со стороны сервера в методе GetEvent
, другие типы событий приходить не будут.
func SimpleReadBatches(ctx context.Context, r *topicreader.Reader) error {
for {
batch, err := r.ReadMessageBatch(ctx)
if err != nil {
return err
}
processBatch(batch)
}
}
while True:
batch = reader.receive_batch()
process(batch)
В синхронном клиенте нет возможности прочитать сразу пакет сообщений.
Чтобы прочитать пакет сообщений без подтверждения обработки, используйте следующий код:
private class Handler extends AbstractReadEventHandler {
@Override
public void onMessages(DataReceivedEvent event) {
for (Message message : event.getMessages()) {
process(message);
}
}
}
Чтение с подтверждением обработки сообщений
Подтверждение обработки сообщения (коммит) - сообщает серверу, что сообщение из топика обработано получателем и больше его отправлять не нужно. При использовании чтения с подтверждением нужно подтверждать все полученные сообщения без пропуска. Коммит сообщений на сервере происходит после подтверждения очередного интервала сообщений «без дырок», сами подтверждения при этом можно отправлять в любом порядке.
Например с сервера пришли сообщения 1, 2, 3. Программа обрабатывает их параллельно и отправляет подтверждения в таком порядке: 1, 3, 2. В этом случае сначала будет закоммичено сообщение 1, а сообщения 2 и 3 будут закоммичены только после того как сервер получит подтверждение об обработке сообщения 2.
В случае ошибки на коммите сообщения можно написать эту ошибку в лог и продолжить работу. Состояние сообщения в этой точке неизвестно. Сообщение могло закоммититься, а потом возникла сетевая ошибка и клиент не получил подтверждения. Если сообщение не закоммитилось, то оно будет прочитано ещё раз и снова поступит в обработку (может быть на другом читателе). Ретраить именно коммит смысла нет, т.к. сессия чтения этого сообщения уже потеряна.
Чтение сообщений по одному с подтверждением
Чтение сообщений по одному в C++ SDK не предусмотрено. Событие TDataReceivedEvent
содержит пакет сообщений.
func SimpleReadMessages(ctx context.Context, r *topicreader.Reader) error {
for {
mess, err := r.ReadMessage(ctx)
if err != nil {
return err
}
processMessage(mess)
r.Commit(mess.Context(), mess)
}
}
while True:
message = reader.receive_message()
process(message)
reader.commit(message)
Для подтверждения обработки сообщения достаточно вызвать у сообщения метод commit
.
Актуально как для синхронного, так и для асинхронного читателя.
В асинхронном читателе, при обработке пакета сообщений, можно вызвать commit
или у всего пакета сразу, или у каждого сообщения отдельно.
Этот метод возвращает CompletableFuture<Void>
, успешное выполнение которой означает подтверждение обработки сервером.
В случае ошибки коммита не следует пытаться его ретраить. Скорее всего, ошибка вызвана закрытием сессии.
Читатель (необязательно этот же) сам создаст новую сессию для этой партиции и сообщение будет прочитано снова.
message.commit()
.whenComplete((result, ex) -> {
if (ex != null) {
// Read session was probably closed, there is nothing we can do here.
// Do not retry this commit on the same event.
logger.error("exception while committing message: ", ex);
} else {
logger.info("message committed successfully");
}
});
Чтение сообщений пакетом с подтверждением
Аналогично примеру выше, при установке сессии чтения с настройкой SimpleDataHandlers
достаточно передать обработчик для сообщений с данными. SDK будет вызывать этот обработчик на каждый принятый от сервера пакет сообщений. Передача параметра commitDataAfterProcessing = true
означает, что SDK будет отправлять на сервер подтверждения чтения всех сообщений после выполнения обработчика.
auto settings = TReadSessionSettings()
.EventHandlers_.SimpleDataHandlers(
[](TReadSessionEvent::TDataReceivedEvent& event) {
std::cout << "Get data event " << DebugString(event);
}
, /* commitDataAfterProcessing = */true
);
auto session = topicClient.CreateReadSession(settings);
// Wait SessionClosed event.
ReadSession->GetEvent(/* block = */true);
func SimpleReadMessageBatch(ctx context.Context, r *topicreader.Reader) error {
for {
batch, err := r.ReadMessageBatch(ctx)
if err != nil {
return err
}
processBatch(batch)
r.Commit(batch.Context(), batch)
}
}
while True:
batch = reader.receive_batch()
process(batch)
reader.commit(batch)
Неактуально, т.к. в синхронном читателе нет возможности читать сообщения пакетами.
В обработчике onMessage
можно закоммитить весь пакет сообщений, вызвав commit
на событии.
@Override
public void onMessages(DataReceivedEvent event) {
for (Message message : event.getMessages()) {
process(message);
}
event.commit()
.whenComplete((result, ex) -> {
if (ex != null) {
// Read session was probably closed, there is nothing we can do here.
// Do not retry this commit on the same message.
logger.error("exception while committing message batch: ", ex);
} else {
logger.info("message batch committed successfully");
}
});
}
Чтение с хранением позиции на клиентской стороне
Вместо коммитов сообщений на сервер можно хранить прогресс чтения самостоятельно. В этом случае нужно передать в SDK обработчик, который будет вызываться при старте чтения каждой партиции. В этом обработчике нужно будет
указать позицию, с которой нужно начинать чтение этой партиции.
При обработке событий TStartPartitionSessionEvent
можно при ответе серверу задать позицию, с которой следует начинать чтение.
Для этого в метод Confirm
следует передать параметр readOffset
.
Допольнительно можно передать параметр commitOffset
, который укажет позицию, сообщения до которой следует считать закоммиченными.
Пример установки обработчика:
settings.EventHandlers_.StartPartitionSessionHandler(
[](TReadSessionEvent::TStartPartitionSessionEvent& event) {
auto readFromOffset = GetOffsetToReadFrom(event.GetPartitionId());
event.Confirm(readFromOffset);
}
);
Здесь GetOffsetToReadFrom
- это часть примера, а не SDK. Используйте свой способ определить требуемую стартовую позицию чтения для партиции с данным partition id.
Также в TReadSessionSettings
поддерживается настройка ReadFromTimestamp
для чтения событий с отметками времени записи не меньше данной. Эта настройка предполагается не для точного позиционирования старта, а для пропуска объёма данных за большой интервал времени. Несколько первых полученных сообщений могут иметь отметки времени записи меньше указанной.
func ReadWithExplicitPartitionStartStopHandlerAndOwnReadProgressStorage(ctx context.Context, db ydb.Connection) error {
readContext, stopReader := context.WithCancel(context.Background())
defer stopReader()
readStartPosition := func(
ctx context.Context,
req topicoptions.GetPartitionStartOffsetRequest,
) (res topicoptions.GetPartitionStartOffsetResponse, err error) {
offset, err := readLastOffsetFromDB(ctx, req.Topic, req.PartitionID)
res.StartFrom(offset)
// Reader will stop if return err != nil
return res, err
}
r, err := db.Topic().StartReader("my-consumer", topicoptions.ReadTopic("my-topic"),
topicoptions.WithGetPartitionStartOffset(readStartPosition),
)
if err != nil {
return err
}
go func() {
<-readContext.Done()
_ = r.Close(ctx)
}()
for {
batch, err := r.ReadMessageBatch(readContext)
if err != nil {
return err
}
processBatch(batch)
_ = externalSystemCommit(batch.Context(), batch.Topic(), batch.PartitionID(), batch.EndOffset())
}
}
Функциональность находится в разработке.
Чтение с заданного оффсета в Java возможно только в асинхронном читателе.
В обработчике событий StartPartitionSessionEvent
можно при ответе серверу задать позицию, с которой следует начинать чтение.
Для этого в метод confirm
следует передать настройки StartPartitionSessionSettings
с указанным оффсетом через setReadOffset
.
Также вызовом setCommitOffset
можно указать оффсет, который следует считать закоммиченным.
@Override
public void onStartPartitionSession(StartPartitionSessionEvent event) {
event.confirm(StartPartitionSessionSettings.newBuilder()
.setReadOffset(lastReadOffset) // Long
.setCommitOffset(lastCommitOffset) // Long
.build());
}
Также поддерживается настройка читателя setReadFrom
для чтения событий с отметками времени записи не меньше данной.
Чтение без указания Consumer'а
Обычно прогресс чтения топика сохраняется на сервере в каждом Consumer
е. Но можно не хранить такой прогресс на сервере и при создании читателя явно указать, что чтение будет происходить без Consumer
а.
Для чтения без Consumer
а следует в настройках читателя ReaderSettings
это явно указать, вызвав withoutConsumer()
:
ReaderSettings settings = ReaderSettings.newBuilder()
.withoutConsumer()
.addTopic(TopicReadSettings.newBuilder()
.setPath(TOPIC_NAME)
.build())
.build();
В таком случае нужно учитывать, что при переустановке соединения прогресс на сервере будет сброшен. Поэтому, чтобы не начинать чтение сначала, в SDK следует передавать offset начала чтения при каждом старте сессии чтения партиции:
@Override
public void onStartPartitionSession(StartPartitionSessionEvent event) {
event.confirm(StartPartitionSessionSettings.newBuilder()
.setReadOffset(lastReadOffset) // the last offset read by this client, Long
.build());
}
Чтение в транзакции
Перед чтением из топика клиентский код должен передать в настройки получения событий из сессии ссылку на объект транзакции.
ReadSession->WaitEvent().Wait(TDuration::Seconds(1));
auto tableSettings = NYdb::NTable::TTxSettings::SerializableRW();
auto transactionResult = TableSession->BeginTransaction(tableSettings).GetValueSync();
auto Transaction = transactionResult.GetTransaction();
NYdb::NTopic::TReadSessionGetEventSettings topicSettings;
topicSettings.Block(false);
topicSettings.Tx(Transaction);
auto events = ReadSession->GetEvents(topicSettings);
for (auto& event : events) {
// обработать событие и записать результаты в таблицу
}
NYdb::NTable::TCommitTxSettings commitSettings;
auto commitResult = Transaction.Commit(commitSettings).GetValueSync();
Важно
При обработке событий events
не нужно явно подтверждать обработку для событий типа TDataReceivedEvent
.
Подтверждение обработки события TStopPartitionSessionEvent
надо делать после вызова Commit
.
std::optional<TStopPartitionSessionEvent> stopPartitionSession;
auto events = ReadSession->GetEvents(topicSettings);
for (auto& event : events) {
if (auto* e = std::get_if<TStopPartitionSessionEvent>(&event) {
stopPartitionSessionEvent = std::move(*e);
} else {
// обработать событие и записать результаты в таблицу
}
}
NYdb::NTable::TCommitTxSettings commitSettings;
auto commitResult = Transaction.Commit(commitSettings).GetValueSync();
if (stopPartitionSessionEvent) {
stopPartitionSessionEvent->Commit();
}
Для чтения сообщений в рамках транзакции следует использовать метод Reader.PopMessagesBatchTx
. Он прочитает пакет сообщений и добавит их коммит в транзакцию, при этом отдельно коммитить эти сообщения не требуется. Читателя сообщений можно использовать повторно в разных транзакциях. При этом важно, чтобы порядок коммита транзакций соответствовал порядку получения сообщений от читателя, так как коммиты сообщений в топике должны выполняться строго по порядку. Проще всего это сделать если использовать читателя в цикле.
for {
err := db.Query().DoTx(ctx, func(ctx context.Context, tx query.TxActor) error {
batch, err := reader.PopMessagesBatchTx(ctx, tx) // батч закоммитится при общем коммите транзакции
if err != nil {
return err
}
return processBatch(ctx, batch)
})
if err != nil {
handleError(err)
}
}
В настройках ReceiveSettings
метода receive
можно указать транзакцию:
Message message = reader.receive(ReceiveSettings.newBuilder()
.setTransaction(transaction)
.build());
Тогда полученное сообщение будет закоммичено вместе с транзакцией. Коммитить его отдельно не нужно.
Метод receive
свяжет на сервере оффсеты сообщения с транзакцией вызовом sendUpdateOffsetsInTransaction
и вернёт управление, когда получит ответ на него.
После получения сообщения в обработчике onMessages
можно связать одно или несколько сообщений с транзакцией.
Для этого нужно вызвать отдельный метод reader.updateOffsetsInTransaction
и дождаться его выполнения на сервере.
Этот метод принимает параметром список оффсетов. Для удобства у Message
и DataReceivedEvent
есть метод getPartitionOffsets()
, возвращающий такой список.
@Override
public void onMessages(DataReceivedEvent event) {
for (Message message : event.getMessages()) {
// creating a session in the table service
Result<Session> sessionResult = tableClient.createSession(Duration.ofSeconds(10)).join();
if (!sessionResult.isSuccess()) {
logger.error("Couldn't get a session from the pool: {}", sessionResult);
return; // retry or shutdown
}
Session session = sessionResult.getValue();
// creating a transaction in the table service
// this transaction is not yet active and has no id
TableTransaction transaction = session.createNewTransaction(TxMode.SERIALIZABLE_RW);
// do something else in the transaction
transaction.executeDataQuery("SELECT 1").join();
// now the transaction is active and has an id
// analyzeQueryResultIfNeeded();
Status updateStatus = reader.updateOffsetsInTransaction(transaction,
message.getPartitionOffsets(), new UpdateOffsetsInTransactionSettings.Builder().build())
// Do not commit a transaction without waiting for updateOffsetsInTransaction result to avoid a race condition
.join();
if (!updateStatus.isSuccess()) {
logger.error("Couldn't update offsets in a transaction: {}", updateStatus);
return; // retry or shutdown
}
Status commitStatus = transaction.commit().join();
analyzeCommitStatus(commitStatus);
}
}
Обработка серверного прерывания чтения
В YDB используется серверная балансировка партиций между клиентами. Это означает, что сервер может прерывать чтение сообщений из произвольных партиций.
При мягком прерывании клиент получает уведомление, что сервер уже закончил отправку сообщений из партиции и больше сообщения читаться не будут. Клиент может завершить обработку сообщений и отправить подтверждение на сервер.
В случае жесткого прерывания клиент получает уведомление, что работать с сообщениями партиции больше нельзя. Клиент должен прекратить обработку прочитанных сообщений. Неподтвержденные сообщения будут переданы другому читателю.
Мягкое прерывание чтения
Мягкое прерывание приходит в виде события TStopPartitionSessionEvent
с методом Confirm
. Клиент может завершить обработку сообщений и отправить подтверждение на сервер.
Фрагмент цикла событий может выглядеть так:
auto event = ReadSession->GetEvent(/*block=*/true);
if (auto* stopPartitionSessionEvent = std::get_if<TReadSessionEvent::TStopPartitionSessionEvent>(&*event)) {
stopPartitionSessionEvent->Confirm();
} else {
// other event types
}
Клиентский код сразу получает все имеющиеся в буфере (на стороне SDK) сообщения, даже если их не достаточно для формирования пакета при групповой обработке.
r, _ := db.Topic().StartReader("my-consumer", nil,
topicoptions.WithBatchReadMinCount(1000),
)
for {
batch, _ := r.ReadMessageBatch(ctx) // <- if partition soft stop batch can be less, then 1000
processBatch(batch)
_ = r.Commit(batch.Context(), batch)
}
Специальной обработки не требуется.
while True:
batch = reader.receive_batch()
process(batch)
reader.commit(batch)
Неактуально, т.к. в синхронном читателе нет возможности настраивать обработку подобных событий.
Клиент сразу ответит серверу подтверждением остановки.
Для возможности реагировать на такое событие следует переопределить метод onStopPartitionSession(StopPartitionSessionEvent event)
в объекте-наследнике ReadEventHandler
(см Подключение к топику для чтения сообщений).
event.confirm()
обязательно должен быть вызван, т.к. сервер ожидает этого ответа для продолжения остановки.
@Override
public void onStopPartitionSession(StopPartitionSessionEvent event) {
logger.info("Partition session {} stopped. Committed offset: {}", event.getPartitionSessionId(),
event.getCommittedOffset());
// This event means that no more messages will be received by server
// Received messages still can be read from ReaderBuffer
// Messages still can be committed, until confirm() method is called
// Confirm that session can be closed
event.confirm();
}
Жесткое прерывание чтения
Жёсткое прерывание приходит в виде события TPartitionSessionClosedEvent
либо в ответ на подтверждение мягкого прерывания, либо при потере соединения с партицией. Узнать причину можно, вызвав метод GetReason
.
Фрагмент цикла событий может выглядеть так:
auto event = ReadSession->GetEvent(/*block=*/true);
if (auto* partitionSessionClosedEvent = std::get_if<TReadSessionEvent::TPartitionSessionClosedEvent>(&*event)) {
if (partitionSessionClosedEvent->GetReason() == TPartitionSessionClosedEvent::EReason::ConnectionLost) {
std::cout << "Connection with partition was lost" << std::endl;
}
} else {
// other event types
}
При прерывании чтения контекст сообщения или пакета сообщений будет отменен.
ctx := batch.Context() // batch.Context() will cancel if partition revoke by server or connection broke
if len(batch.Messages) == 0 {
return
}
buf := &bytes.Buffer{}
for _, mess := range batch.Messages {
buf.Reset()
_, _ = buf.ReadFrom(mess)
_, _ = io.Copy(buf, mess)
writeMessagesToDB(ctx, buf.Bytes())
}
В этом примере обработка сообщений в батче остановится, если в процессе работы партиция будет отобрана. Такая оптимизация требует дополнительного кода на клиенте. В простых случаях, когда обработка отобранных партиций не является проблемой, ее можно не применять.
def process_batch(batch):
for message in batch.messages:
if not batch.alive:
return False
process(message)
return True
batch = reader.receive_batch()
if process_batch(batch):
reader.commit(batch)
Неактуально, т.к. в синхронном читателе нет возможности настраивать обработку подобных событий.
@Override
public void onPartitionSessionClosed(PartitionSessionClosedEvent event) {
logger.info("Partition session {} is closed.", event.getPartitionSession().getPartitionId());
}
Поддержка автомасштабирования топиков
SDK поддерживает два режима чтения топиков с включенным автомасштабированием: режим полной поддержки и режим совместимости. Режим чтения задаётся в параметрах создания сессии чтения. По умолчанию используется режим совместимости.
auto settings = TReadSessionSettings()
.SetAutoscalingSupport(true); // full support is enabled
// or
auto settings = TReadSessionSettings()
.SetAutoscalingSupport(false); // compatibility mode is enabled
auto readSession = topicClient.CreateReadSession(settings);
В режиме полной поддержки, когда все сообщения из партиции будут прочитаны, придёт событие TEndPartitionSessionEvent
. После получения этого события в партиции больше не появится новых сообщений для чтения. Чтобы продолжить чтение из дочерних партиций, необходимо вызвать Confirm()
, тем самым подтвердив, что приложение готово принимать сообщения из дочерних партиций. Если сообщения из всех партиций обрабатываются в одном потоке, то Confirm()
можно вызвать сразу после получения TEndPartitionSessionEvent
. Если обработка сообщений из разных партиций осуществляется в разных потоках, то следует завершить обработку сообщений, например, выполнить накопившийся батч, подтвердить их обработку (коммит) или сохранить позицию чтения в своей базе, и только после этого вызвать Confirm()
.
После получения TEndPartitionSessionEvent
и обработки всех сообщений рекомендуется всегда сразу подтверждать их обработку (коммит). Это позволит сбалансировать чтение дочерних партиций между разными сессиями чтения, что приведёт к равномерному распределению нагрузки по всем читателям.
Фрагмент цикла событий может выглядеть так:
auto settings = TReadSessionSettings()
.SetAutoscalingSupport(true);
auto readSession = topicClient.CreateReadSession(settings);
auto event = readSession->GetEvent(/*block=*/true);
if (auto* endPartitionSessionEvent = std::get_if<TReadSessionEvent::TEndPartitionSessionEvent>(&*event)) {
endPartitionSessionEvent->Confirm();
} else {
// other event types
}
В режиме совместимости отсутствует явный сигнал о завершении чтения из партиции, и сервер будет пытаться эвристически определить, что клиент обработал партицию до конца. Это может привести к задержке между завершением чтения из исходной партиции и началом чтения из её дочерних партиций.
Если клиент подтверждает обработку сообщений (коммит), то сигналом завершения обработки сообщений из партиции будет подтверждение обработки последнего сообщения этой партиции. В случае, если клиент не подтверждает обработку сообщений, сервер будет периодически прерывать чтение из партиции и переключаться на чтение в другой сессии (если существуют другие сессии, готовые обрабатывать партицию). Это будет продолжаться до тех пор, пока чтение не начнётся с конца партиции.
Рекомендуется проверять корректность обработки мягкого прерывания чтения: клиент должен обработать полученные сообщения, подтвердить их обработку (коммит) или сохранить позицию чтения в своей базе, и только после этого вызывать Confirm()
для события TStopPartitionSessionEvent
.