Several approaches to syncing data from one Kafka cluster to another
1. Kafka's built-in mirroring tool; 2. a Flume agent; 3. a hard-coded Flink job
Background:
The legacy-system rewrite touches a large number of business systems, and standing up a full end-to-end test environment is time-consuming. That led to the idea of syncing data between the Kafka clusters that sit in the middle of the pipeline.
Approaches:
1. Kafka's built-in mirroring tool, kafka-mirror-maker.sh
MirrorMaker is a tool that ships with Kafka for mirroring data between Kafka clusters: it consumes from the source cluster and produces to the target cluster.
step1: write consumer-test.properties
bootstrap.servers=172.25.10.18:19092
# consumer group id
group.id=test-consumer-group
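By default the MirrorMaker consumer starts from the latest committed offsets, so messages already sitting in the source topics are not copied. If historical data should be mirrored too, an offset-reset policy can be added to this file (an optional setting, not part of the original config):
# start from the earliest offset when no committed offset exists
auto.offset.reset=earliest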
step2: write producer-test.properties
bootstrap.servers=192.168.12.30:19092,192.168.12.30:29092,192.168.12.30:39092
# specify the compression codec for all data generated: none, gzip, snappy, lz4, zstd
compression.type=none
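compression.type=none leaves messages uncompressed on the wire. If the two clusters are linked over constrained bandwidth, switching to one of the codecs listed in the comment above is a common tweak (a judgment call, not part of the original setup), e.g.:
compression.type=lz4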
step3: start kafka-mirror-maker
bin/kafka-mirror-maker.sh --consumer.config config/consumer-test.properties --producer.config config/producer-test.properties --num.streams 8 --whitelist 'customer.order*'
Note: errors may occur if the broker counts of the two Kafka clusters differ.
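To spot-check that mirroring works, consume from the target cluster with the console consumer that also ships with Kafka (the topic name below is just an assumed match for the whitelist above):
bin/kafka-console-consumer.sh --bootstrap-server 192.168.12.30:19092 --topic customer.order --from-beginning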
2. Flume agent configuration
step1: write the configuration file
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = 172.25.10.18:19092
a1.sources.r1.kafka.topics=customer.logindata
a1.sources.r1.kafka.groupId = flume
a1.sources.r1.kafka.consumer.timeout.ms = 100
# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers=192.168.12.30:19092
a1.sinks.k1.kafka.topic=customer.logindata
#a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
a1.sinks.k1.kafka.producer.acks=1
a1.sinks.k1.custom.encoding=UTF-8
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
step2: start the agent
bin/flume-ng agent -n a1 -c conf -f conf/flume-calls-kafka.properties -Dflume.root.logger=INFO,console
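The memory channel above is fast but loses any buffered events if the agent process dies. Where durability matters, Flume's file channel can be swapped in; a minimal sketch (the directories are placeholders, not from the original setup):
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /data/flume/checkpoint
a1.channels.c1.dataDirs = /data/flume/data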
3. Hard-coded Flink job
Flink works much like Flume here: a source reads the data and a sink writes it out to the target environment, so the job acts as a pipe. The code is as follows:
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class KafkaToKafkaJob {
    public static void main(String[] args) throws Exception {
        // set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // consumer properties for the source cluster
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "172.25.10.18:19092");
        properties.setProperty("group.id", "flink");
        String topic = "customer.logindata";

        // read the source topic as plain strings and echo it for debugging
        DataStreamSource<String> stream = env.addSource(new FlinkKafkaConsumer<>(
                topic,
                new SimpleStringSchema(),
                properties)).setParallelism(1);
        stream.print();

        // producer properties for the target cluster (group.id is a consumer-only setting, so it is omitted here)
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "192.168.12.30:19092");
        // write everything to the target topic
        stream.addSink(new FlinkKafkaProducer<String>("customer.logindata1", new SimpleStringSchema(), prop));
        env.execute("flink kafka to kafka");
    }
}
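One caveat: without checkpointing enabled, this job gives no delivery guarantees on failure, because the Kafka consumer only commits its offsets as part of Flink checkpoints. A minimal sketch for at-least-once delivery, assuming the job above (the 5-second interval is an arbitrary example value):
// commit offsets and flush in-flight records as part of each checkpoint
env.enableCheckpointing(5000);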