Примеры чтения и записи по Kafka API
В этой статье приведены примеры чтения и записи в топики с использованием Kafka API.
Перед выполнением примеров:
- Создайте топик.
- Добавьте читателя.
- Если у вас включена аутентификация, создайте пользователя.
Начало работы
В Docker
Запустите Docker по этой инструкции. Kafka API будет доступен на 9092 порте.
Примеры работы с Kafka API
Чтение
При чтении отличительной особенностью Kafka API являются:
- отсутствие поддержки опции check.crcs;
- только одна стратегия назначения партиция - roundrobin;
- отсутствие возможности читать без предварительно созданной группы читателей.
Поэтому в конфигурации читателя всегда нужно указывать имя группы читателей и параметры:
check.crcs=false
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
Ниже даны примеры чтения по Kafka протоколу для разных приложений, языков программирования и фреймворков подключения без аутентификации.
Примеры того, как настроить аутентификацию, смотри в разделе Примеры с аутентификацией
Примечание
При использовании консольных утилит Kafka с Java 23 и получении ошибки
java.lang.UnsupportedOperationException: getSubject is supported only if a security manager is allowed
, либо запустите команду, используя другую версию Java (как сменить версию Java на macos)
, либо запустите команду, указав для java флаг -Djava.security.manager=allow
.
Например: KAFKA_OPTS=-Djava.security.manager=allow kafka-topics --boostratp-servers localhost:9092 --list
kafka-console-consumer --bootstrap-server localhost:9092 \
--topic my-topic \
--group my-group \
--from-beginning \
--consumer-property check.crcs=false \
--consumer-property partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
kcat -C \
-b <ydb-endpoint> \
-X check.crcs=false \
-X partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor \
-G <consumer-name> <topic-name>
String HOST = "<ydb-endpoint>";
String TOPIC = "<topic-name>";
String CONSUMER = "<consumer-name>";
Properties props = new Properties();
props.put("bootstrap.servers", HOST);
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("check.crcs", false);
props.put("partition.assignment.strategy", RoundRobinAssignor.class.getName());
props.put("group.id", CONSUMER);
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(new String[] {TOPIC}));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(10000); // timeout 10 sec
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.key() + ":" + record.value());
}
}
Apache Spark при работе с Kafka не использует ничего из существующих ограничений Kafka API в YDB Topics.
Благодаря этому использование Spark с Ydb topics возможно в полном объеме.
public class ExampleReadApp {
public static void main(String[] args) {
var conf = new SparkConf().setAppName("my-app").setMaster("local");
var context = new SparkContext(conf);
context.setCheckpointDir("checkpoints");
SparkSession spark = SparkSession.builder()
.sparkContext(context)
.config(conf)
.appName("Simple Application")
.getOrCreate();
Dataset<Row> df = spark
.read()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "flink-demo-input-topic")
.option("kafka.group.id", "spark-example-app")
.option("startingOffsets", "earliest")
.option("kafka." + ConsumerConfig.CHECK_CRCS_CONFIG, "false")
.load();
df.foreach((ForeachFunction<Row>) row -> {
System.out.println(row);
});
}
}
В примере выше использовался Apache Spark 2.12:3.5.3 с зависимостью на org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.3
.
Примечание
Сейчас поддержана не вся функциональность Flink при чтении и записи. Существуют следующие ограничения:
- Работа exactly-once по Kafka API сейчас не поддержана, так как поддержка транзакций в Kafka API сейчас в разработке;
- Подписка на топики через паттерн сейчас недоступна;
- Использование CreateTime сообщения в качестве watermark сейчас недоступно, так как вместо CreateTime используется текущее время вычитки.
public class YdbKafkaApiReadExample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
.enableCheckpointing(5000, CheckpointingMode.AT_LEAST_ONCE);
Configuration config = new Configuration();
config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file:///path/to/your/checkpoints");
env.configure(config);
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("localhost:9092")
.setProperty(ConsumerConfig.CHECK_CRCS_CONFIG, "false")
.setGroupId("flink-demo-consumer")
.setTopics("my-topic")
.setStartingOffsets(OffsetsInitializer.earliest())
.setBounded(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source").print();
env.execute("YDB Kafka API example read app");
}
}
В примере выше используется Apache Flink версии 1.20 и flink datastream connector к Kafka.
Частые проблемы и их решение
Ошибка Unexpected error in join group response
Полный текст ошибки:
Unexpected error in join group response: This most likely occurs because of a request being malformed by the client library or the message was sent to an incompatible broker. See the broker logs for more details.
Скорее всего проблема в том, что не указано имя читателя или указанное имя читателя не существует в кластере YDB.
Решение: создайте читателя с помощью CLI или SDK
Запись
Примечание
Сейчас не поддержана запись по Kafka API с использованием Kafka транзакций. Транзакции доступны только при использовании
YDB Topic API.
В остальном запись в Apache Kafka и в YDB Topics через Kafka API ничем не отличается.
Примечание
При использовании консольных утилит Kafka с Java 23 и получении ошибки
java.lang.UnsupportedOperationException: getSubject is supported only if a security manager is allowed
, либо запустите команду, используя другую версию Java (как сменить версию Java на macos)
, либо запустите команду, указав для java флаг -Djava.security.manager=allow
.
Например: KAFKA_OPTS=-Djava.security.manager=allow kafka-topics --boostratp-servers localhost:9092 --list
kafka-console-producer --broker-list localhost:9092 --topic my-topic
echo "test message" | kcat -P \
-b <ydb-endpoint> \
-t <topic-name> \
-k key
String HOST = "<ydb-endpoint>";
String TOPIC = "<topic-name>";
Properties props = new Properties();
props.put("bootstrap.servers", HOST);
props.put("acks", "all");
props.put("key.serializer", StringSerializer.class.getName());
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("compression.type", "none");
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<String, String>(TOPIC, "msg-key", "msg-body"));
producer.flush();
producer.close();
Apache Spark при работе с Kafka не использует ничего из существующих ограничений Kafka API в YDB Topics.
Благодаря этому использование Spark с Ydb topics возможно в полном объеме.
public class ExampleWriteApp {
public static void main(String[] args) {
var conf = new SparkConf().setAppName("my-app").setMaster("local");
var context = new SparkContext(conf);
context.setCheckpointDir("path/to/dir/with/checkpoints");
SparkSession spark = SparkSession.builder()
.sparkContext(context)
.config(conf)
.appName("Simple Application")
.getOrCreate();
spark
.createDataset(List.of("spark-1", "spark-2", "spark-3", "spark-4"), Encoders.STRING())
.write()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "flink-demo-output-topic")
.option("kafka.group.id", "spark-example-app")
.option("startingOffsets", "earliest")
.save();
}
}
В примере выше использовался Apache Spark 2.12:3.5.3 с зависимостью на org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.3
.
Примечание
Сейчас поддержана не вся функциональность Flink при чтении и записи. Существуют следующие ограничения:
- Работа exactly-once по Kafka API сейчас не поддержана, так как поддержка транзакций в Kafka API сейчас в разработке;
- Подписка на топики через паттерн сейчас недоступна;
- Использование CreateTime сообщения в качестве watermark сейчас недоступно, так как вместо CreateTime используется текущее время вычитки.
public class YdbKafkaApiProduceExample {
private static final String TOPIC = "my-topic";
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Sink<String> kafkaSink = KafkaSink.<String>builder()
.setBootstrapServers("localhost:9092") // assuming ydb is running locally with kafka proxy on 9092 port
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(TOPIC)
.setValueSerializationSchema(new SimpleStringSchema())
.setKeySerializationSchema(new SimpleStringSchema())
.build())
.setRecordSerializer((el, ctx, ts) -> new ProducerRecord<>(TOPIC, el.getBytes()))
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
env.setParallelism(1)
.fromSequence(0, 10)
.map(i -> i + "")
.sinkTo(kafkaSink);
// Execute program, beginning computation.
env.execute("ydb_kafka_api_write_example");
}
}
В примере выше используется Apache Flink версии 1.20 и flink datastream connector к Kafka.
output {
kafka {
codec => json
topic_id => "<topic-name>"
bootstrap_servers => "<ydb-endpoint>"
compression_type => none
}
}
[OUTPUT]
name kafka
match *
Brokers <ydb-endpoint>
Topics <topic-name>
rdkafka.client.id Fluent-bit
rdkafka.request.required.acks 1
rdkafka.log_level 7
rdkafka.sasl.mechanism PLAIN
Примеры с аутентификацией
Подробнее про аутентификацию смотри в разделе Аутентификация. Ниже есть примеры аутентификации в облачной базе
и в локальной базе.
Примечание
Сейчас единственным доступным механизмом аутентификации с Kafka API в YDB Topics является SASL_PLAIN
.
Примеры аутентификации в самостоятельно развернутом YDB
Для того, чтобы проверить работу с аутентификацией в локальной базе:
- Создайте пользователя. Как это сделать в YQL. Как выполнить YQL из CLI.
- Подключитесь к Kafka API, как в примерах ниже. Во всех примерах предполагается, что:
- YDB запущен локально с переменной окружения YDB_KAFKA_PROXY_PORT=9092 - то есть Kafka API доступен по адресу localhost:9092. Например можно поднять YDB в докере, как указано здесь.
- это имя пользователя, которое вы указали при создании пользователя. - это пароль пользователя, который вы указали при создании пользователя.
Примеры показаны для чтения, но те же самые параметры конфигурации работают и для записи в топик.
Примечание
При использовании консольных утилит Kafka с Java 23 и получении ошибки
java.lang.UnsupportedOperationException: getSubject is supported only if a security manager is allowed
, либо запустите команду, используя другую версию Java (как сменить версию Java на macos)
, либо запустите команду, указав для java флаг -Djava.security.manager=allow
.
Например: KAFKA_OPTS=-Djava.security.manager=allow kafka-topics --boostratp-servers localhost:9092 --list
kafka-console-consumer --bootstrap-server localhost:9092 \
--topic <topic-name> \
--group <consumer-name> \
--from-beginning \
--consumer-property check.crcs=false \
--consumer-property partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor \
--consumer-property security.protocol=SASL_PLAINTEXT \
--consumer-property sasl.mechanism=PLAIN \
--consumer-property "sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<username>\" password=\"<password>\";"
kcat -C \
-b localhost:9092 \
-X security.protocol=SASL_PLAINTEXT \
-X sasl.mechanism=PLAIN \
-X sasl.username="<username>" \
-X sasl.password="<password>" \
-X check.crcs=false \
-X partition.assignment.strategy=roundrobin \
-G <consumer-name> <topic-name>
String TOPIC = "<topic-name>";
String CONSUMER = "<consumer-name>";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("check.crcs", false);
props.put("partition.assignment.strategy", RoundRobinAssignor.class.getName());
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config", "sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<username>\" password=\"<password>\";");
props.put("group.id", CONSUMER);
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(new String[] {TOPIC}));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(10000); // timeout 10 sec
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.key() + ":" + record.value());
}
}