CREATE STREAMING QUERY
CREATE STREAMING QUERY создаёт потоковый запрос.
Синтаксис
CREATE [OR REPLACE] STREAMING QUERY [IF NOT EXISTS] <query_name> [WITH (
<key1> = <value1>,
<key2> = <value2>,
...
)] AS
DO BEGIN
<query_statement1>;
<query_statement2>;
...
END DO
Параметры
OR REPLACE— если потоковый запрос с таким именем уже существует, то он будет заменён на новый запрос с сохранением смещений чтения из топика.IF NOT EXISTS— не выводить ошибку, если потоковый запрос с таким именем уже существует, в этом случае существующий запрос останется неизменённым.query_name— имя потокового запроса, который нужно создать.WITH (<key> = <value>)— список настроек нового потокового запроса, опционально.AS DO BEGIN ... END DO— полный текст нового потокового запроса, включая все необходимые SQL-выражения. Ограничения для текста запроса приведены в Ограничения, примеры текста см. ниже.
Настройки OR REPLACE и IF NOT EXISTS нельзя использовать одновременно.
Доступные параметры блока WITH:
RUN = (TRUE|FALSE)— запустить запрос после создания, по умолчаниюTRUE.RESOURCE_POOL = <resource_pool_name>— имя пула ресурсов, в котором будет выполняться запрос.
Примеры создания потокового запроса см. ниже.
Использование читателя
Читатель (consumer) — это именованная подписка на топик, которая хранит текущую позицию чтения.
Читатель создаётся через CLI или при создании топика с помощью CREATE TOPIC. Имя читателя указывается в тексте запроса с помощью прагмы:
PRAGMA pq.Consumer="my_consumer";
Если читатель не указан, чтение из топика выполняется без читателя. Позиция чтения в обоих случаях сохраняется в чекпоинте. Указание читателя позволяет отслеживать позицию чтения и лаг со стороны топика, например, через CLI.
Примеры
Запись в топик (JSON)
Запрос читает события из входного топика, формирует JSON-объект из отдельных полей и записывает результат в выходной топик.
Функция AsStruct создаёт структуру из указанных полей, Yson::From преобразует её в Yson, Yson::SerializeJson сериализует в JSON-строку, а ToBytes конвертирует в тип String, который требуется для записи в топик.
Примечание
Запись в топики выполняется через external data source. В примере ydb_source — это заранее созданный external data source, а output_topic и input_topic — топики, доступные через него.
CREATE STREAMING QUERY my_streaming_query AS
DO BEGIN
-- ydb_source — external data source для работы с топиками
INSERT INTO ydb_source.output_topic
SELECT
-- Формирование JSON из отдельных полей
ToBytes(Unwrap(Yson::SerializeJson(Yson::From(
AsStruct(Id AS id, Name AS name)
))))
FROM
-- Чтение из топика
ydb_source.input_topic
WITH (
FORMAT = json_each_row, -- Формат входных данных
SCHEMA = ( -- Схема входных данных
Id Uint64 NOT NULL,
Name Utf8 NOT NULL
)
);
END DO
Запись в таблицу
Запрос читает события из топика и записывает их в таблицу output_table. Таблица должна быть создана заранее со схемой, соответствующей выбираемым колонкам.
Важно
Запись в таблицы в потоковых запросах поддерживается только в режиме UPSERT. Операция INSERT INTO не поддерживается, так как при повторной обработке событий (гарантия at-least-once) она привела бы к дублированию строк. При UPSERT, если строка с таким первичным ключом уже существует, она будет обновлена, иначе будет вставлена новая строка, a INSERT INTO завершится с ошибкой.
CREATE STREAMING QUERY my_streaming_query AS
DO BEGIN
-- Запись в таблицу (только UPSERT, INSERT не поддерживается)
UPSERT INTO output_table
SELECT
Id,
Name
FROM
-- ydb_source — external data source для работы с топиками
ydb_source.input_topic
WITH (
FORMAT = json_each_row, -- Формат входных данных
SCHEMA = ( -- Схема входных данных
Id Uint64 NOT NULL,
Name Utf8 NOT NULL
)
);
END DO
Запуск в пуле ресурсов
Запрос создаётся в указанном пуле ресурсов, но не запускается автоматически (RUN = FALSE). Это позволяет проверить конфигурацию перед запуском или запустить запрос позже через ALTER STREAMING QUERY.
CREATE STREAMING QUERY my_streaming_query WITH (
RUN = FALSE, -- Не запускать автоматически
RESOURCE_POOL = my_resource_pool -- Пул ресурсов для выполнения
) AS
DO BEGIN
-- ydb_source — external data source для работы с топиками
INSERT INTO ydb_source.output_topic
SELECT
ToBytes(Unwrap(Yson::SerializeJson(Yson::From(
AsStruct(Id AS id, Name AS name)
))))
FROM
ydb_source.input_topic
WITH (
FORMAT = json_each_row,
SCHEMA = (
Id Uint64 NOT NULL,
Name Utf8 NOT NULL
)
);
END DO
Другие примеры: Типичные шаблоны потоковых запросов.