ALTER TRANSFER
Вызов ALTER TRANSFER
изменяет параметры и состояние экземпляра трансфера.
Синтаксис
ALTER TRANSFER <name> [SET USING lambda | SET (option = value [, ...])]
где:
name
— имя экземпляра трансфера.lambda
— lambda-функция преобразования сообщений.SET (option = value [, ...])
— параметры трансфера.
Параметры
-
STATE
— состояние трансфера. Возможные значения:PAUSED
— остановка трансфера.ACTIVE
— возобновление работы трансфера после приостановки.
-
Параметры батчевания записи в таблицу позволяют настроить баланс между задержкой появления записей в таблице и ресурсами, требуемыми для работы трансфера. Параметры батчевания влияют на обработку каждой партиции топика независимо. К изменению параметров батчевания нужно подходить с осторожностью, так как их изменение может как улучшить скорость обработки потока сообщений, так и ухудшить её, и даже привести к отказу в обслуживании, если параметры будут подобраны неверно. Например, запись в таблицу маленькими по размеру батчами может привести к перегрузке таблицы и деградации скорости работы с ней, а слишком большой размер батча — к тому, что на сервере закончится вся доступная память.
BATCH_SIZE_BYTES
— размер батча в байтах. По умолчанию — 8 МБ.FLUSH_INTERVAL
— периодичность записи в таблицу. По умолчанию — 60 секунд. Запись в таблицу будет осуществлена, даже если батч не достиг размера, заданного в параметреBATCH_SIZE_BYTES
.
Скопировано
Разрешения
Для изменения трансфера требуется право изменять схемные объекты (ALTER SCHEMA
).
Примеры
Следующий запрос изменяет lambda-функцию преобразования сообщений топика:
$new_lambda = ($msg) -> {
return [
<|
partition:$msg._partition,
offset: $msg._offset,
message: CAST($msg._data || ' altered' AS Utf8)
|>
];
};
ALTER TRANSFER my_transfer SET USING $new_lambda;
Следующий запрос временно приостанавливает работу трансфера:
ALTER TRANSFER my_transfer SET (STATE = "PAUSED");
Следующий запрос изменяет параметры батчевания:
ALTER TRANSFER my_transfer SET (
BATCH_SIZE_BYTES = 1048576,
FLUSH_INTERVAL = Interval('PT60S')
);
lambda-функция
Lambda-функция преобразования сообщений принимает один параметр со структурой, содержащей сообщение из топика, и возвращает список структур, соответствующих строкам таблицы для вставки.
Пример:
$lambda = ($msg) -> {
return [
<|
column_1: $msg._create_timestamp,
column_2: $msg._data
|>
];
};
В этом примере:
$msg
— сообщение, полученное из топика.column_1
иcolumn_2
— названия колонок таблицы.$msg._create_timestamp
и$msg._data
— значения, которые будут записаны в таблицу. Типы значений должны совпадать с типами колонок таблицы. Например, еслиcolumn_2
имеет в таблице типString
, то и тип$msg._data
должен быть именноString
.
У сообщения топика доступны следующие поля:
Атрибут | Тип значения | Описание |
---|---|---|
_create_timestamp |
Timestamp |
Время создания сообщения |
_data |
String |
Тело сообщения |
_offset |
Uint64 |
Смещение сообщения |
_partition |
Uint32 |
Номер партиции сообщения |
_producer_id |
String |
Идентификатор писателя |
_seq_no |
Uint64 |
Порядковый номер сообщения |
_write_timestamp |
Timestamp |
Время записи сообщения |
Тестирование lambda-функций
Для тестирования lambda-функции при её разработке можно в качестве сообщения топика передавать структуру с такими же полями, как будут передаваться в трансфере. Пример:
$lambda = ($msg) -> {
return [
<|
offset: $msg._offset,
data: $msg._data
|>
];
};
$msg = <|
_data: "value",
_offset: CAST(1 AS Uint64),
_partition: CAST(2 AS Uint32),
_producer_id: "producer",
_seq_no: CAST(3 AS Uint64)
|>;
SELECT $lambda($msg);
Если lambda-функция содержит сложную логику преобразования, то её можно выделить в отдельную lambda-функцию, что упростит тестирование.
$extract_value = ($data) -> {
-- сложные преобразования
return $data;
};
$lambda = ($msg) -> {
return [
<|
column: $extract_value($msg._data)
|>
];
};
-- Тестировать lambda-функцию extract_value можно так
SELECT $extract_value('преобразуемое значение');