Common streaming query patterns

This section collects minimal examples of streaming queries for typical scenarios. It starts with a basic topic read, then shows end-to-end processing: handling data and writing results to a topic as JSON, to a topic as a plain string, and to a table. Each example can be used as a starting point for your own workloads.

Reading from a topic

Read from a topic using SELECT ... FROM ... WITH (FORMAT, SCHEMA). The WITH block specifies the input format and schema—the fields expected in each message and their types. This pattern appears in all examples below.

Note

Topics are accessed through an external data source.

In the examples:

  • ydb_source — a pre-created external data source;
  • input_topic — topic to read from;
  • output_topic — topic to write results to;
  • output_table — YDB table to write results to.

The following snippet reads JSON events from a topic. Use it inside CREATE STREAMING QUERY in a DO BEGIN ... END DO block:

SELECT
    *
FROM
    ydb_source.input_topic
WITH (
    FORMAT = json_each_row,
    SCHEMA = (
        Id Uint64 NOT NULL,
        Name Utf8 NOT NULL
    )
);

For more on formats, see Topic read and write formats.

Writing to a topic (JSON)

The query reads events from the input topic, builds a JSON object from fields, and writes to the output topic. AsStruct builds a structure from the fields, Yson::From converts it to Yson, Yson::SerializeJson serializes to a JSON string, and ToBytes converts to String, which is required for topic writes.

CREATE STREAMING QUERY write_json_example AS
DO BEGIN

-- ydb_source — external data source for topics
INSERT INTO ydb_source.output_topic
SELECT
    -- Build JSON from fields
    ToBytes(Unwrap(Yson::SerializeJson(Yson::From(
        AsStruct(Id AS id, Name AS name)
    ))))
FROM
    ydb_source.input_topic
WITH (
    FORMAT = json_each_row,  -- Input data format
    SCHEMA = (               -- Input schema
        Id Uint64 NOT NULL,
        Name Utf8 NOT NULL
    )
);

END DO

More on the functions:

Writing to a topic (string)

The query reads events from the input topic and writes a single field as a string to the output topic. Topic writes require SELECT to return a single column of type String or Utf8.

CREATE STREAMING QUERY write_utf8_example AS
DO BEGIN

-- ydb_source — external data source for topics
INSERT INTO ydb_source.output_topic
SELECT
    Name
FROM
    ydb_source.input_topic
WITH (
    FORMAT = json_each_row,  -- Input data format
    SCHEMA = (               -- Input schema
        Id Uint64 NOT NULL,
        Name Utf8 NOT NULL
    )
);

END DO

More on write formats: Write formats.

Writing to a table

The query reads events from a topic and writes them to output_table. Create the table beforehand with a schema that matches the selected columns.

Warning

Table writes in streaming queries support UPSERT only. INSERT INTO is not supported: with at-least-once delivery, retries would duplicate rows. With UPSERT, an existing row with the same primary key is updated; otherwise a new row is inserted, while INSERT INTO fails.

CREATE STREAMING QUERY write_table_example AS
DO BEGIN

-- Write to table (UPSERT only; INSERT is not supported)
UPSERT INTO output_table
SELECT
    Id,
    Name
FROM
    -- ydb_source — external data source for topics
    ydb_source.input_topic
WITH (
    FORMAT = json_each_row,  -- Input data format
    SCHEMA = (               -- Input schema
        Id Uint64 NOT NULL,
        Name Utf8 NOT NULL
    )
);

END DO

More details: Writing to tables.

See also