备份方案 使用两个kafka connectors,一个sink connector向外部数据源(S3、ES、Hive)导出数据,一个source connector从外部源恢复数据 使用connect同步数据到s3,然后清除60天以前的数据,可以达到备份+归档且释放硬盘空间的目的
一些背景 kafka connect 是kafka用来和其他数据系统交换数据流的工具,支持分布式,可扩展,offset原子性。
standalone模式 单机模式,服务启动时需要connector.properties文件,进而直接执行指定的connector,进行数据同步 ###distributed模式 分布式模式,启动只需要worker.properties,都是服务参数,例如bootstrap_server,converter,plugin.path 需要启动connector任务时,需要通过rest接口发布任务,然后kafka-connect会自动分配任务并执行
kafka backup https://github.com/itadventurer/kafka-backup 一个个人实现的kafka备份工具,思路上没有问题,不知道是否有坑,star数较少,且只保证kafka 2.2版本以上没问题,基本可以考虑放弃
kafka-connect-s3 confluent提供的kafka同步s3桶的connector插件,可靠性比较强,但是需要手动封装/集成脚本来实现备份和恢复 还有个问题是__consumer_offset不会自动备份
启动kafka connect 配置kafka_worker.properties standalone模式的配置 1 2 3 4 5 6 7 8 9 10 11 12 13 14 # kafka启动地址 bootstrap.servers=common1-t1.kafka.s.news:9093,common2-t1.kafka.s.news:9093,common3-t1.kafka.s.news:9093 # kafka数据和外部数据交换,数据格式可能不同,使用converter来定义变换方式。默认一般是json或者string key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter # 本地offset存储文件 offset.storage.file.filename=/tmp/connect.offsets # Flush much faster than normal, which is useful for testing/debugging offset.flush.interval.ms=10000 # 插件目录,kafka-connect-s3解压后的目录 plugin.path=/home/kafkaInstance/kafka.s.afnews/plugins
distributed模式的配置 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 # kafka启动地址 bootstrap.servers=common1-t1.kafka.s.news:9093,common2-t1.kafka.s.news:9093,common3-t1.kafka.s.news:9093 # 集群id,注意也不能和kafka broker的集群id重复 group.id=connect-cluster-1 # kafka数据和外部数据交换,数据格式可能不同,使用converter来定义变换方式。默认一般是json或者string key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter offset.storage.topic=connect-offsets offset.storage.replication.factor=2 #offset.storage.partitions=25 config.storage.topic=connect-configs config.storage.replication.factor=2 status.storage.topic=connect-status status.storage.replication.factor=2 #status.storage.partitions=5 # Flush much faster than normal, which is useful for testing/debugging offset.flush.interval.ms=10000 # 插件目录,kafka-connect-s3解压后的目录 plugin.path=/home/kafkaInstance/kafka.s.afnews/plugins
配置kafka_connect_s3.properties 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 name=s3-sink connector.class=io.confluent.connect.s3.S3SinkConnector tasks.max=1 # 设定桶中顶级目录,测试环境用test topics.dir=test # 指定同步消息 topics=video_generate_gif_event # 也可以使用正则 # topics.regex=.*video.* s3.region=eu-central-1 # 桶所在地区 s3.bucket.name=afnews-kafka-backup # 桶名称 s3.part.size=5242880 # 单个文件大小 flush.size=50 # 集合50条存一个文件 storage.class=io.confluent.connect.s3.storage.S3Storage # s3中存储格式 format.class=io.confluent.connect.s3.format.json.JsonFormat partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner schema.compatibility=NONE
distributed模式需要调用rest接口,只支持json数据,需要改写成下方格式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 { "name" : "s3-sink" , "config" : { "connector.class" : "io.confluent.connect.s3.S3SinkConnector" , "tasks.max" : 1 , "topics.dir" : "test" , "topics.regex" : ".*video.*" , "s3.region" : "eu-central-1" , "s3.bucket.name" : "afnews-kafka-backup" , "s3.part.size" : 5242880 , "flush.size" : 100 , "storage.class" : "io.confluent.connect.s3.storage.S3Storage" , "format.class" : "io.confluent.connect.s3.format.json.JsonFormat" , "partitioner.class" : "io.confluent.connect.storage.partitioner.DefaultPartitioner" , "schema.compatibility" : "NONE" } }
standalone模式启动使用 1 2 sh /home/kafka/bin/connect-standalone.sh conf/kafka_worker_standalone.properties conf/kafka_connect_s3.properties
distributed模式启动使用 1 2 sh /home/kafka/bin/connect-distributed.sh conf/kafka_worker.properties
然后通过post请求,来发起connector任务,比如
1 2 3 curl -X POST -H "Content-Type: application/json" \ --data "@/home/kafkaInstance/kafka.s.afnews/conf/kafka_connect_s3.json" \ http://localhost:8083/connectors
参考:https://medium.com/@anatolyz/introducing-kafka-backup-9dc0677ea7ee