Обогащение данных (S3)

Обогащение данных — добавление к событиям из потока дополнительной информации из справочника. Например, событие содержит только идентификатор, а справочник позволяет добавить к нему название или другие атрибуты.

В потоковых запросах можно присоединить к потоку данные, хранимые в S3, с помощью конструкции JOIN. Поток должен быть слева, справочник из S3 — справа.

Важно

Справочник полностью загружается в память при запуске запроса. Если данные в S3 изменились, для получения актуальной версии справочника необходимо перезапустить запрос — удалить его с помощью DROP STREAMING QUERY и создать заново с помощью CREATE STREAMING QUERY.

Справочник хранится в S3 и подключается через внешний источник данных.

Подготовка источников данных

Создаём два внешних источника: один для YDB (топики), другой для S3 (справочник). Для хранения токена используется секрет, источники создаются через CREATE EXTERNAL DATA SOURCE.

-- Секрет с токеном для подключения к YDB
CREATE SECRET `secrets/ydb_token` WITH (value = "<ydb_token>");

-- Источник данных YDB для чтения/записи топиков
CREATE EXTERNAL DATA SOURCE ydb_source WITH (
    SOURCE_TYPE = "Ydb",
    LOCATION = "<ydb_endpoint>",
    DATABASE_NAME = "<db_name>",
    AUTH_METHOD = "TOKEN",
    TOKEN_SECRET_NAME = "secrets/ydb_token"
);

-- Источник данных S3 для чтения справочника
CREATE EXTERNAL DATA SOURCE s3_source WITH (
    SOURCE_TYPE = "ObjectStorage",
    LOCATION = "<s3_endpoint>",
    AUTH_METHOD = "NONE"
)

Где:

  • <ydb_endpoint> — эндпоинт YDB, например grpcs://<ydb_host>:2135 для Yandex Cloud.
  • <db_name> — путь к базе данных YDB, например /Root/database.
  • <s3_endpoint> — URL S3-хранилища, например https://storage.yandexcloud.net/<bucket> для Yandex Cloud.

Создание потокового запроса

Запрос читает события из входного топика, присоединяет к каждому событию название сервиса из справочника по ServiceId и записывает результат в выходной топик. Запрос создаётся с помощью CREATE STREAMING QUERY.

Подробнее о форматах данных (json_each_row, csv_with_names и др.): Форматы данных при чтении/записи из топиков.

CREATE STREAMING QUERY query_with_join AS
DO BEGIN

-- Чтение событий из входного топика
$topic_data = SELECT
    *
FROM
    ydb_source.input_topic
WITH (
    FORMAT = json_each_row,
    SCHEMA = (
        Time String NOT NULL,
        ServiceId Uint32 NOT NULL,
        Message String NOT NULL
    )
);

-- Чтение справочника сервисов из S3
$s3_data = SELECT
    *
FROM
    s3_source.`file.csv`
WITH (
    FORMAT = csv_with_names,
    SCHEMA = (
        ServiceId Uint32,
        Name Utf8
    )
);

-- Присоединение справочника к потоку по ServiceId
$joined_data = SELECT
    s.Name AS Name,
    t.*
FROM
    $topic_data AS t
LEFT JOIN
    $s3_data AS s
ON
    t.ServiceId = s.ServiceId;

-- Запись результата в выходной топик в формате JSON
INSERT INTO
    ydb_source.output_topic
SELECT
    ToBytes(Unwrap(Yson::SerializeJson(Yson::From(TableRow()))))
FROM
    $joined_data;

END DO

Подробнее о функциях: