Topic read and write formats

This section describes the data formats that YDB streaming queries support when reading from and writing to topics, and the YQL types supported for each format.

Supported formats

YDB supports built-in formats, which it parses for you, and custom formats, which you parse in the query itself with YQL functions.

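Built-in formats:

  • csv_with_names
  • tsv_with_names
  • json_list
  • json_each_row
  • json_as_string
  • parquet
  • raw

For examples of parsing custom formats, see Parsing custom formats.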

In the examples on this page, ydb_source is a pre-created external data source, input_topic and output_topic are topics available through it, and output_table is a YDB table.

Write formats

When writing to a topic, the SELECT must return exactly one column of type String, Utf8, Json, or Yson. This column cannot be Optional.

Single-column write:

CREATE STREAMING QUERY write_string_example AS
DO BEGIN

    INSERT INTO ydb_source.output_topic
    SELECT
        CAST(Data AS String)
    FROM
        ydb_source.input_topic
    WITH (
        FORMAT = raw,
        SCHEMA = (
            Data String
        )
    );

END DO

To write several columns, serialize them into a single JSON string:

CREATE STREAMING QUERY write_json_example AS
DO BEGIN

    INSERT INTO ydb_source.output_topic
    SELECT
        ToBytes(Unwrap(Yson::SerializeJson(Yson::From(TableRow()))))
    FROM
        ydb_source.input_topic
    WITH (
        FORMAT = json_each_row,
        SCHEMA = (
            Id Uint64 NOT NULL,
            Name Utf8 NOT NULL
        )
    );

END DO

See also: TableRow, Yson::From, Yson::SerializeJson, Unwrap, ToBytes.
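Assuming each written row becomes one message, a consumer of output_topic receives one JSON object per message. The following Python sketch decodes such a message on the client side. It assumes the object's keys are the column names (Id, Name); decode_row and the sample message body are hypothetical, and the exact spacing and key order produced by Yson::SerializeJson are not assumed.

```python
import json


def decode_row(payload: bytes) -> dict:
    """Decode one topic message that holds a single JSON-serialized row."""
    row = json.loads(payload.decode("utf-8"))
    if not isinstance(row, dict):
        raise ValueError("expected one JSON object per message")
    return row


# Hypothetical message body; real spacing and key order may differ.
message = b'{"Id":1,"Name":"Man_1"}'
row = decode_row(message)
print(row["Id"], row["Name"])  # prints: 1 Man_1
```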

Read formats

csv_with_names

Based on CSV. Values are comma-separated; the first line contains column names.

Example payload (one message):

Year,Manufacturer,Model,Price
1997,Man_1,Model_1,3000.00
1999,Man_2,Model_2,4900.00

Example query:

CREATE STREAMING QUERY csv_example AS
DO BEGIN

    UPSERT INTO output_table
    SELECT
        *
    FROM
        ydb_source.input_topic
    WITH (
        FORMAT = csv_with_names,
        SCHEMA = (
            Year Int32 NOT NULL,
            Manufacturer String NOT NULL,
            Model String NOT NULL,
            Price Double NOT NULL
        )
    );

END DO
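The csv_with_names payload above can also be produced programmatically. A minimal Python sketch using the standard csv module, assuming all rows go into one message; make_csv_with_names is a hypothetical helper, and prices are passed as strings to keep the 3000.00 formatting from the sample.

```python
import csv
import io


def make_csv_with_names(columns, rows) -> bytes:
    """Build one csv_with_names message: a header line, then one line per row."""
    buf = io.StringIO()
    # The csv module ends lines with "\r\n" by default; use "\n" like the sample.
    writer = csv.writer(buf, lineterminator="\n")
    writer.writerow(columns)
    writer.writerows(rows)
    # Drop the final newline so the payload matches the sample exactly.
    return buf.getvalue().rstrip("\n").encode("utf-8")


payload = make_csv_with_names(
    ["Year", "Manufacturer", "Model", "Price"],
    [
        [1997, "Man_1", "Model_1", "3000.00"],  # strings keep the two decimals
        [1999, "Man_2", "Model_2", "4900.00"],
    ],
)
print(payload.decode("utf-8"))
```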

tsv_with_names

Based on TSV. Values are tab-separated (0x9); the first line contains column names.

Example payload (one message):

Year    Manufacturer    Model   Price
1997    Man_1   Model_1    3000.00
1999    Man_2   Model_2    4900.00

Example query:

CREATE STREAMING QUERY tsv_example AS
DO BEGIN

    UPSERT INTO output_table
    SELECT
        *
    FROM
        ydb_source.input_topic
    WITH (
        FORMAT = tsv_with_names,
        SCHEMA = (
            Year Int32 NOT NULL,
            Manufacturer String NOT NULL,
            Model String NOT NULL,
            Price Double NOT NULL
        )
    );

END DO
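A similar Python sketch for tsv_with_names joins fields with a tab character. It is a simplification: make_tsv_with_names is a hypothetical helper that performs no escaping, so it rejects values containing a tab or a newline instead of guessing how YDB expects them to be escaped.

```python
def make_tsv_with_names(columns, rows) -> bytes:
    """Build one tsv_with_names message: a tab-separated header line, then rows."""
    lines = []
    for fields in [columns, *rows]:
        cells = [str(value) for value in fields]
        for cell in cells:
            # No escaping in this sketch: refuse values that would break the layout.
            if "\t" in cell or "\n" in cell:
                raise ValueError(f"separator inside value: {cell!r}")
        lines.append("\t".join(cells))
    return "\n".join(lines).encode("utf-8")


payload = make_tsv_with_names(
    ["Year", "Manufacturer", "Model", "Price"],
    [[1997, "Man_1", "Model_1", "3000.00"], [1999, "Man_2", "Model_2", "4900.00"]],
)
```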

json_list

Based on JSON. Each message must be a JSON array of objects.

Valid example:

[
    { "Year": 1997, "Manufacturer": "Man_1", "Model": "Model_1", "Price": 3000.0 },
    { "Year": 1999, "Manufacturer": "Man_2", "Model": "Model_2", "Price": 4900.00 }
]

Invalid example (separate objects per line, not wrapped in an array):

{ "Year": 1997, "Manufacturer": "Man_1", "Model": "Model_1", "Price": 3000.0 }
{ "Year": 1999, "Manufacturer": "Man_2", "Model": "Model_2", "Price": 4900.00 }

Example query:

CREATE STREAMING QUERY json_list_example AS
DO BEGIN

    UPSERT INTO output_table
    SELECT
        *
    FROM
        ydb_source.input_topic
    WITH (
        FORMAT = json_list,
        SCHEMA = (
            Year Int32 NOT NULL,
            Manufacturer String NOT NULL,
            Model String NOT NULL,
            Price Double NOT NULL
        )
    );

END DO
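The difference between the valid and invalid json_list examples can be checked before sending. In this Python sketch (make_json_list and is_valid_json_list are hypothetical helpers), a message is accepted only if it parses as a single JSON array whose items are all objects. The invalid example fails because json.loads rejects the second object as extra data.

```python
import json


def make_json_list(rows):
    """Build one json_list message: a single JSON array of objects."""
    return json.dumps(rows).encode("utf-8")


def is_valid_json_list(payload: bytes) -> bool:
    """Accept only a JSON array whose items are all JSON objects."""
    try:
        doc = json.loads(payload)
    except json.JSONDecodeError:
        return False  # e.g. two objects on separate lines ("Extra data")
    return isinstance(doc, list) and all(isinstance(item, dict) for item in doc)


rows = [
    {"Year": 1997, "Manufacturer": "Man_1", "Model": "Model_1", "Price": 3000.0},
    {"Year": 1999, "Manufacturer": "Man_2", "Model": "Model_2", "Price": 4900.0},
]
valid = make_json_list(rows)
invalid = b"\n".join(json.dumps(row).encode("utf-8") for row in rows)
print(is_valid_json_list(valid), is_valid_json_list(invalid))  # True False
```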

json_each_row

Based on JSON. Each message must contain exactly one JSON object. This layout is common in systems such as Apache Kafka and YDB topics.

A message must not contain several separate JSON objects or a JSON array.

Valid example (one message):

{ "Year": 1997, "Manufacturer": "Man_1", "Model": "Model_1", "Price": 3000.0 }

Invalid example (two objects in one message):

{ "Year": 1997, "Manufacturer": "Man_1", "Model": "Model_1", "Price": 3000.0 }
{ "Year": 1999, "Manufacturer": "Man_2", "Model": "Model_2", "Price": 4900.00 }

Example query:

CREATE STREAMING QUERY json_each_row_example AS
DO BEGIN

    UPSERT INTO output_table
    SELECT
        *
    FROM
        ydb_source.input_topic
    WITH (
        FORMAT = json_each_row,
        SCHEMA = (
            Year Int32 NOT NULL,
            Manufacturer Utf8 NOT NULL,
            Model Utf8 NOT NULL,
            Price Double NOT NULL
        )
    );

END DO
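For json_each_row, the producer sends each object as a separate message. The Python sketch below (make_json_each_row_messages and is_valid_json_each_row are hypothetical helpers) encodes one message per row and checks that a payload parses as exactly one JSON object, rejecting both the two-objects case and a JSON array.

```python
import json


def make_json_each_row_messages(rows):
    """Encode every row as its own message holding exactly one JSON object."""
    return [json.dumps(row).encode("utf-8") for row in rows]


def is_valid_json_each_row(payload: bytes) -> bool:
    """A json_each_row message must parse as one JSON object."""
    try:
        doc = json.loads(payload)
    except json.JSONDecodeError:
        return False  # e.g. two objects in one message ("Extra data")
    return isinstance(doc, dict)  # rejects JSON arrays and scalars


rows = [
    {"Year": 1997, "Manufacturer": "Man_1", "Model": "Model_1", "Price": 3000.0},
    {"Year": 1999, "Manufacturer": "Man_2", "Model": "Model_2", "Price": 4900.0},
]
messages = make_json_each_row_messages(rows)
two_in_one = b"\n".join(messages)
as_array = json.dumps(rows).encode("utf-8")
```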

json_as_string

Based on JSON.

Each message may contain either:

  • one JSON object per line, each in valid JSON representation;
  • or objects combined into a list.

json_as_string does not split the JSON document into fields; each JSON object (one per line) is returned whole as a single value. This is useful when the set of fields varies between rows.

Valid examples:

{ "Year": 1997, "Attrs": { "Manufacturer": "Man_1", "Model": "Model_1" }, "Price": 3000.0 }
{ "Year": 1999, "Attrs": { "Manufacturer": "Man_2", "Model": "Model_2" }, "Price": 4900.00 }

In this format, the schema must consist of a single column of one of the allowed types (see Supported schema types).

Example query:

CREATE STREAMING QUERY json_as_string_example AS
DO BEGIN

    INSERT INTO ydb_source.output_topic
    SELECT
        *
    FROM
        ydb_source.input_topic
    WITH (
        FORMAT = json_as_string,
        SCHEMA = (
            Data Json
        )
    );

END DO

parquet

Reads message payloads in Apache Parquet format.

Example query:

CREATE STREAMING QUERY parquet_example AS
DO BEGIN

    UPSERT INTO output_table
    SELECT
        *
    FROM
        ydb_source.input_topic
    WITH (
        FORMAT = parquet,
        SCHEMA = (
            Year Int32 NOT NULL,
            Manufacturer Utf8 NOT NULL,
            Model Utf8 NOT NULL,
            Price Double NOT NULL
        )
    );

END DO

raw

Reads message payloads as raw bytes. Default schema: SCHEMA(Data String).

Example query:

CREATE STREAMING QUERY raw_example AS
DO BEGIN

    INSERT INTO ydb_source.output_topic
    SELECT
        *
    FROM
        ydb_source.input_topic
    WITH (
        FORMAT = raw,
        SCHEMA = (
            Data String
        )
    );

END DO

Supported schema types

Type csv_with_names tsv_with_names json_list json_each_row json_as_string parquet raw
Int8, Int16, Int32, Int64, Uint8, Uint16, Uint32, Uint64, Float, Double
Bool
DyNumber
String, Utf8
Json
JsonDocument
Yson
Uuid
Date, Datetime, Timestamp, TzDate, TzDatetime, TzTimestamp
Interval
Date32, Datetime64, Timestamp64, Interval64, TzDate32, TzDatetime64, TzTimestamp64
Optional<T>

Parsing custom formats

JSON with built-in functions

Sample data:

{"key": 1997, "value": "42"}

Example query:

CREATE STREAMING QUERY json_builtins_example AS
DO BEGIN

    UPSERT INTO output_table
    SELECT
        JSON_VALUE(Data, "$.key") AS Key,
        JSON_VALUE(Data, "$.value") AS Value
    FROM
        ydb_source.input_topic
    WITH (
        FORMAT = raw,
        SCHEMA = (
            Data Json
        )
    );

END DO

See JSON_VALUE.
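To see what the query extracts from the sample data, here is a rough Python analog of JSON_VALUE. It is a simplified stand-in, not YDB's implementation: json_value is a hypothetical helper that supports only top-level $.name paths, returns scalars as strings (numbers are formatted with Python's str(), which may differ from YDB), and returns None where JSON_VALUE would return NULL for a missing key or a non-scalar value.

```python
import json


def json_value(doc: str, path: str):
    """Rough analog of JSON_VALUE for top-level '$.name' paths only."""
    if not path.startswith("$."):
        raise ValueError("this sketch supports only '$.name' paths")
    obj = json.loads(doc)
    value = obj.get(path[2:]) if isinstance(obj, dict) else None
    if value is None or isinstance(value, (dict, list)):
        return None  # missing key, JSON null, or non-scalar value
    if isinstance(value, bool):
        return "true" if value else "false"
    return str(value)


sample = '{"key": 1997, "value": "42"}'
print(json_value(sample, "$.key"), json_value(sample, "$.value"))  # 1997 42
```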

JSON with Yson

Sample data (Change Data Capture style):

{"update":{"volume":10,"product":"bWlsaw=="},"key":[6],"ts":[1765192622420,18446744073709551615]}

Example query:

CREATE STREAMING QUERY json_yson_example AS
DO BEGIN

    $input = SELECT
        Yson::ConvertTo(
            Data, Struct<
                update: Struct<volume: Uint64>,
                key: List<Uint64>,
                ts: List<Uint64>
            >
        )
    FROM
        ydb_source.input_topic
    WITH (
        FORMAT = json_as_string,
        SCHEMA = (
            Data Json
        )
    );

    UPSERT INTO output_table
    SELECT
        ts[0] AS Ts,
        update.volume AS Volume
    FROM
        $input
    FLATTEN COLUMNS;

END DO

See also: Yson::ConvertTo, FLATTEN COLUMNS.
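The same extraction in Python makes the field layout of the sample explicit. The second element of ts is 18446744073709551615, that is 2^64 - 1, which fits Uint64 but not Int64; this is presumably why the query declares ts as List<Uint64>. The sketch also decodes product, which the query does not read, under the assumption that it holds base64-encoded bytes ("bWlsaw==" decodes to "milk"). decode_cdc_message is a hypothetical helper.

```python
import base64
import json

UINT64_MAX = 2**64 - 1


def decode_cdc_message(payload: str) -> dict:
    """Pick the fields used by json_yson_example from a CDC-style JSON message."""
    msg = json.loads(payload)
    return {
        "Ts": msg["ts"][0],
        "Volume": msg["update"]["volume"],
        # Assumption: "product" holds base64-encoded bytes.
        "Product": base64.b64decode(msg["update"]["product"]).decode("utf-8"),
    }


sample = '{"update":{"volume":10,"product":"bWlsaw=="},"key":[6],"ts":[1765192622420,18446744073709551615]}'
print(decode_cdc_message(sample))
# {'Ts': 1765192622420, 'Volume': 10, 'Product': 'milk'}
```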

DSV (TSKV)

Sample data:

name=Elena  uid=95792365232151958
name=Denis  uid=78086244452810046
name=Mikhail    uid=70609792906901286

Example query:

CREATE STREAMING QUERY dsv_example AS
DO BEGIN

    $input = SELECT
        Dsv::Parse(Data, "\t") AS Data
    FROM
        ydb_source.input_topic
    FLATTEN LIST BY (
        String::SplitToList(Data, "\n", TRUE AS SkipEmpty) AS Data
    );

    UPSERT INTO output_table
    SELECT
        DictLookup(Data, "name") AS Name,
        DictLookup(Data, "uid") AS Uid
    FROM
        $input;

END DO

See also: Dsv::Parse, String::SplitToList, FLATTEN LIST BY, DictLookup.
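The TSKV parsing steps in dsv_example can be mirrored in Python: split the message into lines while skipping empty ones (like TRUE AS SkipEmpty), split each line on tabs, and split each field on the first "=". parse_tskv is a hypothetical helper; ignoring fields without "=" is a choice made by this sketch, not documented behavior of Dsv::Parse. Missing keys can be read with dict.get, which returns None much as DictLookup returns NULL.

```python
def parse_tskv(payload: str) -> list:
    """Parse TSKV: one record per line, tab-separated key=value fields."""
    rows = []
    for line in payload.split("\n"):
        if not line:
            continue  # like TRUE AS SkipEmpty in String::SplitToList
        row = {}
        for field in line.split("\t"):
            key, sep, value = field.partition("=")
            if sep:  # this sketch ignores fields without '='
                row[key] = value
        rows.append(row)
    return rows


sample = (
    "name=Elena\tuid=95792365232151958\n"
    "name=Denis\tuid=78086244452810046\n"
    "name=Mikhail\tuid=70609792906901286\n"
)
for row in parse_tskv(sample):
    print(row.get("name"), row.get("uid"))
```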