Integrating Logstash and YDB

This section describes the integration options between YDB and Logstash, a server-side data collection and processing pipeline.

Introduction

Logstash dynamically ingests, transforms, and ships data regardless of format or complexity. A Logstash pipeline can contain different types of plugins: input, output, and filter. The YDB Logstash plugins repository is hosted on GitHub and contains the following plugins:

  • Storage Plugin for persisting Logstash events in row-oriented or column-oriented YDB tables;
  • Topic Input Plugin for reading Logstash events from YDB topics;
  • Topic Output Plugin for sending Logstash events to YDB topics.

These plugins can be built from the source code or downloaded as pre-built artifacts for the two latest versions of Logstash.

Note

The code snippets below use the placeholder <path-to-logstash>, which must be replaced with the path to the directory where Logstash is installed.

Execute the following command to install any Logstash plugin:

<path-to-logstash>/bin/logstash-plugin install </path/to/logstash-plugin.gem>

Check that the plugin has been installed:

<path-to-logstash>/bin/logstash-plugin list

The command returns a list of all installed plugins, which should include the name of the plugin you have just installed.
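On an installation with many plugins the list is long, so it can help to filter it. The snippet below is a minimal sketch, not part of Logstash or of the YDB plugins repository: the function name filter_ydb_plugins is made up for illustration, and it assumes that the names of the YDB plugins contain the substring ydb.

```shell
# Hypothetical helper (not part of Logstash or the YDB plugins):
# reads a plugin list from stdin and keeps only the lines that mention "ydb"
# (case-insensitive).
# Assumption: the names of the YDB plugins contain the substring "ydb".
# The exit status is non-zero when no matching line is found.
filter_ydb_plugins() {
    grep -i 'ydb'
}
```

To use it, pipe the output of the list command above into filter_ydb_plugins. An empty result together with a non-zero exit status suggests that no YDB plugin is installed.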

Configure YDB connection

All plugins use the same set of parameters to configure the connection to YDB. Only one of them, connection_string, is required. The other parameters are optional and select an authentication mode. If none of them is specified, anonymous mode is used.

# This example demonstrates configuration for ydb_storage plugin.
# The ydb_topic input and output plugins are configured the same way.
ydb_storage {
    # The database connection string contains a scheme, hostname, port, and database path
    connection_string => "grpc://localhost:2136/local"
    # Authentication token (for using the "Access Token" mode)
    token_auth => "<token_value>"
    # Authentication token file path (for using the "Access Token" mode)
    token_file => "</path/to/token/file>"
    # Service account key file path (for using the "Service Account Key" mode)
    sa_key_file => "</path/to/key.json>"
    # Flag to use metadata authentication service (for using the "Metadata" mode)
    use_metadata => true
}
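The block above lists every authentication parameter at once for reference. As a contrast, the following sketch writes a pipeline fragment that sets only connection_string, so the plugin would connect in anonymous mode as described above. The file name ydb_anonymous.conf and its location are assumptions made for this example, and the table name is borrowed from the usage example later on this page.

```shell
# Write a minimal pipeline fragment that relies on anonymous mode:
# connection_string is set and no authentication parameter is present.
# Assumption: the file name and location are arbitrary examples.
conf_file="${TMPDIR:-/tmp}/ydb_anonymous.conf"

cat > "$conf_file" <<'EOF'
output {
  ydb_storage {
    connection_string => "grpc://localhost:2136/local"
    table_name => "logstash_demo"
  }
}
EOF

echo "wrote $conf_file"
```

To switch to another authentication mode, add the single parameter for that mode to the block.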

YDB Storage Plugin

This plugin stores the stream of Logstash events in YDB tables for further analysis. This is especially useful with column-oriented tables, which are optimized for Online Analytical Processing (OLAP) queries. Every field of a Logstash event is stored in the column with the matching name. Fields that do not match any column are ignored.

Configuration

The plugin configuration is done by adding a ydb_storage block in the output section of the Logstash pipeline config file. The plugin supports the standard set of connection parameters and a few additional options:

  • table_name (required): the name of the destination table.
  • uuid_column (optional): the name of the column in which the plugin stores an auto-generated identifier.
  • timestamp_column (optional): the name of the column in which the plugin stores the event timestamp.

Warning

The Storage plugin doesn't check whether a Logstash event has correct and unique values for the table's primary key. It stores data in YDB using the bulk upsert method, so events with the same primary key overwrite each other. The primary key should consist of event fields that are present in every event and uniquely identify it. If no such set of fields exists, add an extra column to the primary key and fill it with a random value using the uuid_column parameter.

Usage example

Create a table

Create a new column-oriented table with the necessary columns in any existing YDB database. For example, the table below uses a random value generated by the plugin as its primary key.

CREATE TABLE `logstash_demo` (
    `uuid`     Text NOT NULL,      -- identifier
    `ts`       Timestamp NOT NULL, -- timestamp of event creation
    `message`  Text,
    `user`     Text,
    `level`    Int32,

    PRIMARY KEY (
         `uuid`
    )
)
WITH (STORE = COLUMN);

Set up the plugin in the Logstash pipeline config

To activate the plugin, add the ydb_storage block in the output section of the Logstash pipeline config. Additionally, add the http block in the input section for creating test messages via HTTP requests:

output {
  ydb_storage {
    connection_string => "..."    # YDB connection string
    table_name => "logstash_demo" # the table name
    uuid_column => "uuid"         # the primary key column with a random UUID
    timestamp_column => "ts"      # the column for storing the event timestamp
  }
}

input {
  http {
    port => 9876 # Any free port
  }
}

To apply these changes, restart Logstash.
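Before restarting, it can be worth checking that the edited config parses. The sketch below wraps the Logstash --config.test_and_exit flag, which validates a config and exits without starting the pipeline. The function name check_pipeline_config and its argument order are assumptions made for this example.

```shell
# Hypothetical helper: validate a pipeline config without starting the pipeline.
# $1: the Logstash installation directory (the <path-to-logstash> placeholder)
# $2: the pipeline config file to check
# The exit status is the one returned by Logstash itself.
check_pipeline_config() {
    "$1/bin/logstash" -f "$2" --config.test_and_exit
}
```

The first argument plays the role of the <path-to-logstash> placeholder used on this page. The check can only succeed if the plugins referenced in the config are already installed.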

Send test messages

Send a few test messages:

curl -H "content-type: application/json" -XPUT 'http://127.0.0.1:9876/http/ping' -d '{ "user" : "demo_user", "message" : "demo message", "level": 4}'
curl -H "content-type: application/json" -XPUT 'http://127.0.0.1:9876/http/ping' -d '{ "user" : "test1", "level": 1}'
curl -H "content-type: application/json" -XPUT 'http://127.0.0.1:9876/http/ping' -d '{ "message" : "error", "level": -3}'

Each command returns ok if the message was sent successfully.
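When sending many test events, checking the reply by eye becomes tedious. The following sketch wraps the same curl call in a function that fails unless the reply body is exactly ok. The function name send_event and the LOGSTASH_HTTP_URL variable are assumptions introduced for this example. The default URL is the one used in the commands above.

```shell
# Hypothetical helper: PUT one JSON event to the Logstash http input and
# return a non-zero status unless the reply body is exactly "ok".
# $1: the JSON document to send
# LOGSTASH_HTTP_URL (assumed name) overrides the default URL from this page.
send_event() {
    send_event_reply=$(curl -sS -H "content-type: application/json" -XPUT \
        "${LOGSTASH_HTTP_URL:-http://127.0.0.1:9876/http/ping}" -d "$1") || return 1
    if [ "$send_event_reply" != "ok" ]; then
        echo "unexpected reply: $send_event_reply" >&2
        return 1
    fi
}
```

For example, send_event '{ "user" : "test1", "level": 1}' sends the second test message and reports a failure if Logstash does not acknowledge it.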

Check that the messages are stored in YDB

To check that all sent messages are written to the table, execute the following query using ScanQuery mode:

SELECT * FROM `logstash_demo`;

The query will return a list of written events:

┌───────┬────────────────┬───────────────────────────────┬─────────────┬────────────────────────────────────────┐
│ level │ message        │ ts                            │  user       │ uuid                                   │
├───────┼────────────────┼───────────────────────────────┼─────────────┼────────────────────────────────────────┤
│ -3    │ "error"        │ "2024-05-22T13:16:06.491000Z" │  null       │ "74cd4048-0b61-4fb9-9385-308714e21881" │
│  1    │ null           │ "2024-05-22T13:15:56.591000Z" │ "test1"     │ "1df27d0a-9aa0-42c7-9ea2-ab69bc1f5d87" │
│  4    │ "demo message" │ "2024-05-22T13:15:38.760000Z" │ "demo_user" │ "b7468cb1-e1e3-46fa-965d-83e604e80a31" │
└───────┴────────────────┴───────────────────────────────┴─────────────┴────────────────────────────────────────┘
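One way to run this query in scan mode is the YDB CLI. The sketch below assumes a CLI version that provides the table query execute command with the -t scan option, and it assumes the local endpoint and database used in the other examples on this page. The function name scan_logstash_demo and the YDB_ENDPOINT and YDB_DATABASE variables are made up for illustration.

```shell
# Hypothetical wrapper around the YDB CLI: runs the check query as a scan query.
# Assumptions: the endpoint and database default to the local values used
# elsewhere on this page; YDB_ENDPOINT and YDB_DATABASE (assumed names)
# override them.
scan_logstash_demo() {
    ydb -e "${YDB_ENDPOINT:-grpc://localhost:2136}" -d "${YDB_DATABASE:-/local}" \
        table query execute -t scan -q 'SELECT * FROM `logstash_demo`;'
}
```

The single quotes keep the shell from interpreting the backticks around the table name.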

YDB Topic Input Plugin

This plugin allows reading messages from YDB topics and transforming them into Logstash events.

Configuration

The plugin configuration is done by adding a ydb_topic block to the input section of the Logstash pipeline config. The plugin supports the standard set of connection parameters and a few additional options:

  • topic_path (required): the full path of the topic to read from.
  • consumer_name (required): the name of the topic consumer.
  • schema (optional): the mode for processing YDB messages. By default, the plugin reads and sends messages as binary data, but if you specify the JSON mode, each message is parsed as a JSON object.

Usage example

Create a topic

Create a topic and add a consumer to it in any existing YDB database:

ydb -e grpc://localhost:2136 -d /local topic create /local/logstash_demo_topic
ydb -e grpc://localhost:2136 -d /local topic consumer add --consumer logstash-consumer /local/logstash_demo_topic

Set up the plugin in the Logstash pipeline config

To activate the plugin, add the ydb_topic block in the input section of the Logstash pipeline config. Additionally, add the stdout plugin in the output section for logging all Logstash events:

input {
  ydb_topic {
    connection_string => "grpc://localhost:2136/local" # YDB connection string
    topic_path => "/local/logstash_demo_topic"         # The full path of the topic to read
    consumer_name => "logstash-consumer"               # The consumer name
    schema => "JSON"                                   # Use JSON mode
  }
}

output {
  stdout { }
}

To apply these changes, restart Logstash.

Write test messages to the topic

Send a few test messages to the topic:

echo '{"message":"test"}' | ydb -e grpc://localhost:2136 -d /local topic write /local/logstash_demo_topic
echo '{"user":123}' | ydb -e grpc://localhost:2136 -d /local topic write /local/logstash_demo_topic

Check if Logstash processed the messages

The stdout plugin writes the messages to the Logstash logs:

{
       "message" => "test",
    "@timestamp" => 2024-05-23T10:31:47.712896899Z,
      "@version" => "1"
}
{
          "user" => 123.0,
    "@timestamp" => 2024-05-23T10:34:08.574599108Z,
      "@version" => "1"
}

YDB Topic Output Plugin

This plugin allows writing Logstash events to a YDB topic.

Configuration

The plugin configuration is done by adding a ydb_topic block to the output section of the Logstash pipeline config. The plugin supports the standard set of connection parameters and a few additional options:

  • topic_path (required): the full path of the topic to write to.

Usage example

Create a topic

Create a topic in any existing YDB database:

ydb -e grpc://localhost:2136 -d /local topic create /local/logstash_demo_topic

Set up the plugin in the Logstash pipeline config

To activate the plugin, add the ydb_topic block in the output section of the Logstash pipeline config. Additionally, add the http block in the input section for creating test messages via HTTP requests:

output {
  ydb_topic {
    connection_string => "grpc://localhost:2136/local" # YDB connection string
    topic_path => "/local/logstash_demo_topic"         # The full path of the topic to write to
  }
}

input {
  http {
    port => 9876 # Any free port
  }
}

To apply these changes, restart Logstash.

Send a test message

Send a test message via the http plugin:

curl -H "content-type: application/json" -XPUT 'http://127.0.0.1:9876/http/ping' -d '{ "user" : "demo_user", "message" : "demo message", "level": 4}'

The command returns ok if the message has been sent successfully.

Read the message from the topic

Check that the plugin wrote the message to the topic successfully by reading this message via CLI:

ydb -e grpc://localhost:2136 -d /local topic consumer add --consumer logstash-consumer /local/logstash_demo_topic
ydb -e grpc://localhost:2136 -d /local topic read /local/logstash_demo_topic --consumer logstash-consumer --commit true

The latter command will return the message content:

{"level":4,"message":"demo message","timestamp":1716470292640,"user":"demo_user"}