Обогащение данных
Обогащение данных — добавление к событиям из потока дополнительной информации из справочника. Например, событие содержит только идентификатор, а справочник позволяет добавить к нему название или другие атрибуты. В качестве справочника можно использовать данные из локальной таблицы или из объектного хранилища S3.
В потоковых запросах справочник подключают с помощью конструкции JOIN. Поток должен быть слева, справочник — справа.
Важно
Справочник полностью загружается в память при запуске запроса. Если данные в справочнике изменились, для получения актуальной версии справочника необходимо перезапустить запрос — удалить его с помощью DROP STREAMING QUERY и создать заново с помощью CREATE STREAMING QUERY.
Возможно обогащение данных из локальных и внешних топиков.
В примерах ниже:
ext_source— заранее созданный внешний источник данных для топиков в другой базе YDB;input_topicиoutput_topic— топики в текущей или внешней базе YDB
Потоковые запросы для обогащения данных
Запросы в примерах ниже читают события из входного топика, присоединяют к каждому событию название сервиса из справочника по ServiceId и записывают результат в выходной топик.
Подробнее об использованных в запросах функциях:
Обогащение из локальной таблицы
В данном примере справочник хранится в таблице services_dict в текущей базе данных.
Создайте потоковый запрос, выполняющий обогащение:
CREATE STREAMING QUERY query_with_table_join AS
DO BEGIN
-- Чтение событий из входного топика
$topic_data = SELECT
*
FROM
ext_source.input_topic -- или локальный топик input_topic
WITH (
FORMAT = json_each_row,
SCHEMA = (
Time String NOT NULL,
ServiceId Uint32 NOT NULL,
Message String NOT NULL
)
);
-- Присоединение справочника к потоку по ServiceId
$joined_data = SELECT
s.Name AS Name,
t.*
FROM
$topic_data AS t
LEFT JOIN
services_dict AS s
ON
t.ServiceId = s.ServiceId;
-- Запись в выходной топик (JSON)
INSERT INTO
output_topic -- или внешний топик ext_source.output_topic
SELECT
ToBytes(Unwrap(Yson::SerializeJson(Yson::From(TableRow()))))
FROM
$joined_data;
END DO
Обогащение из S3
Справочник хранится в S3 и подключается через внешний источник данных.
Создайте дополнительный внешний источник данных для чтения справочника из S3:
-- Источник данных S3 для чтения справочника
CREATE EXTERNAL DATA SOURCE s3_source WITH (
SOURCE_TYPE = "ObjectStorage",
LOCATION = "<s3_endpoint>",
AUTH_METHOD = "NONE"
);
Где:
<s3_endpoint>— URL S3-хранилища, напримерhttps://storage.yandexcloud.net/<bucket>/для Yandex Cloud.
Создайте потоковый запрос, выполняющий обогащение:
CREATE STREAMING QUERY query_with_join AS
DO BEGIN
-- Чтение событий из входного топика
$topic_data = SELECT
*
FROM
input_topic -- или внешний топик ext_source.input_topic
WITH (
FORMAT = json_each_row,
SCHEMA = (
Time String NOT NULL,
ServiceId Uint32 NOT NULL,
Message String NOT NULL
)
);
-- Чтение справочника сервисов из S3
$s3_data = SELECT
*
FROM
s3_source.`file.csv`
WITH (
FORMAT = csv_with_names,
SCHEMA = (
ServiceId Uint32,
Name Utf8
)
);
-- Присоединение справочника к потоку по ServiceId
$joined_data = SELECT
s.Name AS Name,
t.*
FROM
$topic_data AS t
LEFT JOIN
$s3_data AS s
ON
t.ServiceId = s.ServiceId;
-- Запись результата в выходной топик в формате JSON
INSERT INTO
ext_source.output_topic -- или локальный топик output_topic
SELECT
ToBytes(Unwrap(Yson::SerializeJson(Yson::From(TableRow()))))
FROM
$joined_data;
END DO
Подробнее о форматах данных (json_each_row, csv_with_names и др.): Форматы данных при чтении/записи из топиков.