Quick start: reading and writing to topics
In this guide, you will create your first streaming query.
The query will:
- read events from the input topic
- keep only error events
- count errors per server in 10-minute windows
- write the result to the output topic.
Events arrive in JSON format and have three fields: Time (event time), Level (logging level), and Host (server name).
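To make the event shape concrete, here is a minimal Python sketch that runs on the client only and is not part of the YDB setup. It parses one event line (the query reads events in json_each_row format, one JSON object per line), checks that the three fields declared NOT NULL in the query schema in Step 3 are present, and converts Time to a UTC datetime. The function name parse_event is hypothetical; the Time format is taken from the test messages in Step 5.

```python
import json
from datetime import datetime, timezone

# One input event as a single JSON line; values match the test messages in Step 5.
line = '{"Time": "2025-01-01T00:00:00.000000Z", "Level": "error", "Host": "host-1"}'

# All three fields are declared NOT NULL in the query schema.
REQUIRED_FIELDS = ("Time", "Level", "Host")


def parse_event(raw: str) -> dict:
    """Hypothetical helper: parse one event line and validate the schema fields."""
    event = json.loads(raw)
    for field in REQUIRED_FIELDS:
        # A missing key and an explicit JSON null are both rejected.
        if event.get(field) is None:
            raise ValueError(f"missing required field: {field}")
    # Time is an ISO 8601 UTC timestamp with microseconds; the query casts it to Timestamp.
    event["Time"] = datetime.strptime(
        event["Time"], "%Y-%m-%dT%H:%M:%S.%fZ"
    ).replace(tzinfo=timezone.utc)
    return event


event = parse_event(line)
print(event["Host"], event["Level"], event["Time"].isoformat())
```

An event without Host, for example, raises ValueError instead of being accepted.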
You will perform the following steps:
- creating topics
- creating an external data source
- creating a streaming query
- viewing the query status
- populating the input topic with data
- checking the output topic contents
- deleting the streaming query.
Prerequisites
To run the examples, you will need:
- a running YDB database — see quick start
- the enable_external_data_sources and enable_streaming_queries feature flags enabled.
For example, to start YDB in Docker with both flags enabled:
docker run -d --rm --name ydb-local -h localhost \
--platform linux/amd64 \
-p 2135:2135 -p 2136:2136 -p 8765:8765 -p 9092:9092 \
-v $(pwd)/ydb_certs:/ydb_certs \
-e GRPC_TLS_PORT=2135 -e GRPC_PORT=2136 -e MON_PORT=8765 \
-e YDB_FEATURE_FLAGS=enable_external_data_sources,enable_streaming_queries \
ydbplatform/local-ydb:25.4
Alternatively, if you deploy YDB with local_ydb:
./local_ydb deploy \
--ydb-working-dir=/absolute/path/to/working/directory \
--ydb-binary-path=/path/to/kikimr/driver \
--enable-feature-flag=enable_external_data_sources \
--enable-feature-flag=enable_streaming_queries
Note
The examples use the quickstart profile. To learn more, see Creating a profile to connect to a test database.
Step 1. Creating topics
Create the input and output topics:
CREATE TOPIC input_topic;
CREATE TOPIC output_topic;
Verify that the topics have been created:
./ydb --profile quickstart scheme ls
Step 2. Creating an external data source
Create an external data source using CREATE EXTERNAL DATA SOURCE:
CREATE EXTERNAL DATA SOURCE ydb_source WITH (
    SOURCE_TYPE = "Ydb",
    LOCATION = "localhost:2136",
    DATABASE_NAME = "/local",
    AUTH_METHOD = "NONE"
);
Note
Specify the LOCATION and DATABASE_NAME values that correspond to your YDB database.
Step 3. Creating a streaming query
Create a streaming query using CREATE STREAMING QUERY:
CREATE STREAMING QUERY query_example AS
DO BEGIN
    $number_errors = SELECT
        Host,
        COUNT(*) AS ErrorCount,
        CAST(HOP_START() AS String) AS Ts -- Start time of the window that the aggregated result belongs to
    FROM
        ydb_source.input_topic
    WITH (
        FORMAT = json_each_row,
        SCHEMA = (
            Time String NOT NULL,
            Level String NOT NULL,
            Host String NOT NULL
        )
    )
    WHERE
        Level = "error"
    GROUP BY
        HOP(CAST(Time AS Timestamp), "PT600S", "PT600S", "PT0S"), -- Error count over non-overlapping 10-minute windows
        Host;

    INSERT INTO
        ydb_source.output_topic
    SELECT
        ToBytes(Unwrap(Yson::SerializeJson(Yson::From(TableRow())))) -- Serialize all columns to JSON
    FROM
        $number_errors;
END DO
Details:
- Aggregation with GROUP BY HOP and the HOP_START function: see GROUP BY ... HOP.
- Writing data to a topic: see Write formats.
- Serialization to JSON: TableRow, Yson::From, Yson::SerializeJson, Unwrap, ToBytes.
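To check the expected result by hand, the following Python sketch models what the query computes: it keeps error events, assigns each one to a non-overlapping 10-minute window, counts events per window and host, and formats each row as JSON. It is a simplified client-side model, not how YDB executes the query. It assumes windows are aligned to multiples of 10 minutes from the Unix epoch, it ignores when windows close (it reports every window, including ones that are still open), and it uses sort_keys only to reproduce the key order shown in the expected result of Step 6. The events are the test messages from Step 5; count_errors, window_start, and to_json_row are hypothetical helper names.

```python
import json
from collections import Counter
from datetime import datetime, timedelta, timezone

# "PT600S" for both hop and window size: windows do not overlap.
WINDOW = timedelta(seconds=600)
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

# Test messages from Step 5.
EVENTS = [
    {"Time": "2025-01-01T00:00:00.000000Z", "Level": "error", "Host": "host-1"},
    {"Time": "2025-01-01T00:04:00.000000Z", "Level": "error", "Host": "host-2"},
    {"Time": "2025-01-01T00:08:00.000000Z", "Level": "error", "Host": "host-1"},
    {"Time": "2025-01-01T00:12:00.000000Z", "Level": "error", "Host": "host-2"},
    {"Time": "2025-01-01T00:12:00.000000Z", "Level": "error", "Host": "host-1"},
]


def parse_time(value: str) -> datetime:
    return datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)


def window_start(ts: datetime) -> datetime:
    # Assumption: windows start at multiples of WINDOW counted from the Unix epoch.
    return ts - (ts - EPOCH) % WINDOW


def count_errors(events):
    counts = Counter()
    for e in events:
        if e["Level"] != "error":  # WHERE Level = "error"
            continue
        # GROUP BY HOP(...), Host
        counts[(window_start(parse_time(e["Time"])), e["Host"])] += 1
    return counts


def to_json_row(start: datetime, host: str, count: int) -> str:
    # Same columns as the query output: ErrorCount, Host, and Ts (window start as a string).
    ts = start.strftime("%Y-%m-%dT%H:%M:%SZ")
    return json.dumps(
        {"ErrorCount": count, "Host": host, "Ts": ts},
        sort_keys=True,
        separators=(",", ":"),
    )


for (start, host), count in sorted(count_errors(EVENTS).items()):
    print(to_json_row(start, host, count))
```

The sketch prints four rows. The two rows for the window starting at 00:00 are the same as the expected result in Step 6 (order aside); the two rows for the window starting at 00:10 are not emitted by the running query while that window is still open.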
Step 4. Viewing the query status
Check the query status via the streaming_queries system table:
SELECT
    Path,
    Status,
    Issues,
    Run
FROM
    `.sys/streaming_queries`
Make sure that the Status field is RUNNING. If the query is SUSPENDED or the Issues field contains errors, see the error diagnostics section.
Step 5. Populating the input topic with data
Write test messages to the topic using the YDB CLI:
echo '{"Time": "2025-01-01T00:00:00.000000Z", "Level": "error", "Host": "host-1"}' | ./ydb --profile quickstart topic write input_topic
echo '{"Time": "2025-01-01T00:04:00.000000Z", "Level": "error", "Host": "host-2"}' | ./ydb --profile quickstart topic write input_topic
echo '{"Time": "2025-01-01T00:08:00.000000Z", "Level": "error", "Host": "host-1"}' | ./ydb --profile quickstart topic write input_topic
echo '{"Time": "2025-01-01T00:12:00.000000Z", "Level": "error", "Host": "host-2"}' | ./ydb --profile quickstart topic write input_topic
echo '{"Time": "2025-01-01T00:12:00.000000Z", "Level": "error", "Host": "host-1"}' | ./ydb --profile quickstart topic write input_topic
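The result for a window is written to the output topic only after that 10-minute window closes. That is why the expected result in Step 6 contains only the window starting at 00:00: the messages timestamped 00:12 fall after its end, while the window starting at 00:10 is still open.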
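The window-closing behavior can be illustrated with a simplified Python model. It assumes that event time advances to the latest event timestamp seen so far and that a window [start, start + 10 min) counts as closed once that time reaches the window end plus the HOP delay ("PT0S" in this query). The actual closing mechanism in YDB may differ (for example, it may rely on watermarks), so treat this only as an illustration; closed_windows is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone

WINDOW = timedelta(minutes=10)  # "PT600S"
DELAY = timedelta(0)  # the "PT0S" delay argument of HOP
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def parse_time(value: str) -> datetime:
    return datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)


def closed_windows(times):
    """Return (window_start, is_closed) for every window that received an event.

    Simplified model (assumption): a window is closed once the latest event
    time seen is at or past window end + DELAY.
    """
    parsed = [parse_time(t) for t in times]
    latest = max(parsed)
    # Assumption: windows are aligned to multiples of WINDOW from the Unix epoch.
    starts = sorted({t - (t - EPOCH) % WINDOW for t in parsed})
    return [(s, latest >= s + WINDOW + DELAY) for s in starts]


# Event times of the test messages above.
times = [
    "2025-01-01T00:00:00.000000Z",
    "2025-01-01T00:04:00.000000Z",
    "2025-01-01T00:08:00.000000Z",
    "2025-01-01T00:12:00.000000Z",
    "2025-01-01T00:12:00.000000Z",
]
for start, closed in closed_windows(times):
    print(start.strftime("%H:%M"), "closed" if closed else "open")
```

With these messages, the window starting at 00:00 is reported as closed and the one starting at 00:10 as open, which is consistent with the expected result in Step 6. Writing another error event timestamped 00:20 or later would, under this model, close the second window as well.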
Step 6. Checking the output topic contents
Read data from the output topic:
./ydb --profile quickstart topic read output_topic --partition-ids 0 --start-offset 0 --limit 10 --format newline-delimited
Expected result:
{"ErrorCount":1,"Host":"host-2","Ts":"2025-01-01T00:00:00Z"}
{"ErrorCount":2,"Host":"host-1","Ts":"2025-01-01T00:00:00Z"}
Step 7. Deleting the query
Delete the query using DROP STREAMING QUERY:
DROP STREAMING QUERY query_example;
What's next
- Explore the data formats supported in streaming queries.
- Learn how to enrich data with reference data from a local table or from S3.
- Learn how to write results to tables.