Работа с топиками
В этой статье приведены примеры использования YDB SDK для работы с топиками.
Перед выполнением примеров создайте топик и добавьте читателя.
Управление топиками
Создание топика
Полный список поддерживаемых параметров можно посмотреть в документации SDK.
Пример создания тописка со списком поддерживаемых кодеков и минимальным количество партиций
err := db.Topic().Create(ctx, "topic-path",
// optional
topicoptions.CreateWithSupportedCodecs(topictypes.CodecRaw, topictypes.CodecGzip),
// optional
topicoptions.CreateWithMinActivePartitions(3),
)
Изменение топика
При изменении топика в параметрах нужно указать путь топика и те параметры, которые будут изменяться.
Полный список поддерживаемых параметров можно посмотреть в документации SDK.
Пример добавления читателя к топику
err := db.Topic().Alter(ctx, "topic-path",
topicoptions.AlterWithAddConsumers(topictypes.Consumer{
Name: "new-consumer",
SupportedCodecs: []topictypes.Codec{topictypes.CodecRaw, topictypes.CodecGzip}, // optional
}),
)
Получение информации о топике
descResult, err := db.Topic().Describe(ctx, "topic-path")
if err != nil {
log.Fatalf("failed drop topic: %v", err)
return
}
fmt.Printf("describe: %#v\n", descResult)
Удаление топика
Для удаления топика достаточно указать путь к нему.
err := db.Topic().Drop(ctx, "topic-path")
Запись сообщений
Подключение к топику для записи сообщений
На данный момент поддерживается подключение только с совпадающими producer_id и message_group_id, в будущем это ограничение будет снято.
producerAndGroupID := "group-id"
writer, err := db.Topic().StartWriter(producerAndGroupID, "topicName",
topicoptions.WithMessageGroupID(producerAndGroupID),
)
if err != nil {
return err
}
Асинхронная запись сообщений
Для отправки сообщения - достаточно в поле Data сохранить Reader, из которого можно будет прочитать данные. Можно рассчитывать на то что данные каждого сообщения читаются один раз (или до первой ошибки), к моменту возврата из Write данные будут уже прочитаны и сохранены во внутренний буфер.
SeqNo и дата создания сообщений по умолчанию проставляются автоматически.
По умолчанию Write выполняется асинхронно - данные из сообщений вычитываются и сохраняются во внутренний буфер, отправка происходит в фоне. Writer сам переподключается к YDB при обрывах связи и повторяет отправку сообщений пока это возможно. При получении ошибки, которую невозможно повторить Writer останавливается и следующие вызовы Write будут завершаться с ошибкой.
err := writer.Write(ctx,
topicwriter.Message{Data: strings.NewReader("1")},
topicwriter.Message{Data: bytes.NewReader([]byte{1,2,3})},
topicwriter.Message{Data: strings.NewReader("3")},
)
if err == nil {
return err
}
Запись сообщений с подтверждением о сохранении на сервере
При подключении можно указать опцию синхронной записи сообщений - topicoptions.WithSyncWrite(true). Тогда Write будет возвращаться только после того как получит подтверждение с сервера о сохранении всех, сообщений переданных в вызове. При этом SDK так же как и обычно будет при необходимости переподключаться и повторять отправку сообщений. В этом режиме контекст управляет только временем ожидания ответа из SDK, т.е. даже после отмены контекста SDK продолжит попытки отправить сообщения.
producerAndGroupID := "group-id"
writer, _ := db.Topic().StartWriter(producerAndGroupID, "topicName",
topicoptions.WithMessageGroupID(producerAndGroupID),
topicoptions.WithSyncWrite(true),
)
err = writer.Write(ctx,
topicwriter.Message{Data: strings.NewReader("1")},
topicwriter.Message{Data: bytes.NewReader([]byte{1,2,3})},
topicwriter.Message{Data: strings.NewReader("3")},
)
if err == nil {
return err
}
Выбор кодека для сжатия сообщений
По умолчанию SDK выбирает кодек автоматически (с учётом настроек топика). В автоматическом режиме SDK сначала отправляет по одной группе сообщений каждым из разрешённых кодеков, затем иногда будет пробовать сжать сообщения всеми доступными кодеками и выбирать кодек, дающий наименьший размер сообщения. Если для топика список разрешённых кодеков пуст, то автовыбор производится между Raw и Gzip-кодеками.
При необходимости можно задать фиксированный кодек в опциях подключения. Тогда будет использоваться именно он и замеры проводиться не будут.
producerAndGroupID := "group-id"
writer, _ := db.Topic().StartWriter(producerAndGroupID, "topicName",
topicoptions.WithMessageGroupID(producerAndGroupID),
topicoptions.WithCodec(topictypes.CodecGzip),
)
Чтение сообщений
Подключение к топику для чтения сообщений
Чтобы создать подключение к существующему топику my-topic
через добавленного ранее читателя my-consumer
, используйте следующий код:
reader, err := db.Topic().StartReader("my-consumer", topicoptions.ReadTopic("my-topic"))
if err != nil {
return err
}
Вы также можете использовать расширенный вариант создания подключения, чтобы указать несколько топиков и задать параметры чтения. Следующий код создаст подключение к топикам my-topic
и my-specific-topic
через читателя my-consumer
, а также задаст время, с которого начинать читать сообщения:
reader, err := db.Topic().StartReader("my-consumer", []topicoptions.ReadSelector{
{
Path: "my-topic",
},
{
Path: "my-specific-topic",
ReadFrom: time.Date(2022, 7, 1, 10, 15, 0, 0, time.UTC),
},
},
)
if err != nil {
return err
}
Чтение сообщений
Сервер хранит позицию чтения сообщений. После вычитывания очередного сообщения клиент может отправить на сервер подтверждение обработки. Позиция чтения изменится, а при новом подключении будут вычитаны только неподтвержденные сообщения.
Читать сообщения можно и без подтверждения обработки. В этом случае при новом подключении будут прочитаны все неподтвержденные сообщения, в том числе и уже обработанные.
Информацию о том, какие сообщения уже обработаны, можно сохранять на клиентской стороне, передавая на сервер стартовую позицию чтения при создании подключения. При этом позиция чтения сообщений на сервере не изменяется.
SDK получает данные с сервера партиями и буферизирует их. В зависимости от задач клиентский код может читать сообщения из буфера по одному или пакетами.
Чтение без подтверждения обработки сообщений
Чтобы читать сообщения по одному, используйте следующий код:
func SimpleReadMessages(ctx context.Context, r *topicreader.Reader) error {
for {
mess, err := r.ReadMessage(ctx)
if err != nil {
return err
}
processMessage(mess)
}
}
Чтобы прочитать пакет сообщений, используйте следующий код:
func SimpleReadBatches(ctx context.Context, r *topicreader.Reader) error {
for {
batch, err := r.ReadMessageBatch(ctx)
if err != nil {
return err
}
processBatch(batch)
}
}
Чтение с подтверждением обработки сообщений
Чтобы подтверждать обработку сообщений по одному, используйте следующий код:
func SimpleReadMessages(ctx context.Context, r *topicreader.Reader) error {
for {
mess, err := r.ReadMessage(ctx)
if err != nil {
return err
}
processMessage(mess)
r.Commit(mess.Context(), mess)
}
}
Для подтверждения обработки пакета сообщений используйте следующий код:
func SimpleReadMessageBatch(ctx context.Context, r *topicreader.Reader) error {
for {
batch, err := r.ReadMessageBatch(ctx)
if err != nil {
return err
}
processBatch(batch)
r.Commit(batch.Context(), batch)
}
}
Чтение с хранением позиции на клиентской стороне
При начале чтения клиентский код должен сообщить серверу стартовую позицию чтения:
func ReadWithExplicitPartitionStartStopHandlerAndOwnReadProgressStorage(ctx context.Context, db ydb.Connection) error {
readContext, stopReader := context.WithCancel(context.Background())
defer stopReader()
readStartPosition := func(
ctx context.Context,
req topicoptions.GetPartitionStartOffsetRequest,
) (res topicoptions.GetPartitionStartOffsetResponse, err error) {
offset, err := readLastOffsetFromDB(ctx, req.Topic, req.PartitionID)
res.StartFrom(offset)
// Reader will stop if return err != nil
return res, err
}
r, err := db.Topic().StartReader("my-consumer", topicoptions.ReadTopic("my-topic"),
topicoptions.WithGetPartitionStartOffset(readStartPosition),
)
if err != nil {
return err
}
go func() {
<-readContext.Done()
_ = r.Close(ctx)
}()
for {
batch, err := r.ReadMessageBatch(readContext)
if err != nil {
return err
}
processBatch(batch)
_ = externalSystemCommit(batch.Context(), batch.Topic(), batch.PartitionID(), batch.EndOffset())
}
}
Обработка серверного прерывания чтения
В YDB используется серверная балансировка партиций между клиентами. Это означает, что сервер может прерывать чтение сообщений из произвольных партиций.
При мягком прерывании клиент получает уведомление, что сервер уже закончил отправку сообщений из партиции и больше сообщения читаться не будут. Клиент может завершить обработку сообщений и отправить подтверждение на сервер.
В случае жесткого прерывания клиент получает уведомление, что работать с сообщениями партиции больше нельзя. Клиент должен прекратить обработку прочитанных сообщений. Неподтвержденные сообщения будут переданы другому читателю.
Мягкое прерывание чтения
Клиентский код сразу получает все имеющиеся в буфере (на стороне SDK) сообщения, даже если их не достаточно для формирования пакета при групповой обработке.
r, _ := db.Topic().StartReader("my-consumer", nil,
topicoptions.WithBatchReadMinCount(1000),
)
for {
batch, _ := r.ReadMessageBatch(ctx) // <- if partition soft stop batch can be less, then 1000
processBatch(batch)
_ = r.Commit(batch.Context(), batch)
}
Жесткое прерывание чтения
При прерывании чтения контекст сообщения или пакета сообщений будет отменен.
ctx := batch.Context() // batch.Context() will cancel if partition revoke by server or connection broke
if len(batch.Messages) == 0 {
return
}
buf := &bytes.Buffer{}
for _, mess := range batch.Messages {
buf.Reset()
_, _ = buf.ReadFrom(mess)
_, _ = io.Copy(buf, mess)
writeMessagesToDB(ctx, buf.Bytes())
}