Change Data Capture (CDC)

Change Data Capture (CDC) captures changes to YDB table rows, generates a changefeed from them, writes the changefeed to distributed storage, and provides access to the resulting records for further processing. A topic serves as the distributed storage, which allows the table change log to be stored efficiently.

When a table row is added, updated, or deleted, CDC generates a change record that specifies the row's primary key and writes it to the topic partition corresponding to that key.

Guarantees

  • Change records are sharded across topic partitions by primary key.
  • Each change is delivered exactly once (exactly-once delivery).
  • Changes to rows with the same primary key are delivered to the same topic partition in the order in which they occurred in the table.
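
The sharding guarantee can be pictured with a small sketch. The hash function YDB actually uses is not described here, so the SHA-256-based mapping below and the name partition_for_key are purely illustrative assumptions. The only property the sketch demonstrates is that the same key always maps to the same partition.

```python
import hashlib
import json


def partition_for_key(key, partition_count):
    """Map a primary key (a list of components) to a partition index.

    Illustrative only: this is NOT the hash function YDB uses.
    """
    if partition_count <= 0:
        raise ValueError("partition_count must be positive")
    # Serialize the key deterministically so that equal keys hash equally.
    encoded = json.dumps(key, separators=(",", ":")).encode("utf-8")
    digest = hashlib.sha256(encoded).digest()
    # Take the first 8 bytes as an unsigned integer and reduce it to a partition index.
    return int.from_bytes(digest[:8], "big") % partition_count
```

Because every change to a given key lands in one partition, a consumer reading that partition sees the changes for that key in table order.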

Limitations

  • The number of topic partitions is fixed when the changefeed is created and cannot be changed afterwards (unlike tables, topics are not elastic).

  • Changefeeds support records of the following types of operations:

    • Updates
    • Deletes

    Adding a row is a special case of an update, so a changefeed record for an added row looks similar to an update record.

Virtual timestamps

All changes in YDB tables are arranged according to the order in which transactions are performed. Each change is marked with a virtual timestamp which consists of two elements:

  1. Global coordinator time.
  2. Unique transaction ID.

Using these timestamps, you can order records from different partitions of the topic relative to each other or filter records (for example, to exclude old change records).

Note

By default, virtual timestamps are not written to the changefeed. To enable them, set the VIRTUAL_TIMESTAMPS parameter when creating the changefeed.
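
As a minimal sketch of the ordering and filtering described above, the code below assumes that virtual timestamps are enabled and that each record has already been parsed from JSON into a dict with a ts field of the form [step, txId]. The function names are hypothetical and not part of any YDB SDK.

```python
def virtual_ts(record):
    """Return the virtual timestamp of a record as a comparable (step, txId) tuple."""
    step, tx_id = record["ts"]
    return (step, tx_id)


def merge_by_virtual_ts(*partitions):
    """Combine records read from several partitions into one list ordered by virtual timestamp."""
    merged = [record for partition in partitions for record in partition]
    # Tuples compare element by element: first by step, then by txId.
    return sorted(merged, key=virtual_ts)


def newer_than(records, min_ts):
    """Keep only records whose virtual timestamp is strictly greater than min_ts."""
    threshold = tuple(min_ts)
    return [record for record in records if virtual_ts(record) > threshold]


partition_a = [
    {"key": [1], "update": {"customer": "Name123"}, "ts": [1670792400, 562949953607163]},
    {"key": [1], "erase": {}, "ts": [1670792500, 10]},
]
partition_b = [
    {"key": [2], "update": {"customer": "Name456"}, "ts": [1670792400, 562949953607100]},
]

ordered = merge_by_virtual_ts(partition_a, partition_b)
recent = newer_than(ordered, [1670792400, 562949953607163])
```

Here ordered starts with the record for key [2], because it shares the step with the first record for key [1] but has a smaller txId, and recent contains only the erase record.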

Record structure

Depending on the changefeed parameters, the structure of a record may differ.

A JSON record has the following structure:

{
    "key": [<key components>],
    "update": {<columns>},
    "erase": {},
    "newImage": {<columns>},
    "oldImage": {<columns>},
    "ts": [<step>, <txId>]
}

  • key: An array of primary key component values. Always present.
  • update: Update flag. Present if the record corresponds to an update operation. In UPDATES mode, it also contains the names and values of the updated columns.
  • erase: Erase flag. Present if the record corresponds to a delete operation.
  • newImage: Row snapshot after the change. Present in NEW_IMAGE and NEW_AND_OLD_IMAGES modes. Contains column names and values.
  • oldImage: Row snapshot before the change. Present in OLD_IMAGE and NEW_AND_OLD_IMAGES modes. Contains column names and values.
  • ts: Virtual timestamp. Present if the VIRTUAL_TIMESTAMPS setting is enabled. Contains the global coordinator time (step) and the unique transaction ID (txId).

Sample record of an update in UPDATES mode:

{
   "key": [1, "one"],
   "update": {
       "payload": "lorem ipsum",
       "date": "2022-02-22"
   }
}

Record of an erase:

{
   "key": [2, "two"],
   "erase": {}
}

Record with row snapshots:

{
   "key": [1, 2, 3],
   "update": {},
   "newImage": {
       "textColumn": "value1",
       "intColumn": 101,
       "boolColumn": true
   },
   "oldImage": {
       "textColumn": null,
       "intColumn": 100,
       "boolColumn": false
   }
}
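
When both snapshots are present, a consumer can derive which columns changed by comparing them. The following is a minimal sketch, assuming the record has already been parsed into a dict; changed_columns is a hypothetical helper name.

```python
def changed_columns(record):
    """Return {column: (old_value, new_value)} for columns that differ between snapshots."""
    old = record.get("oldImage", {})
    new = record.get("newImage", {})
    return {
        column: (old.get(column), new.get(column))
        for column in sorted(set(old) | set(new))
        if old.get(column) != new.get(column)
    }


snapshot_record = {
    "key": [1, 2, 3],
    "update": {},
    "newImage": {"textColumn": "value1", "intColumn": 101, "boolColumn": True},
    "oldImage": {"textColumn": None, "intColumn": 100, "boolColumn": False},
}

diff = changed_columns(snapshot_record)
```

For the sample record above, all three columns differ, so diff has an entry for each of them, for example (100, 101) for intColumn.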

Record with virtual timestamps:

{
   "key": [1],
   "update": {
       "created": "2022-12-12T00:00:00.000000Z",
       "customer": "Name123"
   },
   "ts": [1670792400, 562949953607163]
}

Note

  • A record cannot contain both the update and erase fields, since these fields are operation flags (a table row cannot be updated and erased at the same time). However, every record contains exactly one of them (any operation is either an update or an erase).
  • In UPDATES mode, the update field of an update record serves both as the operation flag and as the container for the names and values of the updated columns.
  • JSON object fields containing column names and values (newImage, oldImage, and update in UPDATES mode) do not include the columns that are primary key components.
  • If a record contains the erase field (indicating that the record corresponds to an erase operation), its value is always an empty JSON object ({}).
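
The rules in this note translate directly into a validation step on the consumer side. The sketch below assumes records arrive as JSON strings; classify_record is a hypothetical helper, not an SDK function.

```python
import json


def classify_record(raw):
    """Parse a JSON change record and return ("update" or "erase", key)."""
    record = json.loads(raw)
    if "key" not in record:
        raise ValueError("change record has no 'key' field")
    has_update = "update" in record
    has_erase = "erase" in record
    # Exactly one of the two operation flags must be present.
    if has_update == has_erase:
        raise ValueError("record must contain exactly one of 'update' or 'erase'")
    if has_erase:
        # The erase flag is always an empty JSON object.
        if record["erase"] != {}:
            raise ValueError("'erase' must be an empty object")
        return "erase", record["key"]
    return "update", record["key"]


update_op = classify_record('{"key": [1, "one"], "update": {"payload": "lorem ipsum"}}')
erase_op = classify_record('{"key": [2, "two"], "erase": {}}')
```

Rejecting records that carry both flags or neither makes format mismatches visible early instead of silently misapplying a change.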

Record retention period

By default, records are stored in the changefeed for 24 hours from the time they are sent. Depending on the usage scenario, the retention period can be reduced, or increased to a maximum of 30 days.

Warning

Records whose retention time has expired are deleted, regardless of whether they were processed (read) or not.

If records are deleted before the client processes them, offsets are skipped: the offset of the last record read from the partition and the offset of the earliest available record will differ by more than one.
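
A consumer can detect such a skip by comparing the two offsets. The following is a minimal sketch; how the offsets are obtained depends on the client library in use, so they are plain integer arguments here, and skipped_records is a hypothetical name.

```python
def skipped_records(last_read_offset, earliest_available_offset):
    """Return how many unread records were lost to retention (0 if none)."""
    # With no loss, the next record to read is last_read_offset + 1,
    # so the two offsets differ by at most one.
    gap = earliest_available_offset - last_read_offset - 1
    return max(gap, 0)


no_loss = skipped_records(last_read_offset=41, earliest_available_offset=42)
with_loss = skipped_records(last_read_offset=41, earliest_available_offset=50)
```

In the second call, offsets 42 through 49 expired before being read, so eight records were lost. A non-zero result usually means the consumer lags behind by more than the retention period.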

Creating and deleting a changefeed

You can add a changefeed to an existing table or delete it using the ADD CHANGEFEED and DROP CHANGEFEED directives of the YQL ALTER TABLE statement. When a table is deleted, any changefeed added to it is deleted as well.
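
As a rough illustration, the helpers below build the text of such statements. The table and changefeed names are hypothetical, the list of modes is limited to those named in this document, and the exact option spellings (FORMAT, MODE, VIRTUAL_TIMESTAMPS) are assumptions that should be checked against the YQL reference before use.

```python
# Modes mentioned in this document; the YQL reference may define others.
KNOWN_MODES = {"UPDATES", "NEW_IMAGE", "OLD_IMAGE", "NEW_AND_OLD_IMAGES"}


def add_changefeed_yql(table, changefeed, mode="UPDATES", virtual_timestamps=False):
    """Build an ALTER TABLE ... ADD CHANGEFEED statement (option names are assumed)."""
    if mode not in KNOWN_MODES:
        raise ValueError(f"unknown changefeed mode: {mode}")
    options = ["FORMAT = 'JSON'", f"MODE = '{mode}'"]
    if virtual_timestamps:
        options.append("VIRTUAL_TIMESTAMPS = TRUE")
    return f"ALTER TABLE `{table}` ADD CHANGEFEED `{changefeed}` WITH ({', '.join(options)});"


def drop_changefeed_yql(table, changefeed):
    """Build an ALTER TABLE ... DROP CHANGEFEED statement."""
    return f"ALTER TABLE `{table}` DROP CHANGEFEED `{changefeed}`;"


add_stmt = add_changefeed_yql("orders", "orders_feed", mode="NEW_AND_OLD_IMAGES", virtual_timestamps=True)
drop_stmt = drop_changefeed_yql("orders", "orders_feed")
```

The helpers only produce strings and do no escaping; running the statements requires a YDB client session, which is outside the scope of this sketch.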

CDC purpose and use

For information about using CDC when developing apps, see best practices.