Change Data Capture (CDC)
Внимание
Поддерживается только для строковых таблиц. Поддержка функциональности для колоночных таблиц находится в разработке.
Change Data Capture (CDC) обеспечивает захват изменений строк строковой таблицы YDB, формирует из них поток изменений (changefeed), записывает в распределенное хранилище и предоставляет доступ к этим записям для дальнейшей обработки. В качестве распределенного хранилища используется топик, который позволяет эффективно хранить лог изменений таблицы.
Когда в таблице добавляется, обновляется или удаляется строка, CDC формирует запись о произошедшем изменении с указанием первичного ключа строки и пишет ее в соответствующую данному ключу партицию топика.
Гарантии
- Записи об изменениях шардированы между партициями топика по первичному ключу.
- Каждое изменение доставляется ровно один раз (exactly-once семантика).
- Изменения по одному и тому же первичному ключу доставляются в том же порядке, в котором они происходили в таблице в одну и ту же партицию топика.
- Запись об изменении доставляется до партиции топика только после фиксации (коммита) соответствующей транзакции в таблице.
Ограничения
-
Количество партиций топика фиксируется на момент создания потока изменений и остается неизменным (топики, в отличие от таблиц, не являются эластичными).
-
В потоке изменений поддерживаются записи о следующих видах операций:
- обновление;
- удаление.
Таким образом, добавление строки является частным случаем обновления, и в потоке изменений запись о добавлении строки будет выглядеть аналогично записи об обновлении.
Виртуальные метки времени
Все изменения в таблицах YDB упорядочены в соответствии с порядком выполнения транзакций. Каждое изменение маркируется виртуальной меткой времени, являющейся кортежем из двух элементов:
- Глобального времени координатора.
- Уникального идентификатора транзакции.
Используя эти метки, можно упорядочить записи из разных партиций топика относительного друг друга или использовать их для фильтрации (например, чтобы исключить записи о старых изменениях).
Примечание
По умолчанию виртуальные метки времени не выгружаются в поток изменений. Для их включения используйте соответствующий параметр при создании потока.
Первоначальное сканирование таблицы
По умолчанию в поток изменений выгружаются записи только о тех строках таблицы, которые изменились после создания потока. Первоначальное сканирование таблицы позволяет выгрузить в поток изменений значения всех строк, существовавших на момент его создания.
Сканирование производится в фоновом режиме поверх снапшота таблицы. При этом возможны следующие ситуации:
- В таблице меняется значение непросканированной строки. В поток изменений последовательно будут выгружены: запись с исходным значением и запись об изменении. При повторном изменении этой же строки будет выгружена только запись об изменении.
- Во время сканирования обнаруживается измененная строка. В поток изменений ничего не выгружается, так как исходное значение уже было выгружено в момент изменения (см. предыдущий пункт).
- В таблице меняется значение просканированной строки. В поток изменений выгружается только запись об изменении.
Таким образом, гарантируется, что для одной и той же строки (первичного ключа) сначала будет выгружено исходное значение, а затем — запись об изменении.
Примечание
Запись с исходным значением строки будет помечена как запись об обновлении. При использовании виртуальных меток времени записи маркируются меткой времени снапшота.
В процессе сканирования, в зависимости от частоты обновления данных таблицы, возможен повышенный фон ошибок OVERLOADED
из-за того, что, помимо записей об изменениях, необходимо доставить также записи с исходными значениями строк. По окончании сканирования поток изменений переходит в нормальный режим работы.
Важно
На время первоначального сканирования в таблице приостанавливаются процессы автоматического партиционирования.
Структура записи
В зависимости от параметров потока структура записи может отличаться.
JSON-формат
Запись в формате JSON имеет следующую структуру:
{
"key": [<key components>],
"update": {<columns>},
"erase": {},
"newImage": {<columns>},
"oldImage": {<columns>},
"ts": [<step>, <txId>]
}
key
— массив значений компонент первичного ключа. Присутствует всегда.update
— признак обновления. Присутствует, если запись соответствует операции обновления. В режимеUPDATES
так же содержит названия и значения изменившихся столбцов.erase
— признак удаления. Присутствует, если запись соответствует операции удаления.newImage
— снимок состояния строки, получившегося в результате изменения. Присутствует в режимахNEW_IMAGE
иNEW_AND_OLD_IMAGES
. Содержит названия и значения столбцов.oldImage
— снимок состояния строки, предшествовавшего изменению. Присутствует в режимахOLD_IMAGE
иNEW_AND_OLD_IMAGES
. Содержит названия и значения столбцов.ts
— виртуальная метка времени. Присутствует, если включена настройкаVIRTUAL_TIMESTAMPS
. Содержит значение глобального времени координатора (step
) и уникальный идентификатор транзакции (txId
).
Например, запись об обновлении в режиме UPDATES
:
{
"key": [1, "one"],
"update": {
"payload": "lorem ipsum",
"date": "2022-02-22"
}
}
Запись об удалении:
{
"key": [2, "two"],
"erase": {}
}
Запись со снимками строки:
{
"key": [1, 2, 3],
"update": {},
"newImage": {
"textColumn": "value1",
"intColumn": 101,
"boolColumn": true
},
"oldImage": {
"textColumn": null,
"intColumn": 100,
"boolColumn": false
}
}
Запись с виртуальными метками времени:
{
"key": [1],
"update": {
"created": "2022-12-12T00:00:00.000000Z",
"customer": "Name123"
},
"ts": [1670792400890, 562949953607163]
}
Примечание
- Одна и та же запись не может содержать поля
update
иerase
одновременно, так как эти поля являются признаками операции (невозможно одновременно обновить и удалить строку таблицы). Но каждая запись содержит одно из этих полей (любая операция является обновлением или удалением). - В режиме
UPDATES
для операций обновления полеupdate
выполняет роль не только признака операции (обновление), но и содержит названия и значения изменившихся столбцов. - Поля JSON-объекта, содержащие названия и значения столбцов (
newImage
,oldImage
иupdate
в режимеUPDATES
), не включают в себя столбцы, являющиеся компонентами первичного ключа. - Если в записи присутствует поле
erase
(то есть запись соответствует операции удаления), то это всегда пустой JSON-объект ({}
).
JSON-формат, совместимый с Debezium
Запись в формате JSON, совместимого с Debezium, имеет следующую структуру:
{
"payload": {
"op": <op>,
"before": {<columns>},
"after": {<columns>},
"source": {
"connector": <connector>,
"version": <version>,
"ts_ms": <ts_ms>,
"step": <step>,
"txId": <txId>,
"snapshot": <bool>
}
}
}
-
op
— операция, которая была произведена над строкой в таблице:c
— вставка. Допустимо только в режимеNEW_AND_OLD_IMAGES
.u
— обновление.d
— удаление.r
— чтение из снапшота.
-
before
— снимок состояния строки, предшествовавшего изменению. Присутствует в режимахOLD_IMAGE
иNEW_AND_OLD_IMAGES
. Содержит названия и значения столбцов. -
after
— снимок состояния строки, получившегося в результате изменения. Присутствует в режимахNEW_IMAGE
иNEW_AND_OLD_IMAGES
. Содержит названия и значения столбцов. -
source
— метаданные записи.connector
— название коннектора. Текущее название:ydb
.version
— версия коннектора, используемая для генерации записи. Текущая версия:1.0.0
.ts_ms
— примерное время применения изменения в YDB, в миллисекундах.step
— глобальное время координатора. Компонент виртуальных меток времени.txId
— уникальный идентификатор транзакции. Компонент виртуальных меток времени.snapshot
— признак чтения из снапшота.
При чтении с использованием Kafka API в качестве ключа сообщения указывается Debezium-совместимый первичный ключ измененной строки:
{
"payload": {<columns>}
}
payload
— первичный ключ строки, которая была изменена. Содержит названия и значения столбцов, являющихся компонентами первичного ключа.
Время хранения записей
По умолчанию записи хранятся в потоке изменений в течение 24 часов с момента отправки. В зависимости от сценариев использования время хранения можно уменьшить или увеличить до 30 дней.
Важно
Записи, время хранения которых истекло, удаляются вне зависимости от того, успели их обработать (прочитать) или нет.
Удаление записей до их обработки клиентом приводит к возникновению пропусков офсетов, то есть офсеты последней прочитанной из партиции записи и самой ранней из доступных будут отличаться более, чем на единицу.
Для настройки времени хранения записей укажите параметр RETENTION_PERIOD при создании потока изменений.
Количество партиций топика
По умолчанию количество партиций топика равно количеству партиций таблицы. Количество партиций топика можно переопределить, указав параметр TOPIC_MIN_ACTIVE_PARTITIONS при создании потока изменений.
Примечание
В настоящий момент возможность явного указания числа партиций топика доступна только для таблиц, у которых первый компонент первичного ключа имеет тип Uint64
или Uint32
.
Создание и удаление потока изменений
Поток изменений может быть добавлен к существующей таблице или удален директивами ADD CHANGEFEED и DROP CHANGEFEED операции YQL ALTER TABLE
. При удалении таблицы добавленный к ней поток изменений также будет удален.
Назначение и применение CDC
Об использовании CDC при разработке приложений смотрите в рекомендациях.