Быстрый старт: чтение и запись в топики
В этом руководстве вы создадите свой первый потоковый запрос.
Запрос будет:
- читать события из входного топика;
- отбирать только ошибки;
- подсчитывать количество ошибок по каждому серверу за 10 минут;
- записывать результат в выходной топик.
События поступают в формате JSON с полями: время, уровень логирования и имя сервера.
Вы выполните следующие шаги:
- создание топиков;
- создание внешнего источника данных;
- создание потокового запроса;
- просмотр состояния запроса;
- заполнение входного топика данными;
- проверка содержимого выходного топика;
- удаление потокового запроса.
Предварительные условия
Для выполнения примеров вам потребуется:
- запущенная база YDB — см. quick start;
- включённые флаги
enable_external_data_sourcesиenable_streaming_queries.
docker run -d --rm --name ydb-local -h localhost \
--platform linux/amd64 \
-p 2135:2135 -p 2136:2136 -p 8765:8765 -p 9092:9092 \
-v $(pwd)/ydb_certs:/ydb_certs \
-e GRPC_TLS_PORT=2135 -e GRPC_PORT=2136 -e MON_PORT=8765 \
-e YDB_FEATURE_FLAGS=enable_external_data_sources,enable_streaming_queries \
ydbplatform/local-ydb:25.4
./local_ydb deploy \
--ydb-working-dir=/absolute/path/to/working/directory \
--ydb-binary-path=/path/to/kikimr/driver \
--enable-feature-flag=enable_external_data_sources \
--enable-feature-flag=enable_streaming_queries
Примечание
В примерах используется профиль quickstart, подробнее смотрите в Создание профиля для соединения с тестовой БД.
Шаг 1. Создание топиков
Создайте входной и выходной топики:
CREATE TOPIC input_topic;
CREATE TOPIC output_topic;
Проверьте, что топики созданы:
./ydb --profile quickstart scheme ls
Шаг 2. Создание внешнего источника данных
Создайте внешний источник данных с помощью CREATE EXTERNAL DATA SOURCE:
CREATE EXTERNAL DATA SOURCE ydb_source WITH (
SOURCE_TYPE = "Ydb",
LOCATION = "localhost:2136",
DATABASE_NAME = "/local",
AUTH_METHOD = "NONE"
);
Примечание
Укажите значения LOCATION и DATABASE_NAME, соответствующие вашей базе YDB.
Шаг 3. Создание потокового запроса
Создайте потоковый запрос с помощью CREATE STREAMING QUERY:
CREATE STREAMING QUERY query_example AS
DO BEGIN
$number_errors = SELECT
Host,
COUNT(*) AS ErrorCount,
CAST(HOP_START() AS String) AS Ts -- Время начала окна, соответствующего результату агрегации
FROM
ydb_source.input_topic
WITH (
FORMAT = json_each_row,
SCHEMA = (
Time String NOT NULL,
Level String NOT NULL,
Host String NOT NULL
)
)
WHERE
Level = "error"
GROUP BY
HOP(CAST(Time AS Timestamp), "PT600S", "PT600S", "PT0S"), -- Число ошибок на неперекрывающихся окнах длиной 10 минут
Host;
INSERT INTO
ydb_source.output_topic
SELECT
ToBytes(Unwrap(Yson::SerializeJson(Yson::From(TableRow())))) -- Сериализация всех колонок в JSON
FROM
$number_errors;
END DO
Подробнее:
- Агрегация
GROUP BY HOPи функцияHOP_START— GROUP BY ... HOP. - Запись данных в топик — Форматы при записи данных.
- Сериализация в JSON: TableRow, Yson::From, Yson::SerializeJson, Unwrap, ToBytes.
Шаг 4. Просмотр состояния запроса
Проверьте состояние запроса через системную таблицу streaming_queries:
SELECT
Path,
Status,
Issues,
Run
FROM
`.sys/streaming_queries`
Убедитесь, что в поле Status значение RUNNING. В противном случае проверьте поле Issues.
Если запрос находится в статусе SUSPENDED или в поле Issues есть ошибки, обратитесь к разделу диагностика ошибок.
Шаг 5. Заполнение входного топика данными
Запишите тестовые сообщения в топик с помощью YDB CLI:
echo '{"Time": "2025-01-01T00:00:00.000000Z", "Level": "error", "Host": "host-1"}' | ./ydb --profile quickstart topic write input_topic
echo '{"Time": "2025-01-01T00:04:00.000000Z", "Level": "error", "Host": "host-2"}' | ./ydb --profile quickstart topic write input_topic
echo '{"Time": "2025-01-01T00:08:00.000000Z", "Level": "error", "Host": "host-1"}' | ./ydb --profile quickstart topic write input_topic
echo '{"Time": "2025-01-01T00:12:00.000000Z", "Level": "error", "Host": "host-2"}' | ./ydb --profile quickstart topic write input_topic
echo '{"Time": "2025-01-01T00:12:00.000000Z", "Level": "error", "Host": "host-1"}' | ./ydb --profile quickstart topic write input_topic
Результат появится в выходном топике после закрытия 10-минутного окна агрегации.
Шаг 6. Проверка содержимого выходного топика
Прочитайте данные из выходного топика:
./ydb --profile quickstart topic read output_topic --partition-ids 0 --start-offset 0 --limit 10 --format newline-delimited
Ожидаемый результат:
{"ErrorCount":1,"Host":"host-2","Ts":"2025-01-01T00:00:00Z"}
{"ErrorCount":2,"Host":"host-1","Ts":"2025-01-01T00:00:00Z"}
Шаг 7. Удаление запроса
Удалите запрос с помощью DROP STREAMING QUERY:
DROP STREAMING QUERY query_example;
Что дальше
- Изучите форматы данных, поддерживаемые в потоковых запросах.
- Узнайте, как обогащать данные справочником из локальной таблицы или из S3.
- Научитесь записывать результаты в таблицы.