Change Data Capture (CDC)

Внимание

Поддерживается только для строковых таблиц. Поддержка функциональности для колоночных таблиц находится в разработке.

Change Data Capture (CDC) обеспечивает захват изменений строк строковой таблицы YDB, формирует из них поток изменений (changefeed), записывает в распределенное хранилище и предоставляет доступ к этим записям для дальнейшей обработки. В качестве распределенного хранилища используется топик, который позволяет эффективно хранить лог изменений таблицы.

Когда в таблице добавляется, обновляется или удаляется строка, CDC формирует запись о произошедшем изменении с указанием первичного ключа строки и пишет ее в соответствующую данному ключу партицию топика.

Гарантии

  • Записи об изменениях шардированы между партициями топика по первичному ключу.
  • Каждое изменение доставляется ровно один раз (exactly-once семантика).
  • Изменения по одному и тому же первичному ключу доставляются в том же порядке, в котором они происходили в таблице в одну и ту же партицию топика.
  • Запись об изменении доставляется до партиции топика только после фиксации (коммита) соответствующей транзакции в таблице.

Ограничения

  • Количество партиций топика фиксируется на момент создания потока изменений и остается неизменным (топики, в отличие от таблиц, не являются эластичными).

  • В потоке изменений поддерживаются записи о следующих видах операций:

    • обновление;
    • удаление.

Таким образом, добавление строки является частным случаем обновления, и в потоке изменений запись о добавлении строки будет выглядеть аналогично записи об обновлении.

Виртуальные метки времени

Все изменения в таблицах YDB упорядочены в соответствии с порядком выполнения транзакций. Каждое изменение маркируется виртуальной меткой времени, являющейся кортежем из двух элементов:

  1. Глобального времени координатора.
  2. Уникального идентификатора транзакции.

Используя эти метки, можно упорядочить записи из разных партиций топика относительного друг друга или использовать их для фильтрации (например, чтобы исключить записи о старых изменениях).

Примечание

По умолчанию виртуальные метки времени не выгружаются в поток изменений. Для их включения используйте соответствующий параметр при создании потока.

Первоначальное сканирование таблицы

По умолчанию в поток изменений выгружаются записи только о тех строках таблицы, которые изменились после создания потока. Первоначальное сканирование таблицы позволяет выгрузить в поток изменений значения всех строк, существовавших на момент его создания.

Сканирование производится в фоновом режиме поверх снапшота таблицы. При этом возможны следующие ситуации:

  • В таблице меняется значение непросканированной строки. В поток изменений последовательно будут выгружены: запись с исходным значением и запись об изменении. При повторном изменении этой же строки будет выгружена только запись об изменении.
  • Во время сканирования обнаруживается измененная строка. В поток изменений ничего не выгружается, так как исходное значение уже было выгружено в момент изменения (см. предыдущий пункт).
  • В таблице меняется значение просканированной строки. В поток изменений выгружается только запись об изменении.

Таким образом, гарантируется, что для одной и той же строки (первичного ключа) сначала будет выгружено исходное значение, а затем — запись об изменении.

Примечание

Запись с исходным значением строки будет помечена как запись об обновлении. При использовании виртуальных меток времени записи маркируются меткой времени снапшота.

В процессе сканирования, в зависимости от частоты обновления данных таблицы, возможен повышенный фон ошибок OVERLOADED из-за того, что, помимо записей об изменениях, необходимо доставить также записи с исходными значениями строк. По окончании сканирования поток изменений переходит в нормальный режим работы.

Важно

На время первоначального сканирования в таблице приостанавливаются процессы автоматического партиционирования.

Структура записи

В зависимости от параметров потока структура записи может отличаться.

JSON-формат

Запись в формате JSON имеет следующую структуру:

{
    "key": [<key components>],
    "update": {<columns>},
    "erase": {},
    "newImage": {<columns>},
    "oldImage": {<columns>},
    "ts": [<step>, <txId>]
}
  • key — массив значений компонент первичного ключа. Присутствует всегда.
  • update — признак обновления. Присутствует, если запись соответствует операции обновления. В режиме UPDATES так же содержит названия и значения изменившихся столбцов.
  • erase — признак удаления. Присутствует, если запись соответствует операции удаления.
  • newImage — снимок состояния строки, получившегося в результате изменения. Присутствует в режимах NEW_IMAGE и NEW_AND_OLD_IMAGES. Содержит названия и значения столбцов.
  • oldImage — снимок состояния строки, предшествовавшего изменению. Присутствует в режимах OLD_IMAGE и NEW_AND_OLD_IMAGES. Содержит названия и значения столбцов.
  • tsвиртуальная метка времени. Присутствует, если включена настройка VIRTUAL_TIMESTAMPS. Содержит значение глобального времени координатора (step) и уникальный идентификатор транзакции (txId).

Например, запись об обновлении в режиме UPDATES:

{
    "key": [1, "one"],
    "update": {
        "payload": "lorem ipsum",
        "date": "2022-02-22"
    }
}

Запись об удалении:

{
    "key": [2, "two"],
    "erase": {}
}

Запись со снимками строки:

{
    "key": [1, 2, 3],
    "update": {},
    "newImage": {
        "textColumn": "value1",
        "intColumn": 101,
        "boolColumn": true
    },
    "oldImage": {
        "textColumn": null,
        "intColumn": 100,
        "boolColumn": false
    }
}

Запись с виртуальными метками времени:

{
    "key": [1],
    "update": {
        "created": "2022-12-12T00:00:00.000000Z",
        "customer": "Name123"
    },
    "ts": [1670792400890, 562949953607163]
}

Примечание

  • Одна и та же запись не может содержать поля update и erase одновременно, так как эти поля являются признаками операции (невозможно одновременно обновить и удалить строку таблицы). Но каждая запись содержит одно из этих полей (любая операция является обновлением или удалением).
  • В режиме UPDATES для операций обновления поле update выполняет роль не только признака операции (обновление), но и содержит названия и значения изменившихся столбцов.
  • Поля JSON-объекта, содержащие названия и значения столбцов (newImage, oldImage и update в режиме UPDATES), не включают в себя столбцы, являющиеся компонентами первичного ключа.
  • Если в записи присутствует поле erase (то есть запись соответствует операции удаления), то это всегда пустой JSON-объект ({}).

JSON-формат, совместимый с Debezium

Запись в формате JSON, совместимого с Debezium, имеет следующую структуру:

{
    "payload": {
        "op": <op>,
        "before": {<columns>},
        "after": {<columns>},
        "source": {
            "connector": <connector>,
            "version": <version>,
            "ts_ms": <ts_ms>,
            "step": <step>,
            "txId": <txId>,
            "snapshot": <bool>
        }
    }
}
  • op — операция, которая была произведена над строкой в таблице:

    • c — вставка. Допустимо только в режиме NEW_AND_OLD_IMAGES.
    • u — обновление.
    • d — удаление.
    • r — чтение из снапшота.
  • before — снимок состояния строки, предшествовавшего изменению. Присутствует в режимах OLD_IMAGE и NEW_AND_OLD_IMAGES. Содержит названия и значения столбцов.

  • after — снимок состояния строки, получившегося в результате изменения. Присутствует в режимах NEW_IMAGE и NEW_AND_OLD_IMAGES. Содержит названия и значения столбцов.

  • source — метаданные записи.

    • connector — название коннектора. Текущее название: ydb.
    • version — версия коннектора, используемая для генерации записи. Текущая версия: 1.0.0.
    • ts_ms — примерное время применения изменения в YDB, в миллисекундах.
    • step — глобальное время координатора. Компонент виртуальных меток времени.
    • txId — уникальный идентификатор транзакции. Компонент виртуальных меток времени.
    • snapshot — признак чтения из снапшота.

При чтении с использованием Kafka API в качестве ключа сообщения указывается Debezium-совместимый первичный ключ измененной строки:

{
    "payload": {<columns>}
}
  • payload — первичный ключ строки, которая была изменена. Содержит названия и значения столбцов, являющихся компонентами первичного ключа.

Время хранения записей

По умолчанию записи хранятся в потоке изменений в течение 24 часов с момента отправки. В зависимости от сценариев использования время хранения можно уменьшить или увеличить до 30 дней.

Важно

Записи, время хранения которых истекло, удаляются вне зависимости от того, успели их обработать (прочитать) или нет.

Удаление записей до их обработки клиентом приводит к возникновению пропусков офсетов, то есть офсеты последней прочитанной из партиции записи и самой ранней из доступных будут отличаться более, чем на единицу.

Для настройки времени хранения записей укажите параметр RETENTION_PERIOD при создании потока изменений.

Количество партиций топика

По умолчанию количество партиций топика равно количеству партиций таблицы. Количество партиций топика можно переопределить, указав параметр TOPIC_MIN_ACTIVE_PARTITIONS при создании потока изменений.

Примечание

В настоящий момент возможность явного указания числа партиций топика доступна только для таблиц, у которых первый компонент первичного ключа имеет тип Uint64 или Uint32.

Создание и удаление потока изменений

Поток изменений может быть добавлен к существующей таблице или удален директивами ADD CHANGEFEED и DROP CHANGEFEED операции YQL ALTER TABLE. При удалении таблицы добавленный к ней поток изменений также будет удален.

Назначение и применение CDC

Об использовании CDC при разработке приложений смотрите в рекомендациях.