Writing to tables
Writing to tables lets you persist streaming query results for analysis with regular SQL. For example, you can aggregate events from a stream and store summaries in a table.
Writes use UPSERT INTO, which inserts a new row or updates the existing row that has the same primary key. UPSERT is idempotent by primary key: writing the same row again overwrites it rather than duplicating it. That matters because streaming queries provide at-least-once delivery: after recovery from a checkpoint, some events may be processed more than once.
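The effect of a replayed write can be sketched outside a streaming query with ordinary YQL. The table upsert_demo and its values are hypothetical and exist only for this illustration. The second UPSERT stands in for an event that is processed again after recovery from a checkpoint.

```sql
-- Hypothetical table for illustration only; run this DDL as a separate statement.
CREATE TABLE upsert_demo (
    Country Utf8 NOT NULL,
    Count Uint64,
    PRIMARY KEY (Country)
);

-- First delivery of the event.
UPSERT INTO upsert_demo (Country, Count) VALUES ("DE"u, 10ul);

-- The same event delivered again, as can happen with at-least-once delivery.
-- The row with primary key "DE" is overwritten, not duplicated.
UPSERT INTO upsert_demo (Country, Count) VALUES ("DE"u, 10ul);

-- Expected to return a single row for "DE".
SELECT Country, Count FROM upsert_demo;
```

The sketch relies only on UPSERT overwriting by primary key, so the choice of primary key in the target table determines which repeated writes collapse into one row.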
Alert
Not supported:
- INSERT INTO: use UPSERT INTO instead. INSERT INTO would duplicate rows on retries under at-least-once delivery.
- Writing to YDB tables in external databases. Currently, only local tables can be written to.
Example
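The query reads events from the topic input_topic and writes them to output_table. Ts arrives as a string and is cast to Timestamp. Because CAST returns an optional value, Unwrap converts it to a non-optional one and raises a runtime error if the cast produced NULL.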
CREATE STREAMING QUERY query_with_table_write AS
DO BEGIN

-- Read from topic and write to table
UPSERT INTO
    output_table
SELECT
    -- String to Timestamp
    Unwrap(CAST(Ts AS Timestamp)) AS Ts,
    Country,
    Count
FROM
    -- Read events from topic
    ydb_source.input_topic
WITH (
    -- Topic data format
    FORMAT = json_each_row,
    -- Data schema
    SCHEMA = (
        Ts String NOT NULL,
        Count Uint64 NOT NULL,
        Country Utf8 NOT NULL
    )
);

END DO
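The example assumes that output_table already exists; its definition is not shown here. A minimal sketch of a matching table might look like the following. The column types follow the SELECT list above, while the primary key (Ts, Country) is an assumption made for illustration and should be replaced with whatever uniquely identifies a row in your data.

```sql
-- Hypothetical definition of output_table; the primary key is an assumption.
CREATE TABLE output_table (
    Ts Timestamp NOT NULL,    -- result of Unwrap(CAST(Ts AS Timestamp))
    Country Utf8 NOT NULL,
    Count Uint64 NOT NULL,
    PRIMARY KEY (Ts, Country)
);
```

With this key, an event replayed after recovery carries the same Ts and Country and therefore overwrites its earlier copy instead of adding a new row.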