kafka备份方案

备份方案

使用两个kafka connectors,一个sink connector向外部数据源(S3、ES、Hive)导出数据,一个source connector从外部源恢复数据
使用connect同步数据到s3,然后清除60天以前的数据,可以达到备份+归档且释放硬盘空间的目的

一些背景

kafka connect

是kafka用来和其他数据系统交换数据流的工具,支持分布式,可扩展,offset原子性。

standalone模式

单机模式,服务启动时需要connector.properties文件,进而直接执行指定的connector,进行数据同步
###distributed模式
分布式模式,启动只需要worker.properties,都是服务参数,例如bootstrap_server,converter,plugin.path
需要启动connector任务时,需要通过rest接口发布任务,然后kafka-connect会自动分配任务并执行

kafka backup

https://github.com/itadventurer/kafka-backup
一个个人实现的kafka备份工具,思路上没有问题,不知道是否有坑,star数较少,且只保证kafka 2.2版本以上没问题,基本可以考虑放弃

kafka-connect-s3

confluent提供的kafka同步s3桶的connector插件,可靠性比较强,但是需要手动封装/集成脚本来实现备份和恢复
还有个问题是__consumer_offset不会自动备份

启动kafka connect

配置kafka_worker.properties

standalone模式的配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# kafka启动地址
bootstrap.servers=common1-t1.kafka.s.news:9093,common2-t1.kafka.s.news:9093,common3-t1.kafka.s.news:9093

# kafka数据和外部数据交换,数据格式可能不同,使用converter来定义变换方式。默认一般是json或者string
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

# 本地offset存储文件
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

# 插件目录,kafka-connect-s3解压后的目录
plugin.path=/home/kafkaInstance/kafka.s.afnews/plugins

distributed模式的配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# kafka启动地址
bootstrap.servers=common1-t1.kafka.s.news:9093,common2-t1.kafka.s.news:9093,common3-t1.kafka.s.news:9093

# 集群id,注意也不能和kafka broker的集群id重复
group.id=connect-cluster-1

# kafka数据和外部数据交换,数据格式可能不同,使用converter来定义变换方式。默认一般是json或者string
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

offset.storage.topic=connect-offsets
offset.storage.replication.factor=2
#offset.storage.partitions=25

config.storage.topic=connect-configs
config.storage.replication.factor=2

status.storage.topic=connect-status
status.storage.replication.factor=2
#status.storage.partitions=5

# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

# 插件目录,kafka-connect-s3解压后的目录
plugin.path=/home/kafkaInstance/kafka.s.afnews/plugins

配置kafka_connect_s3.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
name=s3-sink
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
# 设定桶中顶级目录,测试环境用test
topics.dir=test
# 指定同步消息
topics=video_generate_gif_event
# 也可以使用正则
# topics.regex=.*video.*

s3.region=eu-central-1 # 桶所在地区
s3.bucket.name=afnews-kafka-backup # 桶名称
s3.part.size=5242880 # 单个文件大小
flush.size=50 # 集合50条存一个文件

storage.class=io.confluent.connect.s3.storage.S3Storage
# s3中存储格式
format.class=io.confluent.connect.s3.format.json.JsonFormat
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner

schema.compatibility=NONE

distributed模式需要调用rest接口,只支持json数据,需要改写成下方格式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
{
"name": "s3-sink",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": 1,
"topics.dir": "test",
"topics.regex": ".*video.*",
"s3.region": "eu-central-1",
"s3.bucket.name": "afnews-kafka-backup",
"s3.part.size": 5242880,
"flush.size": 100,
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
"schema.compatibility": "NONE"
}
}

standalone模式启动使用

1
2
# 传递两个参数,第一个是worker配置,第二个是要启动的connector的配置
sh /home/kafka/bin/connect-standalone.sh conf/kafka_worker_standalone.properties conf/kafka_connect_s3.properties

distributed模式启动使用

1
2
# 只需要一个worker配置
sh /home/kafka/bin/connect-distributed.sh conf/kafka_worker.properties

然后通过post请求,来发起connector任务,比如

1
2
3
curl -X POST -H "Content-Type: application/json" \
--data "@/home/kafkaInstance/kafka.s.afnews/conf/kafka_connect_s3.json" \
http://localhost:8083/connectors

参考:https://medium.com/@anatolyz/introducing-kafka-backup-9dc0677ea7ee

作者

LePhee

发布于

2020-01-02

更新于

2020-01-02

许可协议

评论