Быстрый старт: чтение и запись в топики

В этом руководстве вы создадите свой первый потоковый запрос.

Запрос будет:

  • читать события из входного топика;
  • отбирать только ошибки;
  • подсчитывать количество ошибок по каждому серверу за 10 минут;
  • записывать результат в выходной топик.

События поступают в формате JSON с полями: время, уровень логирования и имя сервера.

Вы выполните следующие шаги:

Предварительные условия

Для выполнения примеров вам потребуется:

  • запущенная база YDB — см. quick start;
  • включённые флаги enable_external_data_sources и enable_streaming_queries.
docker run -d --rm --name ydb-local -h localhost \
  --platform linux/amd64 \
  -p 2135:2135 -p 2136:2136 -p 8765:8765 -p 9092:9092 \
  -v $(pwd)/ydb_certs:/ydb_certs \
  -e GRPC_TLS_PORT=2135 -e GRPC_PORT=2136 -e MON_PORT=8765 \
  -e YDB_FEATURE_FLAGS=enable_external_data_sources,enable_streaming_queries \
  ydbplatform/local-ydb:25.4
./local_ydb deploy \
  --ydb-working-dir=/absolute/path/to/working/directory \
  --ydb-binary-path=/path/to/kikimr/driver \
  --enable-feature-flag=enable_external_data_sources \
  --enable-feature-flag=enable_streaming_queries

Примечание

В примерах используется профиль quickstart, подробнее смотрите в Создание профиля для соединения с тестовой БД.

Шаг 1. Создание топиков

Создайте входной и выходной топики:

CREATE TOPIC input_topic;
CREATE TOPIC output_topic;

Проверьте, что топики созданы:

./ydb --profile quickstart scheme ls

Шаг 2. Создание внешнего источника данных

Создайте внешний источник данных с помощью CREATE EXTERNAL DATA SOURCE:

CREATE EXTERNAL DATA SOURCE ydb_source WITH (
    SOURCE_TYPE = "Ydb",
    LOCATION = "localhost:2136",
    DATABASE_NAME = "/local",
    AUTH_METHOD = "NONE"
);

Примечание

Укажите значения LOCATION и DATABASE_NAME, соответствующие вашей базе YDB.

Шаг 3. Создание потокового запроса

Создайте потоковый запрос с помощью CREATE STREAMING QUERY:

CREATE STREAMING QUERY query_example AS
DO BEGIN

$number_errors = SELECT
    Host,
    COUNT(*) AS ErrorCount,
    CAST(HOP_START() AS String) AS Ts  -- Время начала окна, соответствующего результату агрегации
FROM
    ydb_source.input_topic
WITH (
    FORMAT = json_each_row,
    SCHEMA = (
        Time String NOT NULL,
        Level String NOT NULL,
        Host String NOT NULL
    )
)
WHERE
    Level = "error"
GROUP BY
    HOP(CAST(Time AS Timestamp), "PT600S", "PT600S", "PT0S"),  -- Число ошибок на неперекрывающихся окнах длиной 10 минут
    Host;

INSERT INTO
    ydb_source.output_topic
SELECT
    ToBytes(Unwrap(Yson::SerializeJson(Yson::From(TableRow()))))  -- Сериализация всех колонок в JSON
FROM
    $number_errors;

END DO

Подробнее:

Шаг 4. Просмотр состояния запроса

Проверьте состояние запроса через системную таблицу streaming_queries:

SELECT
    Path,
    Status,
    Issues,
    Run
FROM
    `.sys/streaming_queries`

Убедитесь, что в поле Status значение RUNNING. В противном случае проверьте поле Issues.

Если запрос находится в статусе SUSPENDED или в поле Issues есть ошибки, обратитесь к разделу диагностика ошибок.

Шаг 5. Заполнение входного топика данными

Запишите тестовые сообщения в топик с помощью YDB CLI:

echo '{"Time": "2025-01-01T00:00:00.000000Z", "Level": "error", "Host": "host-1"}' | ./ydb --profile quickstart topic write input_topic
echo '{"Time": "2025-01-01T00:04:00.000000Z", "Level": "error", "Host": "host-2"}' | ./ydb --profile quickstart topic write input_topic
echo '{"Time": "2025-01-01T00:08:00.000000Z", "Level": "error", "Host": "host-1"}' | ./ydb --profile quickstart topic write input_topic
echo '{"Time": "2025-01-01T00:12:00.000000Z", "Level": "error", "Host": "host-2"}' | ./ydb --profile quickstart topic write input_topic
echo '{"Time": "2025-01-01T00:12:00.000000Z", "Level": "error", "Host": "host-1"}' | ./ydb --profile quickstart topic write input_topic

Результат появится в выходном топике после закрытия 10-минутного окна агрегации.

Шаг 6. Проверка содержимого выходного топика

Прочитайте данные из выходного топика:

./ydb --profile quickstart topic read output_topic --partition-ids 0 --start-offset 0 --limit 10 --format newline-delimited

Ожидаемый результат:

{"ErrorCount":1,"Host":"host-2","Ts":"2025-01-01T00:00:00Z"}
{"ErrorCount":2,"Host":"host-1","Ts":"2025-01-01T00:00:00Z"}

Шаг 7. Удаление запроса

Удалите запрос с помощью DROP STREAMING QUERY:

DROP STREAMING QUERY query_example;

Что дальше

См. также