Обогащение данных

Обогащение данных — добавление к событиям из потока дополнительной информации из справочника. Например, событие содержит только идентификатор, а справочник позволяет добавить к нему название или другие атрибуты. В качестве справочника можно использовать данные из локальной таблицы или из объектного хранилища S3.

В потоковых запросах справочник подключают с помощью конструкции JOIN. Поток должен быть слева, справочник — справа.

Важно

Справочник полностью загружается в память при запуске запроса. Если данные в справочнике изменились, для получения актуальной версии справочника необходимо перезапустить запрос — удалить его с помощью DROP STREAMING QUERY и создать заново с помощью CREATE STREAMING QUERY.

Подготовка источника данных для работы с топиками

Создайте внешний источник данных для работы с топиками. Для хранения токена используется секрет, источник создаётся через CREATE EXTERNAL DATA SOURCE.

-- Секрет с токеном для подключения к YDB
CREATE SECRET `secrets/ydb_token` WITH (value = "<ydb_token>");

-- Источник данных YDB для чтения/записи топиков
CREATE EXTERNAL DATA SOURCE ydb_source WITH (
    SOURCE_TYPE = "Ydb",
    LOCATION = "<ydb_endpoint>",
    DATABASE_NAME = "<db_name>",
    AUTH_METHOD = "TOKEN",
    TOKEN_SECRET_PATH = "secrets/ydb_token"
);

Где:

  • <ydb_endpoint> — эндпоинт YDB, например grpcs://<ydb_host>:2135.
  • <db_name> — путь к базе данных YDB, например /Root/database.

Потоковые запросы для обогащения данных

Запросы в примерах ниже читают события из входного топика, присоединяют к каждому событию название сервиса из справочника по ServiceId и записывают результат в выходной топик.

Подробнее об использованных в запросах функциях:

Обогащение из локальной таблицы

В данном примере справочник хранится в таблице services_dict в текущей базе данных.

Создайте потоковый запрос, выполняющий обогащение:

CREATE STREAMING QUERY query_with_table_join AS
DO BEGIN

-- Чтение событий из входного топика
$topic_data = SELECT
    *
FROM
    ydb_source.input_topic
WITH (
    FORMAT = json_each_row,
    SCHEMA = (
        Time String NOT NULL,
        ServiceId Uint32 NOT NULL,
        Message String NOT NULL
    )
);

-- Присоединение справочника к потоку по ServiceId
$joined_data = SELECT
    s.Name AS Name,
    t.*
FROM
    $topic_data AS t
LEFT JOIN
    services_dict AS s
ON
    t.ServiceId = s.ServiceId;

-- Запись в выходной топик (JSON)
INSERT INTO
    ydb_source.output_topic
SELECT
    ToBytes(Unwrap(Yson::SerializeJson(Yson::From(TableRow()))))
FROM
    $joined_data;

END DO

Обогащение из S3

Справочник хранится в S3 и подключается через внешний источник данных.

Создайте дополнительный внешний источник данных для чтения справочника из S3:

-- Источник данных S3 для чтения справочника
CREATE EXTERNAL DATA SOURCE s3_source WITH (
    SOURCE_TYPE = "ObjectStorage",
    LOCATION = "<s3_endpoint>",
    AUTH_METHOD = "NONE"
);

Где:

  • <s3_endpoint> — URL S3-хранилища, например https://storage.yandexcloud.net/<bucket>/ для Yandex Cloud.

Создайте потоковый запрос, выполняющий обогащение:

CREATE STREAMING QUERY query_with_join AS
DO BEGIN

-- Чтение событий из входного топика
$topic_data = SELECT
    *
FROM
    ydb_source.input_topic
WITH (
    FORMAT = json_each_row,
    SCHEMA = (
        Time String NOT NULL,
        ServiceId Uint32 NOT NULL,
        Message String NOT NULL
    )
);

-- Чтение справочника сервисов из S3
$s3_data = SELECT
    *
FROM
    s3_source.`file.csv`
WITH (
    FORMAT = csv_with_names,
    SCHEMA = (
        ServiceId Uint32,
        Name Utf8
    )
);

-- Присоединение справочника к потоку по ServiceId
$joined_data = SELECT
    s.Name AS Name,
    t.*
FROM
    $topic_data AS t
LEFT JOIN
    $s3_data AS s
ON
    t.ServiceId = s.ServiceId;

-- Запись результата в выходной топик в формате JSON
INSERT INTO
    ydb_source.output_topic
SELECT
    ToBytes(Unwrap(Yson::SerializeJson(Yson::From(TableRow()))))
FROM
    $joined_data;

END DO

Подробнее о форматах данных (json_each_row, csv_with_names и др.): Форматы данных при чтении/записи из топиков.