Обогащение данных
Обогащение данных — добавление к событиям из потока дополнительной информации из справочника. Например, событие содержит только идентификатор, а справочник позволяет добавить к нему название или другие атрибуты. В качестве справочника можно использовать данные из локальной таблицы или из объектного хранилища S3.
В потоковых запросах справочник подключают с помощью конструкции JOIN. Поток должен быть слева, справочник — справа.
Важно
Справочник полностью загружается в память при запуске запроса. Если данные в справочнике изменились, для получения актуальной версии справочника необходимо перезапустить запрос — удалить его с помощью DROP STREAMING QUERY и создать заново с помощью CREATE STREAMING QUERY.
Подготовка источника данных для работы с топиками
Создайте внешний источник данных для работы с топиками. Для хранения токена используется секрет, источник создаётся через CREATE EXTERNAL DATA SOURCE.
-- Секрет с токеном для подключения к YDB
CREATE SECRET `secrets/ydb_token` WITH (value = "<ydb_token>");
-- Источник данных YDB для чтения/записи топиков
CREATE EXTERNAL DATA SOURCE ydb_source WITH (
SOURCE_TYPE = "Ydb",
LOCATION = "<ydb_endpoint>",
DATABASE_NAME = "<db_name>",
AUTH_METHOD = "TOKEN",
TOKEN_SECRET_PATH = "secrets/ydb_token"
);
Где:
<ydb_endpoint>— эндпоинт YDB, напримерgrpcs://<ydb_host>:2135.<db_name>— путь к базе данных YDB, например/Root/database.
Потоковые запросы для обогащения данных
Запросы в примерах ниже читают события из входного топика, присоединяют к каждому событию название сервиса из справочника по ServiceId и записывают результат в выходной топик.
Подробнее об использованных в запросах функциях:
Обогащение из локальной таблицы
В данном примере справочник хранится в таблице services_dict в текущей базе данных.
Создайте потоковый запрос, выполняющий обогащение:
CREATE STREAMING QUERY query_with_table_join AS
DO BEGIN
-- Чтение событий из входного топика
$topic_data = SELECT
*
FROM
ydb_source.input_topic
WITH (
FORMAT = json_each_row,
SCHEMA = (
Time String NOT NULL,
ServiceId Uint32 NOT NULL,
Message String NOT NULL
)
);
-- Присоединение справочника к потоку по ServiceId
$joined_data = SELECT
s.Name AS Name,
t.*
FROM
$topic_data AS t
LEFT JOIN
services_dict AS s
ON
t.ServiceId = s.ServiceId;
-- Запись в выходной топик (JSON)
INSERT INTO
ydb_source.output_topic
SELECT
ToBytes(Unwrap(Yson::SerializeJson(Yson::From(TableRow()))))
FROM
$joined_data;
END DO
Обогащение из S3
Справочник хранится в S3 и подключается через внешний источник данных.
Создайте дополнительный внешний источник данных для чтения справочника из S3:
-- Источник данных S3 для чтения справочника
CREATE EXTERNAL DATA SOURCE s3_source WITH (
SOURCE_TYPE = "ObjectStorage",
LOCATION = "<s3_endpoint>",
AUTH_METHOD = "NONE"
);
Где:
<s3_endpoint>— URL S3-хранилища, напримерhttps://storage.yandexcloud.net/<bucket>/для Yandex Cloud.
Создайте потоковый запрос, выполняющий обогащение:
CREATE STREAMING QUERY query_with_join AS
DO BEGIN
-- Чтение событий из входного топика
$topic_data = SELECT
*
FROM
ydb_source.input_topic
WITH (
FORMAT = json_each_row,
SCHEMA = (
Time String NOT NULL,
ServiceId Uint32 NOT NULL,
Message String NOT NULL
)
);
-- Чтение справочника сервисов из S3
$s3_data = SELECT
*
FROM
s3_source.`file.csv`
WITH (
FORMAT = csv_with_names,
SCHEMA = (
ServiceId Uint32,
Name Utf8
)
);
-- Присоединение справочника к потоку по ServiceId
$joined_data = SELECT
s.Name AS Name,
t.*
FROM
$topic_data AS t
LEFT JOIN
$s3_data AS s
ON
t.ServiceId = s.ServiceId;
-- Запись результата в выходной топик в формате JSON
INSERT INTO
ydb_source.output_topic
SELECT
ToBytes(Unwrap(Yson::SerializeJson(Yson::From(TableRow()))))
FROM
$joined_data;
END DO
Подробнее о форматах данных (json_each_row, csv_with_names и др.): Форматы данных при чтении/записи из топиков.