Quickstart: reading and writing topics

This tutorial walks you through your first streaming query.

The query will:

  • Read events from an input topic;
  • Keep only errors;
  • Count errors per server over 10-minute windows;
  • Write results to an output topic.

Each event is a JSON object with a timestamp (Time), a log level (Level), and a host name (Host).


Prerequisites

You need:

  • A running YDB database: see the quick start;
  • The enable_external_data_sources and enable_streaming_queries feature flags enabled.

For example, to run YDB in Docker with both flags enabled:

docker run -d --rm --name ydb-local -h localhost \
  --platform linux/amd64 \
  -p 2135:2135 -p 2136:2136 -p 8765:8765 -p 9092:9092 \
  -v $(pwd)/ydb_certs:/ydb_certs \
  -e GRPC_TLS_PORT=2135 -e GRPC_PORT=2136 -e MON_PORT=8765 \
  -e YDB_FEATURE_FLAGS=enable_external_data_sources,enable_streaming_queries \
  ydbplatform/local-ydb:25.4

Or, if you deploy YDB with local_ydb:

./local_ydb deploy \
  --ydb-working-dir=/absolute/path/to/working/directory \
  --ydb-binary-path=/path/to/kikimr/driver \
  --enable-feature-flag=enable_external_data_sources \
  --enable-feature-flag=enable_streaming_queries

Note

The examples use the quickstart profile. To learn more, see Creating a profile to connect to a test database.

Step 1. Create topics

Create input and output topics:

CREATE TOPIC input_topic;
CREATE TOPIC output_topic;

Verify that the topics were created:

./ydb --profile quickstart scheme ls

Step 2. Create an external data source

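Create an external data source that points to your database with CREATE EXTERNAL DATA SOURCE. The streaming query reads from and writes to topics through this source: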

CREATE EXTERNAL DATA SOURCE ydb_source WITH (
    SOURCE_TYPE = "Ydb",
    LOCATION = "localhost:2136",
    DATABASE_NAME = "/local",
    AUTH_METHOD = "NONE"
);

Note

Set LOCATION and DATABASE_NAME to match your YDB deployment.

Step 3. Create the streaming query

Create a streaming query with CREATE STREAMING QUERY:

CREATE STREAMING QUERY query_example AS
DO BEGIN

$number_errors = SELECT
    Host,
    COUNT(*) AS ErrorCount,
    CAST(HOP_START() AS String) AS Ts  -- Window start time for the aggregate row
FROM
    ydb_source.input_topic
WITH (
    FORMAT = json_each_row,
    SCHEMA = (
        Time String NOT NULL,
        Level String NOT NULL,
        Host String NOT NULL
    )
)
WHERE
    Level = "error"
GROUP BY
    HOP(CAST(Time AS Timestamp), "PT600S", "PT600S", "PT0S"),  -- Non-overlapping 10-minute windows
    Host;

INSERT INTO
    ydb_source.output_topic
SELECT
    ToBytes(Unwrap(Yson::SerializeJson(Yson::From(TableRow()))))  -- Serialize columns to JSON
FROM
    $number_errors;

END DO
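The following sketch reproduces in plain Python what the query computes for one batch of events. It drops non-error events, assigns each remaining event to a 10-minute window, and counts events per window and host. This is a hypothetical local model, not part of YDB. The names count_errors, window_start, and to_rows are illustrative. The model assumes HOP windows are aligned to the Unix epoch, and it ignores when windows are actually emitted.

```python
import json
from collections import Counter
from datetime import datetime, timedelta, timezone

# Hypothetical local model of query_example; not part of YDB.
WINDOW = timedelta(seconds=600)  # "PT600S" for both hop and interval: non-overlapping windows
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)  # assumption: windows are aligned to the Unix epoch

SAMPLE = [
    '{"Time": "2025-01-01T00:00:00.000000Z", "Level": "error", "Host": "host-1"}',
    '{"Time": "2025-01-01T00:04:00.000000Z", "Level": "error", "Host": "host-2"}',
    '{"Time": "2025-01-01T00:08:00.000000Z", "Level": "error", "Host": "host-1"}',
    '{"Time": "2025-01-01T00:12:00.000000Z", "Level": "error", "Host": "host-2"}',
    '{"Time": "2025-01-01T00:12:00.000000Z", "Level": "error", "Host": "host-1"}',
]


def parse_time(value: str) -> datetime:
    # Matches the sample format, e.g. 2025-01-01T00:00:00.000000Z
    return datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)


def window_start(ts: datetime) -> datetime:
    # Round down to the start of the enclosing 10-minute window
    return ts - (ts - EPOCH) % WINDOW


def count_errors(lines):
    counts = Counter()
    for line in lines:
        event = json.loads(line)
        if event["Level"] != "error":  # WHERE Level = "error"
            continue
        # GROUP BY HOP(...), Host
        counts[(window_start(parse_time(event["Time"])), event["Host"])] += 1
    return counts


def to_rows(counts):
    # One JSON object per (window, host), with keys sorted like the expected output in Step 6
    return [
        json.dumps(
            {"ErrorCount": n, "Host": host, "Ts": start.strftime("%Y-%m-%dT%H:%M:%SZ")},
            sort_keys=True,
            separators=(",", ":"),
        )
        for (start, host), n in sorted(counts.items())
    ]


if __name__ == "__main__":
    for row in to_rows(count_errors(SAMPLE)):
        print(row)
```

Run on the sample events from Step 5, the sketch prints four rows. Two of them belong to the window starting at 00:10. The real query does not emit those yet because that window is still open.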


Step 4. Check query state

Check the query state in the .sys/streaming_queries system view, which stores streaming query metadata:

SELECT
    Path,
    Status,
    Issues,
    Run
FROM
    `.sys/streaming_queries`;

The Status column should show RUNNING. If the query is SUSPENDED or the Issues column contains errors, see the troubleshooting documentation.

Step 5. Produce sample input

Write test messages with the YDB CLI, one message per command:

echo '{"Time": "2025-01-01T00:00:00.000000Z", "Level": "error", "Host": "host-1"}' | ./ydb --profile quickstart topic write input_topic
echo '{"Time": "2025-01-01T00:04:00.000000Z", "Level": "error", "Host": "host-2"}' | ./ydb --profile quickstart topic write input_topic
echo '{"Time": "2025-01-01T00:08:00.000000Z", "Level": "error", "Host": "host-1"}' | ./ydb --profile quickstart topic write input_topic
echo '{"Time": "2025-01-01T00:12:00.000000Z", "Level": "error", "Host": "host-2"}' | ./ydb --profile quickstart topic write input_topic
echo '{"Time": "2025-01-01T00:12:00.000000Z", "Level": "error", "Host": "host-1"}' | ./ydb --profile quickstart topic write input_topic
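To write more than a handful of test events, you can generate them with a script instead of running one echo per message. The sketch below is a hypothetical helper (make_events is an illustrative name). It prints events one per line, in the same JSON shape as the commands above.

```python
import json
from datetime import datetime, timedelta, timezone


def make_events(start, step, hosts, count, level="error"):
    """Yield JSON events with Time, Level and Host fields, cycling through hosts."""
    for i in range(count):
        ts = start + i * step
        yield json.dumps(
            {
                "Time": ts.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
                "Level": level,
                "Host": hosts[i % len(hosts)],
            }
        )


if __name__ == "__main__":
    # Four error events, four minutes apart, alternating between two hosts
    first = datetime(2025, 1, 1, tzinfo=timezone.utc)
    for line in make_events(first, timedelta(minutes=4), ["host-1", "host-2"], 4):
        print(line)
```

Pipe the script's output to ./ydb --profile quickstart topic write input_topic --format newline-delimited to write each line as a separate message. This assumes your YDB CLI version supports that format option, so check ./ydb topic write --help first.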

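Windows close based on event time (the Time field), not wall-clock time. The last two messages, timestamped 00:12, move event time past the end of the first window [00:00, 00:10). That window closes and its results are written to the output topic. The window starting at 00:10 stays open until events with later timestamps arrive.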
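The closing rule can be modeled in a few lines. The sketch below is a simplified, hypothetical model, not YDB's actual watermark implementation. It treats a window as closed once its end, plus the HOP delay, is at or before the latest event time seen. The boundary handling (at or before) and the function name closed_windows are assumptions.

```python
from datetime import datetime, timedelta, timezone

# Simplified, hypothetical model of window closing; not YDB's actual watermark logic.
WINDOW = timedelta(seconds=600)  # HOP interval "PT600S"
DELAY = timedelta(0)             # HOP delay "PT0S": no extra wait for late events


def closed_windows(window_starts, max_event_time):
    """Return window starts whose end (start + WINDOW) is at or before
    the latest event time seen minus DELAY."""
    watermark = max_event_time - DELAY
    return sorted(s for s in set(window_starts) if s + WINDOW <= watermark)


def minute(m: int) -> datetime:
    # Helper for the sample data: 2025-01-01 00:MM UTC
    return datetime(2025, 1, 1, 0, m, tzinfo=timezone.utc)


if __name__ == "__main__":
    # Windows opened by the five sample messages; the latest Time among them is 00:12
    print(closed_windows([minute(0), minute(10)], max_event_time=minute(12)))
```

With the sample messages, only the window starting at 00:00 is closed. This matches the two rows in the expected output of Step 6. Under this model, writing one more event with a Time of 00:20 or later would also close the 00:10 window.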

Step 6. Read the output topic

./ydb --profile quickstart topic read output_topic --partition-ids 0 --start-offset 0 --limit 10 --format newline-delimited

Expected output:

{"ErrorCount":1,"Host":"host-2","Ts":"2025-01-01T00:00:00Z"}
{"ErrorCount":2,"Host":"host-1","Ts":"2025-01-01T00:00:00Z"}
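To check the results programmatically rather than by eye, a small parser over the newline-delimited output is enough. The sketch below (parse_output is a hypothetical name) maps each (Ts, Host) pair to its ErrorCount. It assumes each row is one JSON object with the fields shown above.

```python
import json

# Copied from the expected output above
SAMPLE_OUTPUT = """\
{"ErrorCount":1,"Host":"host-2","Ts":"2025-01-01T00:00:00Z"}
{"ErrorCount":2,"Host":"host-1","Ts":"2025-01-01T00:00:00Z"}
"""


def parse_output(text):
    """Map (Ts, Host) to ErrorCount for each newline-delimited JSON row."""
    result = {}
    for line in text.splitlines():
        line = line.strip()
        if not line:  # skip blank lines
            continue
        row = json.loads(line)
        result[(row["Ts"], row["Host"])] = row["ErrorCount"]
    return result


if __name__ == "__main__":
    for (ts, host), n in sorted(parse_output(SAMPLE_OUTPUT).items()):
        print(f"{ts} {host}: {n} error(s)")
```

To check live results, pass the text printed by the topic read command to parse_output instead of SAMPLE_OUTPUT.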

Step 7. Delete the query

DROP STREAMING QUERY query_example;
