Типичные шаблоны потоковых запросов
В этом разделе собраны минимальные примеры потоковых запросов для наиболее распространённых сценариев. Сначала описан базовый шаблон чтения данных из топика, затем — варианты полноценной работы с данными: обработка данных и запись результатов в топик в формате JSON, в топик в виде строки и в таблицу. Каждый пример можно использовать как отправную точку для собственных задач.
Чтение данных из топика
Чтение данных из топика выполняется с помощью SELECT ... FROM ... WITH (FORMAT, SCHEMA). Блок WITH указывает формат входных данных и схему — какие поля ожидаются в каждом сообщении и их типы. Этот шаблон используется во всех последующих примерах.
Примечание
Работа с топиками выполняется через external data source.
В примерах:
ydb_source— заранее созданныйexternal data source;input_topic- топик, откуда производится чтение данных;output_topic- топик, куда производится запись результатов;output_table— таблица YDB, куда производится запись результатов.
Следующий фрагмент показывает чтение событий из топика в формате JSON. Он используется внутри CREATE STREAMING QUERY в блоке DO BEGIN ... END DO:
SELECT
*
FROM
ydb_source.input_topic
WITH (
FORMAT = json_each_row,
SCHEMA = (
Id Uint64 NOT NULL,
Name Utf8 NOT NULL
)
);
Подробнее о форматах: Форматы данных при чтении/записи из топиков.
Запись в топик (JSON)
Запрос читает события из входного топика, формирует JSON-объект из отдельных полей и записывает результат в выходной топик. Функция AsStruct создаёт структуру из указанных полей, Yson::From преобразует её в Yson, Yson::SerializeJson сериализует в JSON-строку, а ToBytes конвертирует в тип String, который требуется для записи в топик.
CREATE STREAMING QUERY write_json_example AS
DO BEGIN
-- ydb_source — external data source для работы с топиками
INSERT INTO ydb_source.output_topic
SELECT
-- Формирование JSON из отдельных полей
ToBytes(Unwrap(Yson::SerializeJson(Yson::From(
AsStruct(Id AS id, Name AS name)
))))
FROM
ydb_source.input_topic
WITH (
FORMAT = json_each_row, -- Формат входных данных
SCHEMA = ( -- Схема входных данных
Id Uint64 NOT NULL,
Name Utf8 NOT NULL
)
);
END DO
Подробнее о функциях:
Запись в топик (строка)
Запрос читает события из входного топика и записывает в выходной топик одно поле в виде строки. Для записи в топик строк SELECT должен возвращать одну колонку типа String или Utf8.
CREATE STREAMING QUERY write_utf8_example AS
DO BEGIN
-- ydb_source — external data source для работы с топиками
INSERT INTO ydb_source.output_topic
SELECT
Name
FROM
ydb_source.input_topic
WITH (
FORMAT = json_each_row, -- Формат входных данных
SCHEMA = ( -- Схема входных данных
Id Uint64 NOT NULL,
Name Utf8 NOT NULL
)
);
END DO
Подробнее о форматах записи: Форматы при записи данных.
Запись в таблицу
Запрос читает события из топика и записывает их в таблицу output_table. Таблица должна быть создана заранее со схемой, соответствующей выбираемым колонкам.
Важно
Запись в таблицы в потоковых запросах поддерживается только в режиме UPSERT. Операция INSERT INTO не поддерживается, так как при повторной обработке событий (гарантия at-least-once) она привела бы к дублированию строк. При UPSERT, если строка с таким первичным ключом уже существует, она будет обновлена, иначе будет вставлена новая строка, a INSERT INTO завершится с ошибкой.
CREATE STREAMING QUERY write_table_example AS
DO BEGIN
-- Запись в таблицу (только UPSERT, INSERT не поддерживается)
UPSERT INTO output_table
SELECT
Id,
Name
FROM
-- ydb_source — external data source для работы с топиками
ydb_source.input_topic
WITH (
FORMAT = json_each_row, -- Формат входных данных
SCHEMA = ( -- Схема входных данных
Id Uint64 NOT NULL,
Name Utf8 NOT NULL
)
);
END DO
Подробнее: Запись в таблицы.