CREATE STREAMING QUERY

CREATE STREAMING QUERY создаёт потоковый запрос.

Синтаксис

CREATE [OR REPLACE] STREAMING QUERY [IF NOT EXISTS] <query_name> [WITH (
    <key1> = <value1>,
    <key2> = <value2>,
    ...
)] AS
DO BEGIN
    <query_statement1>;
    <query_statement2>;
    ...
END DO

Параметры

  • OR REPLACE — если потоковый запрос с таким именем уже существует, то он будет заменён на новый запрос с сохранением смещений чтения из топика.
  • IF NOT EXISTS — не выводить ошибку, если потоковый запрос с таким именем уже существует, в этом случае существующий запрос останется неизменённым.
  • query_name — имя потокового запроса, который нужно создать.
  • WITH (<key> = <value>) — список настроек нового потокового запроса, опционально.
  • AS DO BEGIN ... END DO — полный текст нового потокового запроса, включая все необходимые SQL-выражения. Ограничения для текста запроса приведены в Ограничения, примеры текста см. ниже.

Настройки OR REPLACE и IF NOT EXISTS нельзя использовать одновременно.

Доступные параметры блока WITH:

  • RUN = (TRUE|FALSE) — запустить запрос после создания, по умолчанию TRUE.
  • RESOURCE_POOL = <resource_pool_name> — имя пула ресурсов, в котором будет выполняться запрос.

Примеры создания потокового запроса см. ниже.

Использование читателя

Читатель (consumer) — это именованная подписка на топик, которая хранит текущую позицию чтения.

Читатель создаётся через CLI или при создании топика с помощью CREATE TOPIC. Имя читателя указывается в тексте запроса с помощью прагмы:

PRAGMA pq.Consumer="my_consumer";

Если читатель не указан, чтение из топика выполняется без читателя. Позиция чтения в обоих случаях сохраняется в чекпоинте. Указание читателя позволяет отслеживать позицию чтения и лаг со стороны топика, например, через CLI.

Примеры

Запись в топик (JSON)

Запрос читает события из входного топика, формирует JSON-объект из отдельных полей и записывает результат в выходной топик.

Функция AsStruct создаёт структуру из указанных полей, Yson::From преобразует её в Yson, Yson::SerializeJson сериализует в JSON-строку, а ToBytes конвертирует в тип String, который требуется для записи в топик.

Примечание

Запись в топики выполняется через external data source. В примере ydb_source — это заранее созданный external data source, а output_topic и input_topic — топики, доступные через него.

CREATE STREAMING QUERY my_streaming_query AS
DO BEGIN

    -- ydb_source — external data source для работы с топиками
    INSERT INTO ydb_source.output_topic
    SELECT
        -- Формирование JSON из отдельных полей
        ToBytes(Unwrap(Yson::SerializeJson(Yson::From(
            AsStruct(Id AS id, Name AS name)
        ))))
    FROM
        -- Чтение из топика
        ydb_source.input_topic
    WITH (
        FORMAT = json_each_row,  -- Формат входных данных
        SCHEMA = (               -- Схема входных данных
            Id Uint64 NOT NULL,
            Name Utf8 NOT NULL
        )
    );

END DO

Запись в таблицу

Запрос читает события из топика и записывает их в таблицу output_table. Таблица должна быть создана заранее со схемой, соответствующей выбираемым колонкам.

Важно

Запись в таблицы в потоковых запросах поддерживается только в режиме UPSERT. Операция INSERT INTO не поддерживается, так как при повторной обработке событий (гарантия at-least-once) она привела бы к дублированию строк. При UPSERT, если строка с таким первичным ключом уже существует, она будет обновлена, иначе будет вставлена новая строка, a INSERT INTO завершится с ошибкой.

CREATE STREAMING QUERY my_streaming_query AS
DO BEGIN

    -- Запись в таблицу (только UPSERT, INSERT не поддерживается)
    UPSERT INTO output_table
    SELECT
        Id,
        Name
    FROM
        -- ydb_source — external data source для работы с топиками
        ydb_source.input_topic
    WITH (
        FORMAT = json_each_row,  -- Формат входных данных
        SCHEMA = (               -- Схема входных данных
            Id Uint64 NOT NULL,
            Name Utf8 NOT NULL
        )
    );

END DO

Запуск в пуле ресурсов

Запрос создаётся в указанном пуле ресурсов, но не запускается автоматически (RUN = FALSE). Это позволяет проверить конфигурацию перед запуском или запустить запрос позже через ALTER STREAMING QUERY.

CREATE STREAMING QUERY my_streaming_query WITH (
    RUN = FALSE,                      -- Не запускать автоматически
    RESOURCE_POOL = my_resource_pool  -- Пул ресурсов для выполнения
) AS
DO BEGIN

    -- ydb_source — external data source для работы с топиками
    INSERT INTO ydb_source.output_topic
    SELECT
        ToBytes(Unwrap(Yson::SerializeJson(Yson::From(
            AsStruct(Id AS id, Name AS name)
        ))))
    FROM
        ydb_source.input_topic
    WITH (
        FORMAT = json_each_row,
        SCHEMA = (
            Id Uint64 NOT NULL,
            Name Utf8 NOT NULL
        )
    );

END DO

Другие примеры: Типичные шаблоны потоковых запросов.

См. также

Предыдущая
Следующая