Topic read and write formats
This section describes the data formats that YDB streaming queries support when reading from and writing to topics, and the YQL types supported by each format.
Supported formats
YDB supports built-in formats (predefined defaults) and custom formats that you configure for specific tasks.
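Built-in formats:
- csv_with_names
- tsv_with_names
- json_list
- json_each_row
- json_as_string
- parquet
- raw
For custom formats, see the examples under Parsing custom formats.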
In the examples on this page, ydb_source is a pre-created external data source, input_topic and output_topic are topics available through it, and output_table is a YDB table.
Write formats
When writing to a topic, SELECT must return a single column of type String, Utf8, Json, or Yson. The column cannot be Optional.
Single-column write:
CREATE STREAMING QUERY write_string_example AS
DO BEGIN
    INSERT INTO ydb_source.output_topic
    SELECT
        CAST(Data AS String)
    FROM
        ydb_source.input_topic
    WITH (
        FORMAT = raw,
        SCHEMA = (
            Data String
        )
    );
END DO
To write multiple columns, serialize them to JSON:
CREATE STREAMING QUERY write_json_example AS
DO BEGIN
    INSERT INTO ydb_source.output_topic
    SELECT
        ToBytes(Unwrap(Yson::SerializeJson(Yson::From(TableRow()))))
    FROM
        ydb_source.input_topic
    WITH (
        FORMAT = json_each_row,
        SCHEMA = (
            Id Uint64 NOT NULL,
            Name Utf8 NOT NULL
        )
    );
END DO
See also: TableRow, Yson::From, Yson::SerializeJson, Unwrap, ToBytes.
Read formats
csv_with_names
Based on CSV. Values are comma-separated; the first line contains column names.
Example payload (one message):
Year,Manufacturer,Model,Price
1997,Man_1,Model_1,3000.00
1999,Man_2,Model_2,4900.00
Example query:
CREATE STREAMING QUERY csv_example AS
DO BEGIN
    UPSERT INTO output_table
    SELECT
        *
    FROM
        ydb_source.input_topic
    WITH (
        FORMAT = csv_with_names,
        SCHEMA = (
            Year Int32 NOT NULL,
            Manufacturer String NOT NULL,
            Model String NOT NULL,
            Price Double NOT NULL
        )
    );
END DO
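To make the message layout concrete, here is a minimal Python sketch of how a producer might assemble one csv_with_names payload. The helper name build_csv_message is hypothetical, and the sketch only builds the bytes; publishing them to a topic is out of scope.

```python
import csv
import io


def build_csv_message(columns, rows):
    """Return one message body: a header line followed by data lines."""
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    writer.writerow(columns)  # first line: column names
    writer.writerows(rows)    # following lines: comma-separated values
    return buf.getvalue().encode("utf-8")


message = build_csv_message(
    ["Year", "Manufacturer", "Model", "Price"],
    [
        [1997, "Man_1", "Model_1", "3000.00"],
        [1999, "Man_2", "Model_2", "4900.00"],
    ],
)
print(message.decode("utf-8"))
```

Prices are passed as strings here only to keep the two decimal places from the sample payload.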
tsv_with_names
Based on TSV. Values are tab-separated (0x9); the first line contains column names.
Example payload (one message):
Year Manufacturer Model Price
1997 Man_1 Model_1 3000.00
1999 Man_2 Model_2 4900.00
Example query:
CREATE STREAMING QUERY tsv_example AS
DO BEGIN
    UPSERT INTO output_table
    SELECT
        *
    FROM
        ydb_source.input_topic
    WITH (
        FORMAT = tsv_with_names,
        SCHEMA = (
            Year Int32 NOT NULL,
            Manufacturer String NOT NULL,
            Model String NOT NULL,
            Price Double NOT NULL
        )
    );
END DO
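Tab characters are easy to lose when a payload is copied as text, so the following Python sketch builds a tsv_with_names message with explicit tab separators and prints it with repr() to make them visible. The helper name build_tsv_message is hypothetical. Python's csv module quotes fields that contain the delimiter; whether the reader accepts such quoting is not covered here.

```python
import csv
import io


def build_tsv_message(columns, rows):
    """Return one message body with tab-separated fields and a header line."""
    buf = io.StringIO()
    writer = csv.writer(buf, delimiter="\t", lineterminator="\n")
    writer.writerow(columns)
    writer.writerows(rows)
    return buf.getvalue().encode("utf-8")


message = build_tsv_message(
    ["Year", "Manufacturer", "Model", "Price"],
    [[1997, "Man_1", "Model_1", "3000.00"]],
)
# repr() shows each tab (0x09) separator as \t
print(repr(message))
```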
json_list
Based on JSON. Each message must be a JSON array of objects.
Valid example:
[
{ "Year": 1997, "Manufacturer": "Man_1", "Model": "Model_1", "Price": 3000.0 },
{ "Year": 1999, "Manufacturer": "Man_2", "Model": "Model_2", "Price": 4900.00 }
]
Invalid example (separate objects per line, not wrapped in an array):
{ "Year": 1997, "Manufacturer": "Man_1", "Model": "Model_1", "Price": 3000.0 }
{ "Year": 1999, "Manufacturer": "Man_2", "Model": "Model_2", "Price": 4900.00 }
Example query:
CREATE STREAMING QUERY json_list_example AS
DO BEGIN
    UPSERT INTO output_table
    SELECT
        *
    FROM
        ydb_source.input_topic
    WITH (
        FORMAT = json_list,
        SCHEMA = (
            Year Int32 NOT NULL,
            Manufacturer String NOT NULL,
            Model String NOT NULL,
            Price Double NOT NULL
        )
    );
END DO
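The array-of-objects rule can be checked on the producer side before a message is sent. The following Python sketch, with hypothetical helper names, builds a json_list payload and tests whether a given payload has the required shape. It checks structure only, not column names or types.

```python
import json


def build_json_list_message(records):
    """Serialize all records into a single JSON array (one message body)."""
    return json.dumps(records).encode("utf-8")


def is_json_list_message(payload):
    """Return True if payload is a JSON array whose items are all objects."""
    try:
        doc = json.loads(payload)
    except ValueError:  # covers malformed JSON and undecodable bytes
        return False
    return isinstance(doc, list) and all(isinstance(item, dict) for item in doc)


records = [
    {"Year": 1997, "Manufacturer": "Man_1", "Model": "Model_1", "Price": 3000.0},
    {"Year": 1999, "Manufacturer": "Man_2", "Model": "Model_2", "Price": 4900.0},
]
message = build_json_list_message(records)
print(is_json_list_message(message))  # True

# Objects on separate lines, not wrapped in an array, are rejected
print(is_json_list_message(b'{"Year": 1997}\n{"Year": 1999}'))  # False
```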
json_each_row
Based on JSON. Each message must contain exactly one JSON object. This layout is common in systems such as Apache Kafka and YDB topics.
A message that contains several separate JSON objects, or a JSON array, is not supported.
Valid example (one message):
{ "Year": 1997, "Manufacturer": "Man_1", "Model": "Model_1", "Price": 3000.0 }
Invalid example (two objects in one message):
{ "Year": 1997, "Manufacturer": "Man_1", "Model": "Model_1", "Price": 3000.0 }
{ "Year": 1999, "Manufacturer": "Man_2", "Model": "Model_2", "Price": 4900.00 }
Example query:
CREATE STREAMING QUERY json_each_row_example AS
DO BEGIN
    UPSERT INTO output_table
    SELECT
        *
    FROM
        ydb_source.input_topic
    WITH (
        FORMAT = json_each_row,
        SCHEMA = (
            Year Int32 NOT NULL,
            Manufacturer Utf8 NOT NULL,
            Model Utf8 NOT NULL,
            Price Double NOT NULL
        )
    );
END DO
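If a producer holds a batch of newline-delimited objects, like the invalid example above, it has to send each object as its own message. The Python sketch below shows one way to split such a batch. The helper name is hypothetical, and it assumes exactly one JSON object per line.

```python
import json


def split_into_row_messages(payload):
    """Turn newline-delimited JSON objects into one message body per object."""
    messages = []
    for line in payload.splitlines():
        if not line.strip():
            continue  # skip blank lines
        obj = json.loads(line)
        if not isinstance(obj, dict):
            raise ValueError("each line must be a JSON object")
        messages.append(json.dumps(obj).encode("utf-8"))
    return messages


batch = (
    b'{ "Year": 1997, "Manufacturer": "Man_1", "Model": "Model_1", "Price": 3000.0 }\n'
    b'{ "Year": 1999, "Manufacturer": "Man_2", "Model": "Model_2", "Price": 4900.00 }\n'
)
messages = split_into_row_messages(batch)
print(len(messages))  # 2
```

Each element of messages is then a valid json_each_row payload on its own.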
json_as_string
Based on JSON.
Each message may contain:
- one valid JSON object per line;
- or valid JSON objects combined into a list.
json_as_string does not split the JSON document into fields. Instead, each JSON object is returned whole, as a single value in one column. This is useful when the set of fields varies between rows.
Valid examples:
{ "Year": 1997, "Attrs": { "Manufacturer": "Man_1", "Model": "Model_1" }, "Price": 3000.0 }
{ "Year": 1999, "Attrs": { "Manufacturer": "Man_2", "Model": "Model_2" }, "Price": 4900.00 }
In this format, the schema must consist of a single column of an allowed type (see Supported schema types).
Example query:
CREATE STREAMING QUERY json_as_string_example AS
DO BEGIN
    INSERT INTO ydb_source.output_topic
    SELECT
        *
    FROM
        ydb_source.input_topic
    WITH (
        FORMAT = json_as_string,
        SCHEMA = (
            Data Json
        )
    );
END DO
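The idea of keeping each document whole and extracting fields later can be illustrated outside YQL. The Python sketch below stores each message as a single Data value and reads nested fields on demand, tolerating missing ones. The second sample message and the get_path helper are illustrative assumptions, not part of YDB.

```python
import json

# The second message deliberately lacks "Model" and "Price" (illustrative data)
messages = [
    '{ "Year": 1997, "Attrs": { "Manufacturer": "Man_1", "Model": "Model_1" }, "Price": 3000.0 }',
    '{ "Year": 1999, "Attrs": { "Manufacturer": "Man_2" } }',
]

# One column per row, holding the unparsed document
rows = [{"Data": text} for text in messages]


def get_path(data, *keys, default=None):
    """Parse the stored document and walk nested keys, tolerating missing ones."""
    node = json.loads(data)
    for key in keys:
        if not isinstance(node, dict) or key not in node:
            return default
        node = node[key]
    return node


for row in rows:
    print(get_path(row["Data"], "Attrs", "Model"), get_path(row["Data"], "Price"))
```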
parquet
Reads message payloads in Apache Parquet format.
Example query:
CREATE STREAMING QUERY parquet_example AS
DO BEGIN
    UPSERT INTO output_table
    SELECT
        *
    FROM
        ydb_source.input_topic
    WITH (
        FORMAT = parquet,
        SCHEMA = (
            Year Int32 NOT NULL,
            Manufacturer Utf8 NOT NULL,
            Model Utf8 NOT NULL,
            Price Double NOT NULL
        )
    );
END DO
raw
Reads message payloads as raw bytes. Default schema: SCHEMA(Data String).
Example query:
CREATE STREAMING QUERY raw_example AS
DO BEGIN
    INSERT INTO ydb_source.output_topic
    SELECT
        *
    FROM
        ydb_source.input_topic
    WITH (
        FORMAT = raw,
        SCHEMA = (
            Data String
        )
    );
END DO
Supported schema types
| Type | csv_with_names | tsv_with_names | json_list | json_each_row | json_as_string | parquet | raw |
|---|---|---|---|---|---|---|---|
| Int8, Int16, Int32, Int64, Uint8, Uint16, Uint32, Uint64, Float, Double | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | |
| Bool | ✓ | ✓ | ✓ | ✓ | ✓ | | |
| DyNumber | | | | | | | |
| String, Utf8 | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
| Json | ✓ | ✓ | | | | | |
| JsonDocument | | | | | | | |
| Yson | | | | | | | |
| Uuid | ✓ | | | | | | |
| Date, Datetime, Timestamp, TzDate, TzDatetime, TzTimestamp | | | | | | | |
| Interval | | | | | | | |
| Date32, Datetime64, Timestamp64, Interval64, TzDate32, TzDatetime64, TzTimestamp64 | | | | | | | |
| Optional<T> | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
Parsing custom formats
JSON with built-in functions
Sample data:
{"key": 1997, "value": "42"}
Example query:
CREATE STREAMING QUERY json_builtins_example AS
DO BEGIN
    -- Two columns are returned, so the result goes to a table:
    -- a topic accepts only a single column.
    UPSERT INTO output_table
    SELECT
        JSON_VALUE(Data, "$.key") AS Key,
        JSON_VALUE(Data, "$.value") AS Value
    FROM
        ydb_source.input_topic
    WITH (
        FORMAT = raw,
        SCHEMA = (
            Data Json
        )
    );
END DO
See JSON_VALUE.
JSON with Yson
Sample data (Change Data Capture style):
{"update":{"volume":10,"product":"bWlsaw=="},"key":[6],"ts":[1765192622420,18446744073709551615]}
Example query:
CREATE STREAMING QUERY json_yson_example AS
DO BEGIN
    $input = SELECT
        Yson::ConvertTo(
            Data, Struct<
                update: Struct<volume: Uint64>,
                key: List<Uint64>,
                ts: List<Uint64>
            >
        )
    FROM
        ydb_source.input_topic
    WITH (
        FORMAT = json_as_string,
        SCHEMA = (
            Data Json
        )
    );
    -- Two columns are returned, so the result goes to a table:
    -- a topic accepts only a single column.
    UPSERT INTO output_table
    SELECT
        ts[0] AS Ts,
        update.volume AS Volume
    FROM
        $input
    FLATTEN COLUMNS;
END DO
See also: Yson::ConvertTo, FLATTEN COLUMNS.
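As a cross-check of what the query extracts, the same fields can be pulled from the sample message in Python. This is only an illustration of the data shape, not of how YDB processes it; the function name is hypothetical. Decoding the product value as Base64 is an extra inspection step that the query itself does not perform.

```python
import base64
import json

sample = (
    '{"update":{"volume":10,"product":"bWlsaw=="},'
    '"key":[6],"ts":[1765192622420,18446744073709551615]}'
)


def extract_ts_and_volume(data):
    """Pick the first ts element and update.volume, as the query's SELECT does."""
    doc = json.loads(data)
    return {"Ts": doc["ts"][0], "Volume": doc["update"]["volume"]}


row = extract_ts_and_volume(sample)
print(row)  # {'Ts': 1765192622420, 'Volume': 10}

# The second ts element is the largest 64-bit unsigned value
print(json.loads(sample)["ts"][1] == 2**64 - 1)  # True

# The product value is valid Base64 and decodes to readable bytes
print(base64.b64decode(json.loads(sample)["update"]["product"]))  # b'milk'
```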
DSV (TSKV)
Sample data:
name=Elena uid=95792365232151958
name=Denis uid=78086244452810046
name=Mikhail uid=70609792906901286
Example query:
CREATE STREAMING QUERY dsv_example AS
DO BEGIN
    $input = SELECT
        Dsv::Parse(Data, "\t") AS Data
    FROM
        ydb_source.input_topic
    FLATTEN LIST BY (
        String::SplitToList(Data, "\n", TRUE AS SkipEmpty) AS Data
    );
    -- Two columns are returned, so the result goes to a table:
    -- a topic accepts only a single column.
    UPSERT INTO output_table
    SELECT
        DictLookup(Data, "name") AS Name,
        DictLookup(Data, "uid") AS Uid
    FROM
        $input;
END DO
See also: Dsv::Parse, String::SplitToList, DictLookup, FLATTEN LIST BY.
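The parsing steps in the query (split the message into lines, skip empty ones, split each line on tabs, then split each field on "=") can be approximated in Python as follows. This sketch assumes tab-separated key=value pairs and does not model any escaping rules that Dsv::Parse may apply. The function name is hypothetical.

```python
def parse_dsv_message(data, delimiter="\t"):
    """Parse a message of key=value lines into a list of dicts."""
    rows = []
    for line in data.split("\n"):
        if not line:  # mirrors SkipEmpty in the query
            continue
        record = {}
        for field in line.split(delimiter):
            key, sep, value = field.partition("=")
            if sep:  # ignore fields that have no "="
                record[key] = value
        rows.append(record)
    return rows


sample = (
    "name=Elena\tuid=95792365232151958\n"
    "name=Denis\tuid=78086244452810046\n"
    "name=Mikhail\tuid=70609792906901286\n"
)
for record in parse_dsv_message(sample):
    # dict.get returns None for a missing key instead of raising
    print(record.get("name"), record.get("uid"))
```

Values stay strings after parsing; converting uid to a number would be a separate step.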