Примеры настройки коннекторов
В разделе приведены примеры файлов настройки коннекторов Kafka Connect для работы с YDB по протоколу Kafka.
Из файла в YDB
Пример файла настроек FileSource коннектора /etc/kafka-connect-worker/file-sink.properties
для переноса данных из файла в топик:
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/etc/kafka-connect-worker/file_to_read.json
topic=<topic-name>
Из YDB в PostgreSQL
Пример файла настроек JDBCSink коннектора /etc/kafka-connect-worker/jdbc-sink.properties
для переноса данных из топика в таблицу PostgreSQL. Используется коннектор Kafka Connect JDBC Connector.
name=postgresql-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.url=jdbc:postgresql://<postgresql-host>:<postgresql-port>/<db>
connection.user=<pg-user>
connection.password=<pg-user-pass>
topics=<topic-name>
batch.size=2000
auto.commit.interval.ms=1000
transforms=wrap
transforms.wrap.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.wrap.field=data
auto.create=true
insert.mode=insert
pk.mode=none
auto.evolve=true
Из PostgreSQL в YDB
Пример файла настроек JDBCSource коннектора /etc/kafka-connect-worker/jdbc-source.properties
для переноса данных из PostgreSQL таблицы в топик. Используется коннектор Kafka Connect JDBC Connector.
name=postgresql-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:postgresql://<postgresql-host>:<postgresql-port>/<db>
connection.user=<pg-user>
connection.password=<pg-user-pass>
mode=bulk
query=SELECT * FROM "<topic-name>";
topic.prefix=<topic-name>
poll.interval.ms=1000
validate.non.null=false
Из YDB в S3
Пример файла настроек S3Sink коннектора /etc/kafka-connect-worker/s3-sink.properties
для переноса данных из топика в S3. Используется коннектор Aiven's S3 Sink Connector for Apache Kafka.
name=s3-sink
connector.class=io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector
topics=<topic-name>
aws.access.key.id=<s3-access-key>
aws.secret.access.key=<s3-secret>
aws.s3.bucket.name=<bucket-name>
aws.s3.endpoint=<s3-endpoint>
format.output.type=json
file.compression.type=none