Настройка интеграции между Logstash и YDB
В разделе представлены варианты интеграции между системой сбора и анализа данных Logstash и YDB.
Введение
Logstash – инструмент сбора, фильтрации и нормализации логов. Данный продукт позволяет динамически получать, фильтровать и доставлять данные независимо от их формата и сложности. Конфигурация системы производиться с помощью плагинов различных типов - input, output и filter. Набор Logstash плагинов для работы с YDB располагается в репозитории ydb-logstash-plugins:
- Storage Plugin для сохранения данных в строчной или колоночной таблице YDB;
- Input Topic Plugin для чтение данных из YDB топика;
- Output Topic Plugin для отправки данных в YDB топик.
Плагины можно собрать самостоятельно из исходного кода, либо воспользоваться готовыми сборками под две последнии версии Logstash.
Примечание
Дальнейшие инструкции используют placeholder <path-to-logstash>
, который нужно заменить на путь до установленного в системе Logstash.
Установить любой собранный плагин в Logstash можно с помощью команды
<path-to-logstash>/bin/logstash-plugin install </path/to/logstash-plugin.gem>
Проверить что плагин установлен, можно командой
<path-to-logstash>/bin/logstash-plugin list
Которая выведет список всех установленных плагинов, в котором можно найти интересующие.
Конфигурация подключения плагинов к YDB
Все плагины используют одни и те же параметры для настройки подключения к базе данных YDB. Среди этих параметров обязателен только connection_string
, а все остальные параметры опциональны и позволяют задать режим аутентификации. Если ни один из них не указан, то будет использоваться анонимная аутентификация.
# Пример указан для плагина ydb_storage, настройка подключения
# в плагинах ydb_topics_output и ydb_topics_input аналогична
ydb_storage {
# Строка подключения к БД, содержит схему, адрес, порт и путь БД
connection_string => "grpc://localhost:2136/local"
# Значение токена аутентификации (режим Access Token)
token_auth => "<token_value>"
# Путь до файла со значением токена аутентификации (режим Access Token)
token_file => "</path/to/token/file>"
# Путь до ключа сервисного аккаунта (режим Service Account Key)
sa_key_file => "</path/to/key.json>"
# Флаг для использования сервиса метаданных (режим Metadata)
use_metadata => true
}
Плагин YDB Storage
Данный плагин позволяет перенаправить поток данных из Logstash в таблицу YDB для дальнейшего анализа. Для этого удобно использовать колоночные таблицы, оптимизированные для аналитических запросов. Каждое отдельное поле события Logstash будет записано в отдельный столбец таблицы. Tе поля, которые не соответствуют ни одной колонке, будут проигнорированы.
Конфигурация плагина
Для настройки плагина необходимо добавить секцию ydb_storage
в раздел output
файла конфигурации Logstash. Плагин поддерживает стандартный набор опций для подключения плагинов YDB, плюс несколько специфичных для него опций:
table_name
— обязательный параметр с именем таблицы, в которую будут записываться события Logstashuuid_column
— необязательный параметр с именем колонки, в которую плагин будет записывать случайно сгенерированный идентификаторtimestamp_column
— необязательный параметр с именем колонку для записи времени создания события Logstash
Важно
Плагин Storage не проверяет события Logstash с точки зрения корректности и уникальности первичных ключей таблицы YDB. Для записи в YDB используется пакетная загрузка данных и при получении различных событий с одинаковыми идентификаторами они будут перезаписывать друг друга. Рекомендуется выбирать в качестве первичные ключей те поля события, которые будут присутствовать в каждом сообщении и однозначно его идентифицировать. Если же такого набора полей нет, то можно добавить в первичный ключ специальную колонку и заполнять ее случайным значением с помощью параметра uuid_column
Пример использования
Создание колоночной таблицы для хранения записей
В выбранной базе данных YDB cоздадим новую колоночную таблицу с нужными нами колонками. В качестве первичного ключа будем использовать случайное значение, генерируемое на стороне плагина:
CREATE TABLE `logstash_demo` (
`uuid` Text NOT NULL, -- Уникальный идентификатор
`ts` Timestamp NOT NULL, -- Время создания сообщения
`message` Text,
`user` Text,
`level` Int32,
PRIMARY KEY (
`uuid`
)
)
WITH (STORE = COLUMN);
Настройка плагина в Logstash
Добавим секцию плагина ydb_storage
в раздел output
файла конфигурации Logstash. Дополнительно для проверки работоспособности также добавим плагин http
в секцию input
, что позволит создавать события в Logstash
через http запросы:
output {
ydb_storage {
connection_string => "..." # Адрес подключения к YDB
table_name => "logstash_demo" # Имя таблицы
uuid_column => "uuid" # Колонка в которую плагин будет записывать сгенерированный идентификатор
timestamp_column => "ts" # Колонку в которую плагин будет записывать timestamp события
}
}
input {
http {
port => 9876 # Можно указать любой порт
}
}
После изменения данного файла Logstash
нужно перезапустить.
Отправка тестовых сообщений
Затем можно отправить несколько тестовых сообщений:
curl -H "content-type: application/json" -XPUT 'http://127.0.0.1:9876/http/ping' -d '{ "user" : "demo_user", "message" : "demo message", "level": 4}'
curl -H "content-type: application/json" -XPUT 'http://127.0.0.1:9876/http/ping' -d '{ "user" : "test1", "level": 1}'
curl -H "content-type: application/json" -XPUT 'http://127.0.0.1:9876/http/ping' -d '{ "message" : "error", "level": -3}'
Все команды должны вернуть ok
в случае успешного выполнения.
Проверка наличия записанных сообщений в YDB
Теперь можно убедиться что все отправленные сообщения записаны в таблице. Выполним запрос (не забывая что чтение из колоночной таблицы возможно только в режиме ScanQuery):
SELECT * FROM `logstash_demo`;
и получим список записанных событий:
┌───────┬────────────────┬───────────────────────────────┬─────────────┬────────────────────────────────────────┐
│ level │ message │ ts │ user │ uuid │
├───────┼────────────────┼───────────────────────────────┼─────────────┼────────────────────────────────────────┤
│ -3 │ "error" │ "2024-05-22T13:16:06.491000Z" │ null │ "74cd4048-0b61-4fb9-9385-308714e21881" │
│ 1 │ null │ "2024-05-22T13:15:56.591000Z" │ "test1" │ "1df27d0a-9aa0-42c7-9ea2-ab69bc1f5d87" │
│ 4 │ "demo message" │ "2024-05-22T13:15:38.760000Z" │ "demo_user" │ "b7468cb1-e1e3-46fa-965d-83e604e80a31" │
└───────┴────────────────┴───────────────────────────────┴─────────────┴────────────────────────────────────────┘
Плагин YDB Topic Input
Данный плагин позволяет читать из YDB топика и преобразовывать их в события Logstash
для дальнейшей обработки.
Конфигурация плагина
Для настройки плагина мы должны добавить секцию ydb_topic
в раздел input
файла конфигурации Logstash. Плагин поддерживает стандартный набор опций для подключения плагинов YDB, плюс несколько специфичных для него опций:
topic_path
— обязательный параметр с полным путем топика для чтения;consumer_name
— обязательный параметр с именем читателя топика;schema
— необязательный параметр с вариантам обработки сообщений YDB. По умолчанию плагин читает и отправляет сообщения топика в бинарном виде, но если указать режимJSON
, то каждое сообщение из топика будет трактоваться как JSON объект.
Пример использования
Создание топика
В выбранной базе данных YDB заранее создадим топик и читателя, из которого будет производиться чтение:
ydb -e grpc://localhost:2136 -d /local topic create /local/logstash_demo_topic
ydb -e grpc://localhost:2136 -d /local topic consumer add --consumer logstash-consumer /local/logstash_demo_topic
Настройка плагина в Logstash
Добавим секцию плагина ydb_topic
в раздел input
файла конфигурации Logstash. Дополнительно для проверки работоспособности также добавим плагин stdout
в секцию output
, что позволит отслеживать все события в логе выполнения Logstash
:
input {
ydb_topic {
connection_string => "grpc://localhost:2136/local" # Адрес подключения к YDB
topic_path => "/local/logstash_demo_topic" # Имя топика
consumer_name => "logstash-consumer" # Имя читателя
schema => "JSON" # Используем JSON в качестве формата сообщений топика
}
}
output {
stdout {
codec => rubydebug
}
}
После изменения данного файла Logstash
нужно перезапустить.
Тестовая запись сообщение в топик
Затем отправим несколько тестовых сообщений:
echo '{"message":"test"}' | ydb -e grpc://localhost:2136 -d /local topic write /local/logstash_demo_topic
echo '{"user":123}' | ydb -e grpc://localhost:2136 -d /local topic write /local/logstash_demo_topic
Проверка обработки сообщений в Logstash
Благодаря включенному плагину stdout
в логе Logstash появятся прочитанные из топика сообщения:
{
"message" => "test",
"@timestamp" => 2024-05-23T10:31:47.712896899Z,
"@version" => "1"
}
{
"user" => 123.0,
"@timestamp" => 2024-05-23T10:34:08.574599108Z,
"@version" => "1"
}
Плагин YDB Topic Output
Данный плагин позволяет записывать события Logstash
в YDB топик.
Конфигурация плагина
Для настройки плагина мы должны добавить секцию ydb_topic
в раздел output
файла конфигурации Logstash. Плагин поддерживает стандартный набор опций для подключения плагинов YDB, плюс дополнительные опции:
topic_path
— обязательный параметр с полным путем топика для записи.
Пример использования
Создание топика
В выбранной базе данных YDB заранее создадим топик, в который будет производиться запись:
ydb -e grpc://localhost:2136 -d /local topic create /local/logstash_demo_topic
Настройка плагина в Logstash
Добавим секцию плагина ydb_topic
в раздел output
файла конфигурации Logstash.. Дополнительно для проверки работоспособности также добавим плагин http
в секцию input
, что позволит создавать события в Logstash
через http запросы:
output {
ydb_topic {
connection_string => "grpc://localhost:2136/local" # Адрес подключения к YDB
topic_path => "/local/logstash_demo_topic" # Имя топика для записи
}
}
input {
http {
port => 9876 # Можно указать любой порт
}
}
После изменения данного файла Logstash
нужно перезапустить.
Отправка тестового сообщения
С помощью плагина http
отправим тестовое сообщение:
curl -H "content-type: application/json" -XPUT 'http://127.0.0.1:9876/http/ping' -d '{ "user" : "demo_user", "message" : "demo message", "level": 4}'
Команда вернет ok
в случае успешного выполнения.
Контрольное чтение сообщения из топика
Для проверки успешности записи в топик создадим нового читателя и прочитаем одно сообщение из топика:
ydb -e grpc://localhost:2136 -d /local topic consumer add --consumer logstash-consumer /local/logstash_demo_topic
ydb -e grpc://localhost:2136 -d /local topic read /local/logstash_demo_topic --consumer logstash-consumer --commit true
Чтение вернет содержимое отправленного сообщения:
{"level":4,"message":"demo message","timestamp":1716470292640,"user":"demo_user"}