Quickstart: reading and writing topics
This tutorial walks you through your first streaming query.
The query will:
- Read events from an input topic;
- Keep only errors;
- Count errors per server over 10-minute windows;
- Write results to an output topic.
Each event is a JSON object with a timestamp (Time), a log level (Level), and a host name (Host).
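For reference, here is a minimal Python sketch that builds one input line in this shape. The helper name make_event is hypothetical. It only reproduces the line format used by the sample messages in Step 5: one JSON object per message, with Time as a UTC timestamp that has microseconds and a trailing "Z".

```python
import json
from datetime import datetime, timezone


def make_event(ts: datetime, level: str, host: str) -> str:
    """Build one input message (hypothetical helper). Expects a timezone-aware datetime."""
    # Time: UTC, microsecond precision, trailing "Z", as in the Step 5 samples.
    time_str = ts.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
    # Field order Time, Level, Host matches the sample messages.
    return json.dumps({"Time": time_str, "Level": level, "Host": host})


if __name__ == "__main__":
    print(make_event(datetime(2025, 1, 1, tzinfo=timezone.utc), "error", "host-1"))
```

The printed line is identical to the payload of the first echo command in Step 5.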
Steps:
- Create topics;
- Create an external data source;
- Create the streaming query;
- Check query state;
- Produce sample input;
- Read the output topic;
- Delete the streaming query.
Prerequisites
You need:
- A running YDB database — see quick start;
- The enable_external_data_sources and enable_streaming_queries feature flags enabled.
To start a local YDB in Docker with both flags enabled:
docker run -d --rm --name ydb-local -h localhost \
--platform linux/amd64 \
-p 2135:2135 -p 2136:2136 -p 8765:8765 -p 9092:9092 \
-v $(pwd)/ydb_certs:/ydb_certs \
-e GRPC_TLS_PORT=2135 -e GRPC_PORT=2136 -e MON_PORT=8765 \
-e YDB_FEATURE_FLAGS=enable_external_data_sources,enable_streaming_queries \
ydbplatform/local-ydb:25.4
Alternatively, when deploying with the local_ydb script:
./local_ydb deploy \
--ydb-working-dir=/absolute/path/to/working/directory \
--ydb-binary-path=/path/to/kikimr/driver \
--enable-feature-flag=enable_external_data_sources \
--enable-feature-flag=enable_streaming_queries
Note
The examples use the quickstart profile. To learn more, see Creating a profile to connect to a test database.
Step 1. Create topics
Create input and output topics:
CREATE TOPIC input_topic;
CREATE TOPIC output_topic;
Check that both topics appear in the scheme listing:
./ydb --profile quickstart scheme ls
Step 2. Create an external data source
Create an external data source with CREATE EXTERNAL DATA SOURCE. The streaming query reads from and writes to the topics through this source:
CREATE EXTERNAL DATA SOURCE ydb_source WITH (
SOURCE_TYPE = "Ydb",
LOCATION = "localhost:2136",
DATABASE_NAME = "/local",
AUTH_METHOD = "NONE"
);
Note
Set LOCATION and DATABASE_NAME to match your YDB deployment.
Step 3. Create the streaming query
Create a streaming query with CREATE STREAMING QUERY:
CREATE STREAMING QUERY query_example AS
DO BEGIN
$number_errors = SELECT
Host,
COUNT(*) AS ErrorCount,
CAST(HOP_START() AS String) AS Ts -- Window start time for the aggregate row
FROM
ydb_source.input_topic
WITH (
FORMAT = json_each_row,
SCHEMA = (
Time String NOT NULL,
Level String NOT NULL,
Host String NOT NULL
)
)
WHERE
Level = "error"
GROUP BY
HOP(CAST(Time AS Timestamp), "PT600S", "PT600S", "PT0S"), -- Non-overlapping 10-minute windows
Host;
INSERT INTO
ydb_source.output_topic
SELECT
ToBytes(Unwrap(Yson::SerializeJson(Yson::From(TableRow())))) -- Serialize columns to JSON
FROM
$number_errors;
END DO
More detail:
- GROUP BY HOP and HOP_START: GROUP BY ... HOP.
- Writing to topics: Write formats.
- JSON serialization: TableRow, Yson::From, Yson::SerializeJson, Unwrap, ToBytes.
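HOP(ts, hop, interval, delay) places each event into every window of length interval that contains it, with window starts spaced hop apart. The Python sketch below models only that assignment. It assumes windows are aligned to multiples of hop from the Unix epoch and ignores delay; the function name hop_windows is hypothetical, not part of YQL.

```python
from datetime import datetime, timedelta, timezone

# Assumption: window starts are aligned to multiples of `hop` since the Unix epoch.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def hop_windows(ts, hop, interval):
    """Return every half-open [start, end) window of length `interval` containing `ts`.

    Window starts are spaced `hop` apart; the HOP `delay` argument is not modeled.
    """
    start = ts - ((ts - EPOCH) % hop)  # latest window start at or before ts
    windows = []
    while start + interval > ts:  # walk back while the window still covers ts
        windows.append((start, start + interval))
        start -= hop
    return list(reversed(windows))  # oldest window first


if __name__ == "__main__":
    event_time = datetime(2025, 1, 1, 0, 8, tzinfo=timezone.utc)
    ten = timedelta(minutes=10)
    # Same parameters as the query ("PT600S", "PT600S"): exactly one window.
    for start, end in hop_windows(event_time, ten, ten):
        print(start.isoformat(), "->", end.isoformat())
```

With hop equal to interval, as in the query above, the 00:08 event lands only in [00:00, 00:10). With a 5-minute hop and a 10-minute interval, the same event would fall into two overlapping windows, [00:00, 00:10) and [00:05, 00:15).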
Step 4. Check query state
Query the .sys/streaming_queries system view to check the query state. To learn more, see View streaming query metadata.
SELECT
Path,
Status,
Issues,
Run
FROM
`.sys/streaming_queries`;
The Status column should be RUNNING. If the query is SUSPENDED or the Issues column contains errors, see the troubleshooting documentation.
Step 5. Produce sample input
Write test messages with the YDB CLI:
echo '{"Time": "2025-01-01T00:00:00.000000Z", "Level": "error", "Host": "host-1"}' | ./ydb --profile quickstart topic write input_topic
echo '{"Time": "2025-01-01T00:04:00.000000Z", "Level": "error", "Host": "host-2"}' | ./ydb --profile quickstart topic write input_topic
echo '{"Time": "2025-01-01T00:08:00.000000Z", "Level": "error", "Host": "host-1"}' | ./ydb --profile quickstart topic write input_topic
echo '{"Time": "2025-01-01T00:12:00.000000Z", "Level": "error", "Host": "host-2"}' | ./ydb --profile quickstart topic write input_topic
echo '{"Time": "2025-01-01T00:12:00.000000Z", "Level": "error", "Host": "host-1"}' | ./ydb --profile quickstart topic write input_topic
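Results for a window appear in the output topic only after that 10-minute window closes. In this sample, the two 00:12 messages belong to the window starting at 00:10 and advance event time past 00:10, which closes the window starting at 00:00. That is why the expected output in Step 6 contains rows only for the 00:00 window.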
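The Python sketch below replays the five sample messages through a simplified model of the query: keep errors, count them per host per 10-minute window, and emit only complete windows. The closing rule it uses (with delay PT0S, a window is final once the latest event time reaches its end) is an assumption for illustration, not YDB's exact watermark logic. The name closed_window_counts is hypothetical.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone

WINDOW = timedelta(minutes=10)  # HOP(..., "PT600S", "PT600S", "PT0S"): non-overlapping windows
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

# The five sample messages from the echo commands above.
SAMPLE = [
    {"Time": "2025-01-01T00:00:00.000000Z", "Level": "error", "Host": "host-1"},
    {"Time": "2025-01-01T00:04:00.000000Z", "Level": "error", "Host": "host-2"},
    {"Time": "2025-01-01T00:08:00.000000Z", "Level": "error", "Host": "host-1"},
    {"Time": "2025-01-01T00:12:00.000000Z", "Level": "error", "Host": "host-2"},
    {"Time": "2025-01-01T00:12:00.000000Z", "Level": "error", "Host": "host-1"},
]


def parse_time(s):
    """Parse the Time field (UTC, microseconds, trailing Z)."""
    return datetime.strptime(s, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)


def closed_window_counts(events):
    """Count errors per (window start, host) and return rows for closed windows only."""
    counts = Counter()
    latest = None  # highest event time seen so far
    for e in events:
        ts = parse_time(e["Time"])
        latest = ts if latest is None else max(latest, ts)
        if e["Level"] == "error":  # WHERE Level = "error"
            start = ts - ((ts - EPOCH) % WINDOW)  # tumbling window start
            counts[(start, e["Host"])] += 1  # GROUP BY window, Host
    # Assumed closing rule: a window is final once event time has reached its end.
    return [
        {"Host": host, "ErrorCount": n, "Ts": start.strftime("%Y-%m-%dT%H:%M:%SZ")}
        for (start, host), n in sorted(counts.items())
        if latest is not None and start + WINDOW <= latest
    ]


if __name__ == "__main__":
    for row in closed_window_counts(SAMPLE):
        print(row)
```

Under these assumptions the model yields 2 errors for host-1 and 1 error for host-2 in the window starting at 00:00, matching the expected output in Step 6 apart from row order. The 00:10 window holds one error per host but is not emitted yet.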
Step 6. Read the output topic
./ydb --profile quickstart topic read output_topic --partition-ids 0 --start-offset 0 --limit 10 --format newline-delimited
Expected output:
{"ErrorCount":1,"Host":"host-2","Ts":"2025-01-01T00:00:00Z"}
{"ErrorCount":2,"Host":"host-1","Ts":"2025-01-01T00:00:00Z"}
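Each output line is a standalone JSON object, so any JSON parser can consume it. A minimal Python sketch follows; the helper names parse_output and serialize_row are hypothetical. The compact, sorted-key form in serialize_row mirrors the sample output above; whether Yson::SerializeJson always orders keys this way is an assumption.

```python
import json

# The two expected output lines from above.
EXPECTED = [
    '{"ErrorCount":1,"Host":"host-2","Ts":"2025-01-01T00:00:00Z"}',
    '{"ErrorCount":2,"Host":"host-1","Ts":"2025-01-01T00:00:00Z"}',
]


def parse_output(lines):
    """Map (window start, host) to error count from newline-delimited JSON rows."""
    return {(r["Ts"], r["Host"]): r["ErrorCount"] for r in map(json.loads, lines)}


def serialize_row(row):
    """Compact JSON with sorted keys, the same shape as the lines above."""
    return json.dumps(row, sort_keys=True, separators=(",", ":"))


if __name__ == "__main__":
    print(parse_output(EXPECTED))
    print(serialize_row({"Host": "host-2", "ErrorCount": 1, "Ts": "2025-01-01T00:00:00Z"}))
```

Because rows arrive in no guaranteed order, keying them by (Ts, Host) makes comparisons independent of the order in which the output topic returns them.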
Step 7. Delete the query
DROP STREAMING QUERY query_example;
Next steps
- Data formats supported in streaming queries.
- Enrich with a lookup from a local table or S3.
- Write results to tables.