Форматы данных при чтении/записи из топиков
В данном разделе описываются форматы данных потоковых запросов, поддерживаемые в YDB при чтении из топиков, и список поддерживаемых YQL типов для каждого формата данных.
Поддерживаемые форматы данных
YDB поддерживает встроенные форматы данных (предопределённые по умолчанию) и пользовательские форматы (настраиваемые пользователем под специфические задачи).
Список встроенных в YDB форматов данных приведен ниже:
Примеры парсинга данных в пользовательских форматах.
В примерах на этой странице ydb_source — это заранее созданный внешний источник данных, input_topic и output_topic — топики, доступные через него, а output_table — таблица YDB.
Форматы при записи данных
При записи в топик SELECT должен возвращать одну колонку типа String, Utf8, Json или Yson. Колонка не может быть Optional.
Запись одной колонки:
CREATE STREAMING QUERY write_string_example AS
DO BEGIN
INSERT INTO ydb_source.output_topic
SELECT
CAST(Data AS String)
FROM
ydb_source.input_topic
WITH (
FORMAT = raw,
SCHEMA = (
Data String
)
);
END DO
Чтобы записать несколько колонок, сериализуйте их в JSON:
CREATE STREAMING QUERY write_json_example AS
DO BEGIN
INSERT INTO ydb_source.output_topic
SELECT
ToBytes(Unwrap(Yson::SerializeJson(Yson::From(TableRow()))))
FROM
ydb_source.input_topic
WITH (
FORMAT = json_each_row,
SCHEMA = (
Id Uint64 NOT NULL,
Name Utf8 NOT NULL
)
);
END DO
Подробнее о функциях: TableRow, Yson::From, Yson::SerializeJson, Unwrap, ToBytes.
Форматы при чтении данных
Формат csv_with_names
Данный формат основан на формате CSV. Данные размещены в колонках, разделены запятыми, в первой строке находятся имена колонок.
Пример данных (в одном сообщении):
Year,Manufacturer,Model,Price
1997,Man_1,Model_1,3000.00
1999,Man_2,Model_2,4900.00
Пример запроса:
CREATE STREAMING QUERY csv_example AS
DO BEGIN
UPSERT INTO output_table
SELECT
*
FROM
ydb_source.input_topic
WITH (
FORMAT = csv_with_names,
SCHEMA = (
Year Int32 NOT NULL,
Manufacturer String NOT NULL,
Model String NOT NULL,
Price Double NOT NULL
)
);
END DO
Формат tsv_with_names
Данный формат основан на формате TSV. Данные размещены в колонках, разделены символами табуляции (код 0x9), в первой строке находятся имена колонок.
Пример данных (в одном сообщении):
Year Manufacturer Model Price
1997 Man_1 Model_1 3000.00
1999 Man_2 Model_2 4900.00
Пример запроса:
CREATE STREAMING QUERY tsv_example AS
DO BEGIN
UPSERT INTO output_table
SELECT
*
FROM
ydb_source.input_topic
WITH (
FORMAT = tsv_with_names,
SCHEMA = (
Year Int32 NOT NULL,
Manufacturer String NOT NULL,
Model String NOT NULL,
Price Double NOT NULL
)
);
END DO
Формат json_list
Данный формат основан на JSON-представлении данных. В этом формате каждое сообщение должно представлять собой JSON‑массив объектов.
Пример корректных данных (в виде списка объектов JSON):
[
{ "Year": 1997, "Manufacturer": "Man_1", "Model": "Model_1", "Price": 3000.0 },
{ "Year": 1999, "Manufacturer": "Man_2", "Model": "Model_2", "Price": 4900.00 }
]
Пример некорректных данных (в каждом сообщении находится отдельный объект в формате JSON, но эти объекты не объединены в список):
{ "Year": 1997, "Manufacturer": "Man_1", "Model": "Model_1", "Price": 3000.0 }
{ "Year": 1999, "Manufacturer": "Man_2", "Model": "Model_2", "Price": 4900.00 }
Пример запроса:
CREATE STREAMING QUERY json_list_example AS
DO BEGIN
UPSERT INTO output_table
SELECT
*
FROM
ydb_source.input_topic
WITH (
FORMAT = json_list,
SCHEMA = (
Year Int32 NOT NULL,
Manufacturer String NOT NULL,
Model String NOT NULL,
Price Double NOT NULL
)
);
END DO
Формат json_each_row
Данный формат основан на JSON-представлении данных. В этом формате каждое сообщение должно представлять собой JSON-объект. Такой формат используется при передаче данных через потоковые системы, например, Apache Kafka или Топики YDB.
Несколько отдельных JSON в одном сообщении не поддерживается; JSON-список также не поддерживается.
Пример корректных данных (в одном сообщении):
{ "Year": 1997, "Manufacturer": "Man_1", "Model": "Model_1", "Price": 3000.0 }
Пример некорректных данных:
{ "Year": 1997, "Manufacturer": "Man_1", "Model": "Model_1", "Price": 3000.0 }
{ "Year": 1999, "Manufacturer": "Man_2", "Model": "Model_2", "Price": 4900.00 }
Пример запроса:
CREATE STREAMING QUERY json_each_row_example AS
DO BEGIN
UPSERT INTO output_table
SELECT
*
FROM
ydb_source.input_topic
WITH (
FORMAT = json_each_row,
SCHEMA = (
Year Int32 NOT NULL,
Manufacturer Utf8 NOT NULL,
Model Utf8 NOT NULL,
Price Double NOT NULL
)
);
END DO
Формат json_as_string
Данный формат основан на JSON-представлении данных.
В этом формате внутри каждого сообщения должен находиться:
- объект в корректном JSON-представлении в каждой отдельной строке файла;
- объекты в корректном JSON-представлении, объединенные в список.
Формат json_as_string не разбивает входной JSON-документ на поля, а представляет каждое сообщение в виде одного объекта JSON (или одной строки). Такой формат удобен, если список полей не одинаков во всех строках, а может изменяться.
Пример корректных данных:
{ "Year": 1997, "Attrs": { "Manufacturer": "Man_1", "Model": "Model_1" }, "Price": 3000.0 }
{ "Year": 1999, "Attrs": { "Manufacturer": "Man_2", "Model": "Model_2" }, "Price": 4900.00 }
В этом формате схема читаемых данных должна состоять только из одной колонки с одним из разрешённых типов данных, подробнее см. ниже.
Пример запроса:
CREATE STREAMING QUERY json_as_string_example AS
DO BEGIN
INSERT INTO ydb_source.output_topic
SELECT
*
FROM
ydb_source.input_topic
WITH (
FORMAT = json_as_string,
SCHEMA = (
Data Json
)
);
END DO
Формат parquet
Данный формат позволяет считывать содержимое сообщений в формате Apache Parquet.
Пример запроса:
CREATE STREAMING QUERY parquet_example AS
DO BEGIN
UPSERT INTO output_table
SELECT
*
FROM
ydb_source.input_topic
WITH (
FORMAT = parquet,
SCHEMA = (
Year Int32 NOT NULL,
Manufacturer Utf8 NOT NULL,
Model Utf8 NOT NULL,
Price Double NOT NULL
)
);
END DO
Формат raw
Данный формат позволяет считывать содержимое сообщений как есть, в "сыром" виде. Считанные таким образом данные можно обработать средствами YQL. Cхема по умолчанию: SCHEMA(Data String).
Пример запроса:
CREATE STREAMING QUERY raw_example AS
DO BEGIN
INSERT INTO ydb_source.output_topic
SELECT
*
FROM
ydb_source.input_topic
WITH (
FORMAT = raw,
SCHEMA = (
Data String
)
);
END DO
Поддерживаемые типы данных
Таблица всех поддерживаемых типов в схеме запроса:
| Тип | csv_with_names | tsv_with_names | json_list | json_each_row | json_as_string | parquet | raw |
|---|---|---|---|---|---|---|---|
Int8, Int16, Int32, Int64,Uint8, Uint16, Uint32, Uint64,Float, Double |
✓ | ✓ | ✓ | ✓ | ✓ | ✓ | |
Bool |
✓ | ✓ | ✓ | ✓ | ✓ | ||
DyNumber |
|||||||
String, Utf8 |
✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
Json |
✓ | ✓ | |||||
JsonDocument |
|||||||
Yson |
|||||||
Uuid |
✓ | ||||||
Date, Datetime, Timestamp,TzDate, TzDateTime, TzTimestamp |
|||||||
Interval |
|||||||
Date32, Datetime64, Timestamp64,Interval64,TzDate32, TzDateTime64, TzTimestamp64 |
|||||||
Optional<T> |
✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
Примеры парсинга данных в пользовательских форматах
Парсинг JSON встроенными функциями
Пример данных:
{"key": 1997, "value": "42"}
Пример запроса:
CREATE STREAMING QUERY json_builtins_example AS
DO BEGIN
INSERT INTO ydb_source.output_topic
SELECT
JSON_VALUE(Data, "$.key") AS Key,
JSON_VALUE(Data, "$.value") AS Value
FROM
ydb_source.input_topic
WITH (
FORMAT = raw,
SCHEMA = (
Data Json
)
);
END DO
Подробнее о функциях: JSON_VALUE.
Парсинг JSON библиотекой Yson
Пример данных (формат Change Data Capture):
{"update":{"volume":10,"product":"bWlsaw=="},"key":[6],"ts":[1765192622420,18446744073709551615]}
Пример запроса:
CREATE STREAMING QUERY json_yson_example AS
DO BEGIN
$input = SELECT
Yson::ConvertTo(
Data, Struct<
update: Struct<volume: Uint64>,
key: List<Uint64>,
ts: List<Uint64>
>
)
FROM
ydb_source.input_topic
WITH (
FORMAT = json_as_string,
SCHEMA = (
Data Json
)
);
INSERT INTO ydb_source.output_topic
SELECT
ts[0] AS Ts,
update.volume AS Volume
FROM
$input
FLATTEN COLUMNS;
END DO
Подробнее о функциях:
Парсинг DSV (TSKV)
Пример данных:
name=Elena uid=95792365232151958
name=Denis uid=78086244452810046
name=Mikhail uid=70609792906901286
Пример запроса:
CREATE STREAMING QUERY dsv_example AS
DO BEGIN
$input = SELECT
Dsv::Parse(Data, "\t") AS Data
FROM
ydb_source.input_topic
FLATTEN LIST BY (
String::SplitToList(Data, "\n", TRUE AS SkipEmpty) AS Data
);
INSERT INTO ydb_source.output_topic
SELECT
DictLookup(Data, "name") AS Name,
DictLookup(Data, "uid") AS Uid
FROM
$input;
END DO
Подробнее о функциях: