Quick start: reading from and writing to topics

In this guide, you will create your first streaming query.

The query will:

  • read events from the input topic
  • keep only error events
  • count errors per server over 10-minute windows
  • write the result to the output topic.

Events arrive in JSON format and contain three fields: Time (event time), Level (logging level), and Host (server name).
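The input format can be made concrete with a small example. The following minimal Python sketch builds one event as a single JSON line, which is the shape the query's json_each_row format expects (one JSON object per line). The helper name make_event is hypothetical and is used only for illustration.

```python
import json


def make_event(time: str, level: str, host: str) -> str:
    """Return one event as a single JSON line (json_each_row style).

    The keys match the SCHEMA in the streaming query: Time, Level and Host.
    """
    # json.dumps never emits raw newlines, so each event stays on one line
    return json.dumps({"Time": time, "Level": level, "Host": host})


if __name__ == "__main__":
    print(make_event("2025-01-01T00:00:00.000000Z", "error", "host-1"))
```

The printed line has the same shape as the test messages written to the input topic in Step 5.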

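You will perform the following steps:

  • create the input and output topics
  • create an external data source
  • create a streaming query
  • check the query status
  • write test data to the input topic
  • read the result from the output topic
  • delete the query.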

Prerequisites

To run the examples, you will need:

  • a running YDB database (see the YDB quick start)
  • the enable_external_data_sources and enable_streaming_queries feature flags enabled.

For example, to start YDB in Docker with both flags enabled:

docker run -d --rm --name ydb-local -h localhost \
  --platform linux/amd64 \
  -p 2135:2135 -p 2136:2136 -p 8765:8765 -p 9092:9092 \
  -v $(pwd)/ydb_certs:/ydb_certs \
  -e GRPC_TLS_PORT=2135 -e GRPC_PORT=2136 -e MON_PORT=8765 \
  -e YDB_FEATURE_FLAGS=enable_external_data_sources,enable_streaming_queries \
  ydbplatform/local-ydb:25.4

Alternatively, if you deploy YDB with local_ydb, pass the flags on the command line:

./local_ydb deploy \
  --ydb-working-dir=/absolute/path/to/working/directory \
  --ydb-binary-path=/path/to/kikimr/driver \
  --enable-feature-flag=enable_external_data_sources \
  --enable-feature-flag=enable_streaming_queries

Note

The examples use the quickstart profile. To learn more, see Creating a profile to connect to a test database.

Step 1. Creating topics

Create the input and output topics:

CREATE TOPIC input_topic;
CREATE TOPIC output_topic;

Verify that the topics have been created:

./ydb --profile quickstart scheme ls

Step 2. Creating an external data source

Create an external data source using CREATE EXTERNAL DATA SOURCE:

CREATE EXTERNAL DATA SOURCE ydb_source WITH (
    SOURCE_TYPE = "Ydb",
    LOCATION = "localhost:2136",
    DATABASE_NAME = "/local",
    AUTH_METHOD = "NONE"
);

Note

Specify the LOCATION and DATABASE_NAME values that correspond to your YDB database.

Step 3. Creating a streaming query

Create a streaming query using CREATE STREAMING QUERY:

CREATE STREAMING QUERY query_example AS
DO BEGIN

$number_errors = SELECT
    Host,
    COUNT(*) AS ErrorCount,
    CAST(HOP_START() AS String) AS Ts  -- Start time of the window this aggregated row belongs to
FROM
    ydb_source.input_topic
WITH (
    FORMAT = json_each_row,
    SCHEMA = (
        Time String NOT NULL,
        Level String NOT NULL,
        Host String NOT NULL
    )
)
WHERE
    Level = "error"
GROUP BY
    HOP(CAST(Time AS Timestamp), "PT600S", "PT600S", "PT0S"),  -- Count errors over non-overlapping 10-minute windows
    Host;

INSERT INTO
    ydb_source.output_topic
SELECT
    ToBytes(Unwrap(Yson::SerializeJson(Yson::From(TableRow()))))  -- Сериализация всех колонок в JSON
FROM
    $number_errors;

END DO
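The filtering and grouping in this query can be modelled in a few lines of Python. This is a minimal sketch, not YDB's implementation. It assumes that HOP windows are aligned to multiples of 10 minutes from the Unix epoch, and it ignores when windows close. The names parse_time, window_start and count_errors are hypothetical. Because the hop and the interval are both PT600S, each event falls into exactly one window.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone

# PT600S: hop and interval are equal, so the windows do not overlap
WINDOW = timedelta(seconds=600)
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def parse_time(value: str) -> datetime:
    """Parse timestamps such as 2025-01-01T00:04:00.000000Z as UTC."""
    return datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)


def window_start(ts: datetime) -> datetime:
    """Round a timestamp down to the start of its 10-minute window (assumed epoch-aligned)."""
    return EPOCH + ((ts - EPOCH) // WINDOW) * WINDOW


def count_errors(events):
    """Count error events per (window start, host), mirroring WHERE + GROUP BY HOP(...), Host."""
    counts = Counter()
    for event in events:
        if event["Level"] != "error":  # WHERE Level = "error"
            continue
        key = (window_start(parse_time(event["Time"])), event["Host"])
        counts[key] += 1  # COUNT(*) for this window and host
    return counts
```

With a hop smaller than the interval, for example PT60S with PT600S, the windows would overlap and each event would be counted in several of them. This sketch does not model that case.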


Step 4. Viewing the query status

Check the query status via the streaming_queries system table:

SELECT
    Path,
    Status,
    Issues,
    Run
FROM
    `.sys/streaming_queries`

Make sure that the Status field has the value RUNNING. If the query is in the SUSPENDED status or the Issues field contains errors, refer to the error diagnostics section.

Step 5. Populating the input topic with data

Write test messages to the input topic using the YDB CLI:

echo '{"Time": "2025-01-01T00:00:00.000000Z", "Level": "error", "Host": "host-1"}' | ./ydb --profile quickstart topic write input_topic
echo '{"Time": "2025-01-01T00:04:00.000000Z", "Level": "error", "Host": "host-2"}' | ./ydb --profile quickstart topic write input_topic
echo '{"Time": "2025-01-01T00:08:00.000000Z", "Level": "error", "Host": "host-1"}' | ./ydb --profile quickstart topic write input_topic
echo '{"Time": "2025-01-01T00:12:00.000000Z", "Level": "error", "Host": "host-2"}' | ./ydb --profile quickstart topic write input_topic
echo '{"Time": "2025-01-01T00:12:00.000000Z", "Level": "error", "Host": "host-1"}' | ./ydb --profile quickstart topic write input_topic

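The counts for a 10-minute window are written to the output topic only after that window closes, that is, after messages with later timestamps arrive. In this example, the messages timestamped 00:12 close the first window (00:00 to 00:10). The second window (00:10 to 00:20) stays open, so its counts do not appear yet.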
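To see which counts should already be in the output topic, the following Python sketch applies a simplified closing rule to the five test messages. It treats a window as closed once a message with a timestamp at or past the window end plus the delay (PT0S here) has been seen. This rule is an assumption made for illustration, and the engine's actual watermark handling may be more involved. Times are given in minutes after 00:00, and emitted_windows is a hypothetical helper.

```python
from collections import Counter

WINDOW_MINUTES = 10  # PT600S
DELAY_MINUTES = 0    # PT0S: no extra wait for late events


def emitted_windows(events):
    """Return counts only for windows that are already closed under the simplified rule.

    events: (minute, host) pairs for error events, minute counted from 00:00.
    """
    counts = Counter()
    latest = None  # largest event time seen so far
    for minute, host in events:
        start = minute // WINDOW_MINUTES * WINDOW_MINUTES
        counts[(start, host)] += 1
        latest = minute if latest is None else max(latest, minute)
    if latest is None:
        return {}
    # A window [start, start + 10) is closed once an event at or after its end (+ delay) was seen
    return {
        key: n
        for key, n in counts.items()
        if key[0] + WINDOW_MINUTES + DELAY_MINUTES <= latest
    }


test_messages = [(0, "host-1"), (4, "host-2"), (8, "host-1"), (12, "host-2"), (12, "host-1")]
print(emitted_windows(test_messages))
```

For these messages it prints {(0, 'host-1'): 2, (0, 'host-2'): 1}, which agrees with the expected result in Step 6. Adding a message at minute 20 or later would also close the 00:10 window under this rule.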

Step 6. Checking the output topic contents

Read data from the output topic:

./ydb --profile quickstart topic read output_topic --partition-ids 0 --start-offset 0 --limit 10 --format newline-delimited

Expected result:

{"ErrorCount":1,"Host":"host-2","Ts":"2025-01-01T00:00:00Z"}
{"ErrorCount":2,"Host":"host-1","Ts":"2025-01-01T00:00:00Z"}

Step 7. Deleting the query

Delete the query using DROP STREAMING QUERY:

DROP STREAMING QUERY query_example;
