Типичные шаблоны потоковых запросов

В этом разделе собраны минимальные примеры потоковых запросов для наиболее распространённых сценариев. Сначала описан базовый шаблон чтения данных из топика, затем — варианты полноценной работы с данными: обработка данных и запись результатов в топик в формате JSON, в топик в виде строки и в таблицу. Каждый пример можно использовать как отправную точку для собственных задач.

Чтение данных из топика

Чтение данных из топика выполняется с помощью SELECT ... FROM ... WITH (FORMAT, SCHEMA). Блок WITH указывает формат входных данных и схему — какие поля ожидаются в каждом сообщении и их типы. Этот шаблон используется во всех последующих примерах.

Примечание

Работа с топиками выполняется через external data source.

В примерах:

  • ydb_source — заранее созданный external data source;
  • input_topic - топик, откуда производится чтение данных;
  • output_topic - топик, куда производится запись результатов;
  • output_table — таблица YDB, куда производится запись результатов.

Следующий фрагмент показывает чтение событий из топика в формате JSON. Он используется внутри CREATE STREAMING QUERY в блоке DO BEGIN ... END DO:

SELECT
    *
FROM
    ydb_source.input_topic
WITH (
    FORMAT = json_each_row,
    SCHEMA = (
        Id Uint64 NOT NULL,
        Name Utf8 NOT NULL
    )
);

Подробнее о форматах: Форматы данных при чтении/записи из топиков.

Запись в топик (JSON)

Запрос читает события из входного топика, формирует JSON-объект из отдельных полей и записывает результат в выходной топик. Функция AsStruct создаёт структуру из указанных полей, Yson::From преобразует её в Yson, Yson::SerializeJson сериализует в JSON-строку, а ToBytes конвертирует в тип String, который требуется для записи в топик.

CREATE STREAMING QUERY write_json_example AS
DO BEGIN

-- ydb_source — external data source для работы с топиками
INSERT INTO ydb_source.output_topic
SELECT
    -- Формирование JSON из отдельных полей
    ToBytes(Unwrap(Yson::SerializeJson(Yson::From(
        AsStruct(Id AS id, Name AS name)
    ))))
FROM
    ydb_source.input_topic
WITH (
    FORMAT = json_each_row,  -- Формат входных данных
    SCHEMA = (               -- Схема входных данных
        Id Uint64 NOT NULL,
        Name Utf8 NOT NULL
    )
);

END DO

Подробнее о функциях:

Запись в топик (строка)

Запрос читает события из входного топика и записывает в выходной топик одно поле в виде строки. Для записи в топик строк SELECT должен возвращать одну колонку типа String или Utf8.

CREATE STREAMING QUERY write_utf8_example AS
DO BEGIN

-- ydb_source — external data source для работы с топиками
INSERT INTO ydb_source.output_topic
SELECT
    Name
FROM
    ydb_source.input_topic
WITH (
    FORMAT = json_each_row,  -- Формат входных данных
    SCHEMA = (               -- Схема входных данных
        Id Uint64 NOT NULL,
        Name Utf8 NOT NULL
    )
);

END DO

Подробнее о форматах записи: Форматы при записи данных.

Запись в таблицу

Запрос читает события из топика и записывает их в таблицу output_table. Таблица должна быть создана заранее со схемой, соответствующей выбираемым колонкам.

Важно

Запись в таблицы в потоковых запросах поддерживается только в режиме UPSERT. Операция INSERT INTO не поддерживается, так как при повторной обработке событий (гарантия at-least-once) она привела бы к дублированию строк. При UPSERT, если строка с таким первичным ключом уже существует, она будет обновлена, иначе будет вставлена новая строка, a INSERT INTO завершится с ошибкой.

CREATE STREAMING QUERY write_table_example AS
DO BEGIN

-- Запись в таблицу (только UPSERT, INSERT не поддерживается)
UPSERT INTO output_table
SELECT
    Id,
    Name
FROM
    -- ydb_source — external data source для работы с топиками
    ydb_source.input_topic
WITH (
    FORMAT = json_each_row,  -- Формат входных данных
    SCHEMA = (               -- Схема входных данных
        Id Uint64 NOT NULL,
        Name Utf8 NOT NULL
    )
);

END DO

Подробнее: Запись в таблицы.

См. также