CREATE STREAMING QUERY

CREATE STREAMING QUERY creates a streaming query.

Syntax

CREATE [OR REPLACE] STREAMING QUERY [IF NOT EXISTS] <query_name> [WITH (
    <key1> = <value1>,
    <key2> = <value2>,
    ...
)] AS
DO BEGIN
    <query_statement1>;
    <query_statement2>;
    ...
END DO

Parameters

  • OR REPLACE — if a streaming query with this name already exists, replace it while preserving read offsets from topics.
  • IF NOT EXISTS — do not fail if a streaming query with this name already exists; leave the existing query unchanged.
  • query_name — name of the streaming query to create.
  • WITH (<key> = <value>) — optional list of settings for the new streaming query.
  • AS DO BEGIN ... END DO — full query text including all SQL statements. Limitations are described in Limitations; examples are below.

You cannot use OR REPLACE and IF NOT EXISTS together.

WITH parameters:

  • RUN = (TRUE|FALSE) — start the query after creation; default TRUE.
  • RESOURCE_POOL = <resource_pool_name> — name of the resource pool where the query runs.

Creation examples are below.

Consumer usage

A consumer is a named subscription to a topic that stores the current read position.

Create a consumer with the CLI or when creating a topic with CREATE TOPIC. Set the consumer name in the query with a pragma:

PRAGMA pq.Consumer="my_consumer";

If no consumer is specified, the topic is read without a named consumer. In both cases the read position is stored in a checkpoint. A consumer lets you track position and lag from the topic side, for example via the CLI.

Examples

Write to a topic (JSON)

The query reads events from the input topic, builds a JSON object from fields, and writes to the output topic.

AsStruct builds a structure, Yson::From converts to Yson, Yson::SerializeJson serializes to JSON, and ToBytes converts to String for topic writes.

Note

Topic writes go through an external data source. In the example, ydb_source is a pre-created external data source; output_topic and input_topic are topics available through it.

CREATE STREAMING QUERY my_streaming_query AS
DO BEGIN

    -- ydb_source — external data source for topics
    INSERT INTO ydb_source.output_topic
    SELECT
        -- Build JSON from fields
        ToBytes(Unwrap(Yson::SerializeJson(Yson::From(
            AsStruct(Id AS id, Name AS name)
        ))))
    FROM
        -- Read from topic
        ydb_source.input_topic
    WITH (
        FORMAT = json_each_row,  -- Input format
        SCHEMA = (               -- Input schema
            Id Uint64 NOT NULL,
            Name Utf8 NOT NULL
        )
    );

END DO

Write to a table

The query reads events from a topic and writes them to output_table. Create the table beforehand with a matching schema.

Warning

Table writes in streaming queries support UPSERT only. INSERT INTO is not supported: with at-least-once retries, it would duplicate rows. With UPSERT, an existing row with the same primary key is updated; otherwise a row is inserted, while INSERT INTO fails.

CREATE STREAMING QUERY my_streaming_query AS
DO BEGIN

    -- Table write (UPSERT only; INSERT not supported)
    UPSERT INTO output_table
    SELECT
        Id,
        Name
    FROM
        -- ydb_source — external data source for topics
        ydb_source.input_topic
    WITH (
        FORMAT = json_each_row,  -- Input format
        SCHEMA = (               -- Input schema
            Id Uint64 NOT NULL,
            Name Utf8 NOT NULL
        )
    );

END DO

Start in a resource pool

The query is created in the given resource pool but not started automatically (RUN = FALSE). You can validate configuration first or start later with ALTER STREAMING QUERY.

CREATE STREAMING QUERY my_streaming_query WITH (
    RUN = FALSE,                      -- Do not auto-start
    RESOURCE_POOL = my_resource_pool  -- Pool for execution
) AS
DO BEGIN

    -- ydb_source — external data source for topics
    INSERT INTO ydb_source.output_topic
    SELECT
        ToBytes(Unwrap(Yson::SerializeJson(Yson::From(
            AsStruct(Id AS id, Name AS name)
        ))))
    FROM
        ydb_source.input_topic
    WITH (
        FORMAT = json_each_row,
        SCHEMA = (
            Id Uint64 NOT NULL,
            Name Utf8 NOT NULL
        )
    );

END DO

More examples: Common streaming query patterns.

See also