Using the TRANSFER mechanism to implement SCD2 based on data from CDC sources in YDB
This article describes the implementation of the Slowly Changing Dimensions Type 2 (SCD2) pattern in YDB.
Used tools
For delivering data to the SCD2 table, the following combination of functional capabilities of YDB will be used in this article:
- The source table will be row-oriented for operational transactional changes;
- The target table will be column-oriented for efficient execution of analytical queries;
- The subscription to changes in the source table will be implemented through the Change Data Capture (CDC) mechanism;
- The buffering of changes will be handled by an implicitly created CDC topic;
- The automatic transfer of data from the CDC topic to the target table will be handled by transfer.
Create a source data table that will generate CDC events
CREATE TABLE source_customers (
id Utf8 NOT NULL,
attribute1 Utf8,
attribute2 Utf8,
change_time Timestamp,
PRIMARY KEY (id)
);
ALTER TABLE `source_customers` ADD CHANGEFEED `updates` WITH (
FORMAT = 'DEBEZIUM_JSON',
MODE = 'NEW_AND_OLD_IMAGES'
);
Create a target data table in SCD2 format
CREATE TABLE dimension_scd2 (
id Utf8 NOT NULL,
attribute1 Utf8,
attribute2 Utf8,
valid_from Timestamp NOT NULL,
deleted_at Timestamp ,
PRIMARY KEY (valid_from, id)
)
PARTITION BY HASH(valid_from, id)
WITH (
STORE=COLUMN
)
Create a transfer and lambda function to process CDC data
Features of processing CDC data in Debezium format:
- When creating a record (
op = "c"), data is taken from theafterfield, and the value of the moment of change of the record from thechanged_atfield is saved in thevalid_fromfield. - When updating a record (
op = "u"), data is also taken from theafterfield, and the value of the moment of change of the record from thechanged_atfield is saved in thevalid_fromfield. - When deleting a record (
op = "d"), data is taken from thebeforefield, and the system moment of time of receiving information about the deletion of the record is set in thedeleted_atfield.
$transformation_lambda = ($msg) -> {
$cdc_data = CAST($msg._data AS Json);
-- Define the type of operation
$operation = Json::ConvertToString($cdc_data.payload.op);
$is_deleted = $operation == "d";
-- Get data depending on the type of operation
$data = IF($is_deleted, $cdc_data.payload.before, $cdc_data.payload.after);
$result = IF($data is not NULL, <|
id: Unwrap(CAST(Json::ConvertToString($data.id) AS Utf8)),
attribute1: CAST(Json::ConvertToString($data.attribute1) AS Utf8),
attribute2: CAST(Json::ConvertToString($data.attribute2) AS Utf8),
deleted_at: IF($is_deleted, DateTime::FromMilliseconds(Json::ConvertToUint64($cdc_data.payload.source.ts_ms)), NULL),
valid_from : Unwrap(cast(datetime::MakeDatetime(datetime::ParseIso8601(Json::ConvertToString($data.change_time))) as timestamp)),
|>, NULL);
return $result;
};
-- In this example, a high frequency of updates to the target table is configured.
-- This is done solely for clarity. For production scenarios, larger values should be set
CREATE TRANSFER dimension_scd2_cdc_transfer
FROM `source_customers/updates` TO dimension_scd2 USING $transformation_lambda
WITH (
FLUSH_INTERVAL=Interval("PT1S")
);
Note
The id column in the receiving table dimension_scd2 is declared as Utf8 NOT NULL, while in Json CDC, data that cannot be converted to a string may be passed, i.e., the result of converting data from Json may be a NULL value. The Unwrap function guarantees that after its execution, there cannot be a NULL value or a runtime error. This ensures that the result of executing the lambda function will either be a fully correct data structure, or a runtime error.
Demonstration of work
To demonstrate the work with CDC data, data will be written to the source table in YDB, which will generate CDC events:
Inserting a new record
INSERT INTO source_customers (id, attribute1, attribute2, change_time)
VALUES ('CUSTOMER_1001', 'John Doe', 'Los Angeles', CurrentUtcTimestamp());
This action will create a CDC event with the operation type op = "c" approximately of the following form:
{
"payload":{
"op":"c",
"source":{
"txId":18446744073709551615,
"connector":"ydb",
"version":"1.0.0",
"step":1755883683690,
"ts_ms":1755883683652,
"snapshot":false
},
"after":{
"attribute1":"John Doe",
"change_time":"2025-08-22T17:28:03.648313Z",
"id":"CUSTOMER_1001",
"attribute2":"Los Angeles"
}
}
}
The content of the dimension_scd2 table after executing the command above will be as follows:
| id | attribute1 | attribute2 | deleted_at | valid_from |
|---|---|---|---|---|
| CUSTOMER_1001 | John Doe | New York | NULL | 2025-08-22T17:28:03.648313Z |
Updating an existing record
UPSERT INTO source_customers (id, attribute1, attribute2, change_time)
VALUES ('CUSTOMER_1001', 'John Doe 2', 'Los Angeles 2', CurrentUtcTimestamp());
This action will create a CDC event with the operation type op = "u" approximately of the following form:
{
"payload":{
"op":"u",
"source":{
"txId":18446744073709551615,
"connector":"ydb",
"version":"1.0.0",
"step":1755883278000,
"ts_ms":1755883278357,
"snapshot":false
},
"after":{
"attribute1":"John Doe 2",
"change_time":"2025-08-22T17:31:18.357503Z",
"id":"CUSTOMER_1001",
"attribute2":"Los Angeles 2"
},
"before":{
"attribute1":"John Doe 2",
"change_time":"2025-08-22T17:28:03.648313Z",
"id":"CUSTOMER_1001",
"attribute2":"Los Angeles 2"
}
}
}
The content of the dimension_scd2 table after executing the command above will be as follows:
| id | attribute1 | attribute2 | deleted_at | valid_from |
|---|---|---|---|---|
| CUSTOMER_1001 | John Doe | New York | NULL | 2025-08-22T17:28:03.648313Z |
| CUSTOMER_1001 | John Doe 2 | New York 2 | NULL | 2025-08-22T17:31:18.357503Z |
Deleting a record
DELETE FROM source_customers WHERE id = 'CUSTOMER_1001';
This action will create a CDC event with the operation type op = "d" approximately of the following form:
{
"payload":{
"op":"d",
"source":{
"txId":18446744073709551615,
"connector":"ydb",
"version":"1.0.0",
"step":1755883827000,
"ts_ms":1755883827948,
"snapshot":false
},
"before":{
"attribute1":"John Doe",
"change_time":"2025-08-22T17:38:03.648313Z",
"id":"CUSTOMER_1001",
"attribute2":"Los Angeles"
}
}
}
The content of the dimension_scd2 table after executing the command above will be as follows:
| id | attribute1 | attribute2 | deleted_at | valid_from |
|---|---|---|---|---|
| CUSTOMER_1001 | John Doe | New York | NULL | 2025-08-22T17:28:03.648313Z |
| CUSTOMER_1001 | John Doe 2 | New York 2 | NULL | 2025-08-22T17:31:18.357503Z |
| CUSTOMER_1001 | John Doe 2 | New York 2 | 2025-08-22T17:38:03.648313Z | 2025-08-22T17:38:03.648313Z |
Example query to retrieve current data
To retrieve data from SCD2 append‑only tables, you can use the following query, which fetches data as of 2025-08-22 19:11:30:
DECLARE $as_of AS Timestamp;
$as_of = Timestamp("2025-08-22T19:11:30.000000Z");
SELECT
id,
data.attribute1 AS attribute1,
data.attribute2 AS attribute2,
data.valid_from AS valid_from,
data.deleted_at AS deleted_at
FROM (
SELECT
id,
MAX_BY(TableRow(), valid_from) AS data
FROM dimension_scd2
WHERE
valid_from <= $as_of
AND (deleted_at IS NULL OR deleted_at > $as_of) -- The record has not yet been deleted as of time X
GROUP BY id
)