Kafka API usage examples
This example shows a code snippet for reading data from a topic via Kafka API without a consumer group (Manual Partition Assignment).
You don't need to create a consumer for this reading mode.
Before proceeding with the examples:
- Create a topic.
- Add a consumer.
- If authentication is enabled, create a user.
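For reference, a minimal sketch of these preparation steps with the YDB CLI might look as follows. The endpoint, database path, topic name, consumer name, username, and password are illustrative assumptions; substitute your own values.
# Create a topic
ydb -e grpc://localhost:2136 -d /Root/test topic create my-topic
# Add a consumer to the topic
ydb -e grpc://localhost:2136 -d /Root/test topic consumer add my-topic --consumer my-group
# If authentication is enabled, create a user via YQL
ydb -e grpc://localhost:2136 -d /Root/test yql -s "CREATE USER user1 PASSWORD 'user1password'"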
How to try the Kafka API
In Docker
Run YDB in Docker following the quickstart guide; the Kafka API will then be available on port 9092.
Kafka API usage examples
Reading
Consider the following limitations of using the Kafka API for reading:
- No support for the check.crcs option.
- Only one partition assignment strategy - roundrobin.
- No reading without a pre-created consumer group.
Therefore, in the consumer configuration, you must always specify the consumer group name and the parameters:
check.crcs=false
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
Below are examples of reading using the Kafka protocol for various applications, programming languages, and frameworks without authentication.
For examples of how to set up authentication, see Authentication examples.
Note
If you get the java.lang.UnsupportedOperationException: getSubject is supported only if a security manager is allowed error when using Kafka CLI tools with Java 23, perform one of the following steps:
- Run the command using a different version of Java (how to change the Java version on macOS).
- Run the command with the Java flag -Djava.security.manager=allow. For example: KAFKA_OPTS=-Djava.security.manager=allow kafka-topics --bootstrap-servers localhost:9092 --list.
kafka-console-consumer --bootstrap-server localhost:9092 \
--topic my-topic \
--group my-group \
--from-beginning \
--consumer-property check.crcs=false \
--consumer-property partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
kcat -C \
-b <ydb-endpoint> \
-X check.crcs=false \
-X partition.assignment.strategy=roundrobin \
-G <consumer-name> <topic-name>
String HOST = "<ydb-endpoint>";
String TOPIC = "<topic-name>";
String CONSUMER = "<consumer-name>";
Properties props = new Properties();
props.put("bootstrap.servers", HOST);
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("check.crcs", false);
props.put("partition.assignment.strategy", RoundRobinAssignor.class.getName());
props.put("group.id", CONSUMER);
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(new String[] {TOPIC}));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(10000); // timeout 10 sec
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.key() + ":" + record.value());
}
}
When working with Kafka, Apache Spark does not use any of the Kafka API features that are currently not supported in YDB Topics. Therefore, all features of Spark-Kafka integrations should work through the YDB Topics Kafka API.
public class ExampleReadApp {
public static void main(String[] args) {
var conf = new SparkConf().setAppName("my-app").setMaster("local");
var context = new SparkContext(conf);
context.setCheckpointDir("checkpoints");
SparkSession spark = SparkSession.builder()
.sparkContext(context)
.config(conf)
.appName("Simple Application")
.getOrCreate();
Dataset<Row> df = spark
.read()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "flink-demo-input-topic")
.option("kafka.group.id", "spark-example-app")
.option("startingOffsets", "earliest")
.option("kafka." + ConsumerConfig.CHECK_CRCS_CONFIG, "false")
.load();
df.foreach((ForeachFunction<Row>) row -> {
System.out.println(row);
});
}
}
In the example above, Apache Spark 3.5.3 (Scala 2.12) was used, with a dependency on org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.3.
Note
Currently, not all Flink functionality is supported for reading and writing. The following limitations exist:
- Exactly-once delivery via the Kafka API is not supported at the moment because transaction support in the Kafka API is still under development.
- Subscription to topics using a pattern is currently unavailable.
- Using the message CreateTime as a watermark is not available at the moment because the current read time is used instead of CreateTime (this will be fixed in future versions).
public class YdbKafkaApiReadExample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
.enableCheckpointing(5000, CheckpointingMode.AT_LEAST_ONCE);
Configuration config = new Configuration();
config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file:///path/to/your/checkpoints");
env.configure(config);
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("localhost:9092")
.setProperty(ConsumerConfig.CHECK_CRCS_CONFIG, "false")
.setGroupId("flink-demo-consumer")
.setTopics("my-topic")
.setStartingOffsets(OffsetsInitializer.earliest())
.setBounded(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source").print();
env.execute("YDB Kafka API example read app");
}
}
In the example above, Apache Flink 1.20 is used with the Flink DataStream connector for Kafka.
Frequent problems and solutions
Unexpected error in join group response
Full text of an exception:
Unexpected error in join group response: This most likely occurs because of a request being malformed by the client library or the message was sent to an incompatible broker. See the broker logs for more details.
This most likely means that a consumer group is not specified or that the specified group does not exist in the YDB cluster.
Solution: create a consumer group in YDB using the CLI or SDK, as shown in the sketch below.
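For example, a minimal sketch with the YDB CLI (the endpoint and database path are assumptions for a local setup):
ydb -e grpc://localhost:2136 -d /Root/test topic consumer add <topic-name> --consumer <consumer-name>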
Writing
Note
Using Kafka transactions when writing via Kafka API is currently not supported. Transactions are only available when using the YDB Topic API.
Otherwise, writing to YDB Topics through the Kafka API is no different from writing to Apache Kafka.
Note
If you get the java.lang.UnsupportedOperationException: getSubject is supported only if a security manager is allowed error when using Kafka CLI tools with Java 23, perform one of the following steps:
- Run the command using a different version of Java (how to change the Java version on macOS).
- Run the command with the Java flag -Djava.security.manager=allow. For example: KAFKA_OPTS=-Djava.security.manager=allow kafka-topics --bootstrap-servers localhost:9092 --list.
kafka-console-producer --broker-list localhost:9092 --topic my-topic
echo "test message" | kcat -P \
-b <ydb-endpoint> \
-t <topic-name> \
-k key
String HOST = "<ydb-endpoint>";
String TOPIC = "<topic-name>";
Properties props = new Properties();
props.put("bootstrap.servers", HOST);
props.put("acks", "all");
props.put("key.serializer", StringSerializer.class.getName());
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("compression.type", "none");
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<String, String>(TOPIC, "msg-key", "msg-body"));
producer.flush();
producer.close();
When working with Kafka, Apache Spark does not use any of the Kafka API features that are currently not supported in YDB Topics. Therefore, all features of Spark-Kafka integrations should work through the YDB Topics Kafka API.
public class ExampleWriteApp {
public static void main(String[] args) {
var conf = new SparkConf().setAppName("my-app").setMaster("local");
var context = new SparkContext(conf);
context.setCheckpointDir("path/to/dir/with/checkpoints");
SparkSession spark = SparkSession.builder()
.sparkContext(context)
.config(conf)
.appName("Simple Application")
.getOrCreate();
spark
.createDataset(List.of("spark-1", "spark-2", "spark-3", "spark-4"), Encoders.STRING())
.write()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "flink-demo-output-topic")
.option("kafka.group.id", "spark-example-app")
.option("startingOffsets", "earliest")
.save();
}
}
In the example above, Apache Spark 3.5.3 (Scala 2.12) was used, with a dependency on org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.3.
Note
Currently, not all Flink functionality is supported for reading and writing. The following limitations exist:
- Exactly-once delivery via the Kafka API is not supported at the moment because transaction support in the Kafka API is still under development.
- Subscription to topics using a pattern is currently unavailable.
- Using the message CreateTime as a watermark is not available at the moment because the current read time is used instead of CreateTime (this will be fixed in future versions).
public class YdbKafkaApiProduceExample {
private static final String TOPIC = "my-topic";
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Sink<String> kafkaSink = KafkaSink.<String>builder()
.setBootstrapServers("localhost:9092") // assuming ydb is running locally with kafka proxy on 9092 port
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(TOPIC)
.setValueSerializationSchema(new SimpleStringSchema())
.setKeySerializationSchema(new SimpleStringSchema())
.build())
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
env.setParallelism(1)
.fromSequence(0, 10)
.map(i -> i + "")
.sinkTo(kafkaSink);
// Execute program, beginning computation.
env.execute("ydb_kafka_api_write_example");
}
}
In the example above, Apache Flink 1.20 is used with the Flink DataStream connector for Kafka.
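The following Logstash output configuration writes messages to a topic via the Kafka API: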
output {
kafka {
codec => json
topic_id => "<topic-name>"
bootstrap_servers => "<ydb-endpoint>"
compression_type => none
}
}
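The following Fluent Bit output configuration writes messages to a topic via the Kafka API: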
[OUTPUT]
name kafka
match *
Brokers <ydb-endpoint>
Topics <topic-name>
rdkafka.client.id Fluent-bit
rdkafka.request.required.acks 1
rdkafka.log_level 7
rdkafka.sasl.mechanism PLAIN
Authentication examples
For more details on authentication, see the Authentication section. Below are examples of authentication in a cloud database and a local database.
Note
Currently, the only available authentication mechanism with the Kafka API in YDB Topics is SASL_PLAIN.
Authentication examples in on-prem YDB
To use authentication in a multinode self-deployed database:
- Create a user. How to do this in YQL. How to execute YQL from CLI.
- Connect to the Kafka API as shown in the examples below. In all examples, it is assumed that:
  - YDB is running locally with the environment variable YDB_KAFKA_PROXY_PORT=9092, meaning that the Kafka API is available at localhost:9092. For example, you can run YDB in Docker as described here.
  - <username> is the username you specified when creating the user.
  - <password> is the user's password you specified when creating the user.
- Examples are shown for reading, but the same configuration parameters work for writing to a topic as well.
Note
If you get the java.lang.UnsupportedOperationException: getSubject is supported only if a security manager is allowed error when using Kafka CLI tools with Java 23, perform one of the following steps:
- Run the command using a different version of Java (how to change the Java version on macOS).
- Run the command with the Java flag -Djava.security.manager=allow. For example: KAFKA_OPTS=-Djava.security.manager=allow kafka-topics --bootstrap-servers localhost:9092 --list.
kafka-console-consumer --bootstrap-server localhost:9092 \
--topic <topic-name> \
--group <consumer-name> \
--from-beginning \
--consumer-property check.crcs=false \
--consumer-property partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor \
--consumer-property security.protocol=SASL_PLAINTEXT \
--consumer-property sasl.mechanism=PLAIN \
--consumer-property "sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<username>\" password=\"<password>\";"
kcat -C \
-b localhost:9092 \
-X security.protocol=SASL_PLAINTEXT \
-X sasl.mechanism=PLAIN \
-X sasl.username="<username>" \
-X sasl.password="<password>" \
-X check.crcs=false \
-X partition.assignment.strategy=roundrobin \
-G <consumer-name> <topic-name>
String TOPIC = "<topic-name>";
String CONSUMER = "<consumer-name>";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("check.crcs", false);
props.put("partition.assignment.strategy", RoundRobinAssignor.class.getName());
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config", "sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<username>\" password=\"<password>\";");
props.put("group.id", CONSUMER);
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(new String[] {TOPIC}));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(10000); // timeout 10 sec
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.key() + ":" + record.value());
}
}