ALTER TRANSFER

The ALTER TRANSFER statement modifies the parameters and state of a transfer instance.

Syntax

ALTER TRANSFER <name> [SET USING lambda | SET (option = value [, ...])]

where:

  • name — the name of the transfer instance.
  • lambda — the lambda-function for message transformation.
  • SET (option = value [, ...]) — the transfer parameters.

Parameters

  • STATE — the transfer state. Possible values:

  • PAUSED — pauses the transfer.

  • ACTIVE — resumes a paused transfer.

  • Table write batching parameters let you balance the latency of records appearing in the table against the resources required by the transfer. Batching parameters affect the processing of each topic partition independently. Change batching parameters with caution, as this can either improve or degrade message stream processing speed, and may even lead to a denial of service if the parameters are misconfigured. For example, writing to the table in small batches can overload the table and degrade its performance, while an excessively large batch size can cause the server to run out of available memory.

    • BATCH_SIZE_BYTES — the batch size in bytes. Default: 8 MB.
    • FLUSH_INTERVAL — the table write interval. Default: 60 seconds. Data is written to the table at this interval, even if the batch has not reached the size specified in the BATCH_SIZE_BYTES parameter.

Permissions

Modifying a transfer requires the ALTER SCHEMA permissions.

Examples

The following query modifies the message transformation lambda-function:

$new_lambda = ($msg) -> {
    return [
        <|
            partition: $msg._partition,
            offset: $msg._offset,
            message: CAST($msg._data || ' altered' AS Utf8)
        |>
    ];
};

ALTER TRANSFER my_transfer SET USING $new_lambda;

The following query pauses the transfer:

ALTER TRANSFER my_transfer SET (STATE = "PAUSED");

The following query modifies the batching parameters:

ALTER TRANSFER my_transfer SET (
    BATCH_SIZE_BYTES = 1048576,
    FLUSH_INTERVAL = Interval('PT60S')
);

Lambda function

A message transformation lambda function takes a single structured parameter containing the message from the topic and returns a list of structures corresponding to the table rows for insertion.

Example:

$lambda = ($msg) -> {
  return [
    <|
      column_1: $msg._create_timestamp,
      column_2: $msg._data
    |>
  ];
};

In this example:

  • $msg — the message received from the topic.
  • column_1 and column_2 — the names of the table columns.
  • $msg._create_timestamp and $msg._data — the values that will be written to the table. The value types must match the table column types. For example, if the column_2 table column has the String type, the type of $msg._data must also be String.

The following fields are available in a topic message:

Attribute Value type Description
_create_timestamp Timestamp Message creation time
_data String Message body
_offset Uint64 Message offset
_partition Uint32 Message's partition number
_producer_id String Producer ID
_seq_no Uint64 Message sequence number
_write_timestamp Timestamp Message write time

Testing lambda functions

To test a lambda function during development, you can simulate a topic message by passing a structure with the same fields that the transfer will provide. Example:

$lambda = ($msg) -> {
  return [
    <|
      offset: $msg._offset,
      data: $msg._data
    |>
  ];
};

$msg = <|
  _data: "value",
  _offset: CAST(1 AS Uint64),
  _partition: CAST(2 AS Uint32),
  _producer_id: "producer",
  _seq_no: CAST(3 AS Uint64)
|>;

SELECT $lambda($msg);

If a lambda function contains complex transformation logic, you can extract it into a separate lambda function to simplify testing.

$extract_value = ($data) -> {
  -- complex transformations
  return $data;
};

$lambda = ($msg) -> {
  return [
    <|
      column: $extract_value($msg._data)
    |>
  ];
};

-- You can test the extract_value lambda function like this

SELECT $extract_value('converted value');

See Also

Previous