ALTER TRANSFER

Вызов ALTER TRANSFER изменяет параметры и состояние экземпляра трансфера.

Синтаксис

ALTER TRANSFER <name> [SET USING lambda | SET (option = value [, ...])]

где:

  • name — имя экземпляра трансфера.
  • lambdalambda-функция преобразования сообщений.
  • SET (option = value [, ...])параметры трансфера.

Параметры

  • STATEсостояние трансфера. Возможные значения:

    • PAUSED — остановка трансфера.
    • ACTIVE — возобновление работы трансфера после приостановки.
  • Параметры батчевания записи в таблицу позволяют настроить баланс между задержкой появления записей в таблице и ресурсами, требуемыми для работы трансфера. Параметры батчевания влияют на обработку каждой партиции топика независимо. К изменению параметров батчевания нужно подходить с осторожностью, так как их изменение может как улучшить скорость обработки потока сообщений, так и ухудшить её, и даже привести к отказу в обслуживании, если параметры будут подобраны неверно. Например, запись в таблицу маленькими по размеру батчами может привести к перегрузке таблицы и деградации скорости работы с ней, а слишком большой размер батча — к тому, что на сервере закончится вся доступная память.

    • BATCH_SIZE_BYTES — размер батча в байтах. По умолчанию — 8 МБ.
    • FLUSH_INTERVAL — периодичность записи в таблицу. По умолчанию — 60 секунд. Запись в таблицу будет осуществлена, даже если батч не достиг размера, заданного в параметре BATCH_SIZE_BYTES.

Разрешения

Для изменения трансфера требуется право изменять схемные объекты (ALTER SCHEMA).

Примеры

Следующий запрос изменяет lambda-функцию преобразования сообщений топика:

$new_lambda = ($msg) -> {
    return [
        <|
            partition:$msg._partition,
            offset: $msg._offset,
            message: CAST($msg._data || ' altered' AS Utf8)
        |>
    ];
};

ALTER TRANSFER my_transfer SET USING $new_lambda;

Следующий запрос временно приостанавливает работу трансфера:

ALTER TRANSFER my_transfer SET (STATE = "PAUSED");

Следующий запрос изменяет параметры батчевания:

ALTER TRANSFER my_transfer SET (
    BATCH_SIZE_BYTES = 1048576,
    FLUSH_INTERVAL = Interval('PT60S')
);

lambda-функция

Lambda-функция преобразования сообщений принимает один параметр со структурой, содержащей сообщение из топика, и возвращает список структур, соответствующих строкам таблицы для вставки.

Пример:

$lambda = ($msg) -> {
  return [
    <|
      column_1: $msg._create_timestamp,
      column_2: $msg._data
    |>
  ];
};

В этом примере:

  • $msg — сообщение, полученное из топика.
  • column_1 и column_2 — названия колонок таблицы.
  • $msg._create_timestamp и $msg._data — значения, которые будут записаны в таблицу. Типы значений должны совпадать с типами колонок таблицы. Например, если column_2 имеет в таблице тип String, то и тип $msg._data должен быть именно String.

У сообщения топика доступны следующие поля:

Атрибут Тип значения Описание
_create_timestamp Timestamp Время создания сообщения
_data String Тело сообщения
_offset Uint64 Смещение сообщения
_partition Uint32 Номер партиции сообщения
_producer_id String Идентификатор писателя
_seq_no Uint64 Порядковый номер сообщения
_write_timestamp Timestamp Время записи сообщения

Тестирование lambda-функций

Для тестирования lambda-функции при её разработке можно в качестве сообщения топика передавать структуру с такими же полями, как будут передаваться в трансфере. Пример:

$lambda = ($msg) -> {
  return [
    <|
      offset: $msg._offset,
      data: $msg._data
    |>
  ];
};

$msg = <|
  _data: "value",
  _offset: CAST(1 AS Uint64),
  _partition: CAST(2 AS Uint32),
  _producer_id: "producer",
  _seq_no: CAST(3 AS Uint64)
|>;

SELECT $lambda($msg);

Если lambda-функция содержит сложную логику преобразования, то её можно выделить в отдельную lambda-функцию, что упростит тестирование.

$extract_value = ($data) -> {
  -- сложные преобразования
  return $data;
};

$lambda = ($msg) -> {
  return [
    <|
      column: $extract_value($msg._data)
    |>
  ];
};

-- Тестировать lambda-функцию extract_value можно так

SELECT $extract_value('преобразуемое значение');

См. также

Предыдущая
Следующая