CREATE TRANSFER
Создает трансфер из топика в таблицу.
Синтаксис:
CREATE TRANSFER transfer_name
FROM topic_name TO table_name USING lambda
WITH (option = value[, ...])
-
transfer_name
— имя создаваемого трансфера. -
topic_name
— имя топика, содержащего исходные сообщения для последующего преобразования и записи в таблицу. -
table_name
— имя таблицы, в которую будут записываться данные. -
lambda
— lambda-функция преобразования сообщений. -
option
— опция команды:-
CONNECTION_STRING
— строка соединения с базой данных, содержащей топик. Указывается только если топик находится в другой базе YDB. -
Настройки для аутентификации в базе топика одним из способов (обязательно, если топик находится в другой базе):
-
С помощью токена:
TOKEN_SECRET_NAME
— имя секрета, содержащего токен.
-
С помощью логина и пароля:
USER
— имя пользователя.PASSWORD_SECRET_NAME
— имя секрета, содержащего пароль.
-
-
CONSUMER
— имя читателя топика-источника. Если имя задано, то в топике уже должен существовать читатель с указанным именем, и трансфер начнёт обрабатывать сообщения, начиная с первого незакоммиченного сообщения в топике. Если имя не задано, то читатель будет добавлен в топик автоматически, и трансфер начнёт обрабатывать сообщения, начиная с первого хранящегося сообщения в топике. Имя автоматически созданного читателя можно получить из описания экземпляра трансфера. -
Параметры батчевания записи в таблицу позволяют настроить баланс между задержкой появления записей в таблице и ресурсами, требуемыми для работы трансфера. Параметры батчевания влияют на обработку каждой партиции топика независимо. К изменению параметров батчевания нужно подходить с осторожностью, так как их изменение может как улучшить скорость обработки потока сообщений, так и ухудшить её, и даже привести к отказу в обслуживании, если параметры будут подобраны неверно. Например, запись в таблицу маленькими по размеру батчами может привести к перегрузке таблицы и деградации скорости работы с ней, а слишком большой размер батча — к тому, что на сервере закончится вся доступная память.
BATCH_SIZE_BYTES
— размер батча в байтах. По умолчанию — 8 МБ.FLUSH_INTERVAL
— периодичность записи в таблицу. По умолчанию — 60 секунд. Запись в таблицу будет осуществлена, даже если батч не достиг размера, заданного в параметреBATCH_SIZE_BYTES
.
Скопировано
-
Разрешения
Для создания трансфера требуются следующие права:
CREATE TABLE
— для создания экземпляра трансфера;ALTER SCHEMA
— для автоматического создания читателя топика (если применимо);SELECT ROW
— для чтения сообщений из топика, содержащего исходные сообщения;UPDATE ROW
— для обновления строк в таблице, в которую будут записываться данные.
Примеры
Создание экземпляра трансфера из топика example_topic
в таблицу example_table
текущей базы данных:
CREATE TABLE example_table (
partition Uint32 NOT NULL,
offset Uint64 NOT NULL,
message Utf8,
PRIMARY KEY (partition, offset)
);
CREATE TOPIC example_topic;
$transformation_lambda = ($msg) -> {
return [
<|
partition: $msg._partition,
offset: $msg._offset,
message: CAST($msg._data AS Utf8)
|>
];
};
CREATE TRANSFER example_transfer
FROM example_topic TO example_table USING $transformation_lambda;
Создание экземпляра трансфера из топика example_topic
базы данных /Root/another_database
в таблицу example_table
текущей базы данных. Перед созданием трансфера необходимо в текущей базе создать таблицу в которую будут записываться данные; в базе данных /Root/another_database
создать топик, из которого будут обрабатываться сообщения:
Совет
Перед выполнением операции создайте секрет с аутентификационными данными для подключения или убедитесь в его существовании и наличии доступа к нему.
$transformation_lambda = ($msg) -> {
return [
<|
partition: $msg._partition,
offset: $msg._offset,
message: CAST($msg._data AS Utf8)
|>
];
};
CREATE TRANSFER example_transfer
FROM example_topic TO example_table USING $transformation_lambda
WITH (
CONNECTION_STRING = 'grpcs://example.com:2135/?database=/Root/another_database',
TOKEN_SECRET_NAME = 'my_secret'
);
Создание экземпляра трансфера с явным указанием имени консьюмера existing_consumer_of_topic
:
$transformation_lambda = ($msg) -> {
return [
<|
partition: $msg._partition,
offset: $msg._offset,
message: CAST($msg._data AS Utf8)
|>
];
};
CREATE TRANSFER example_transfer
FROM example_topic TO example_table USING $transformation_lambda
WITH (
CONSUMER = 'existing_consumer_of_topic'
);
Пример обработки сообщения в формате JSON
// example message:
// {
// "update": {
// "operation":"value_1"
// },
// "key": [
// "id_1",
// "2019-01-01T15:30:00.000000Z"
// ]
// }
$transformation_lambda = ($msg) -> {
$json = CAST($msg._data AS JSON);
return [
<|
timestamp: CAST(Yson::ConvertToString($json.key[1]) AS Timestamp),
object_id: CAST(Yson::ConvertToString($json.key[0]) AS Utf8),
operation: CAST(Yson::ConvertToString($json.update.operation) AS Utf8)
|>
];
};
CREATE TRANSFER example_transfer
FROM example_topic TO example_table USING $transformation_lambda;
Создание экземпляра трансфера с явным указанием опции батчевания:
$transformation_lambda = ($msg) -> {
return [
<|
partition: $msg._partition,
offset: $msg._offset,
message: CAST($msg._data AS Utf8)
|>
];
};
CREATE TRANSFER example_transfer
FROM example_topic TO example_table USING $transformation_lambda
WITH (
BATCH_SIZE_BYTES = 1048576,
FLUSH_INTERVAL = Interval('PT60S')
);
lambda-функция
Lambda-функция преобразования сообщений принимает один параметр со структурой, содержащей сообщение из топика, и возвращает список структур, соответствующих строкам таблицы для вставки.
Пример:
$lambda = ($msg) -> {
return [
<|
column_1: $msg._create_timestamp,
column_2: $msg._data
|>
];
};
В этом примере:
$msg
— сообщение, полученное из топика.column_1
иcolumn_2
— названия колонок таблицы.$msg._create_timestamp
и$msg._data
— значения, которые будут записаны в таблицу. Типы значений должны совпадать с типами колонок таблицы. Например, еслиcolumn_2
имеет в таблице типString
, то и тип$msg._data
должен быть именноString
.
У сообщения топика доступны следующие поля:
Атрибут | Тип значения | Описание |
---|---|---|
_create_timestamp |
Timestamp |
Время создания сообщения |
_data |
String |
Тело сообщения |
_offset |
Uint64 |
Смещение сообщения |
_partition |
Uint32 |
Номер партиции сообщения |
_producer_id |
String |
Идентификатор писателя |
_seq_no |
Uint64 |
Порядковый номер сообщения |
_write_timestamp |
Timestamp |
Время записи сообщения |
Тестирование lambda-функций
Для тестирования lambda-функции при её разработке можно в качестве сообщения топика передавать структуру с такими же полями, как будут передаваться в трансфере. Пример:
$lambda = ($msg) -> {
return [
<|
offset: $msg._offset,
data: $msg._data
|>
];
};
$msg = <|
_data: "value",
_offset: CAST(1 AS Uint64),
_partition: CAST(2 AS Uint32),
_producer_id: "producer",
_seq_no: CAST(3 AS Uint64)
|>;
SELECT $lambda($msg);
Если lambda-функция содержит сложную логику преобразования, то её можно выделить в отдельную lambda-функцию, что упростит тестирование.
$extract_value = ($data) -> {
-- сложные преобразования
return $data;
};
$lambda = ($msg) -> {
return [
<|
column: $extract_value($msg._data)
|>
];
};
-- Тестировать lambda-функцию extract_value можно так
SELECT $extract_value('преобразуемое значение');