背景:
针对老系统重构涉及业务系统众多,全链路部署测试环境耗时较长,于是有了kafka中间层集群之间数据同步的想法。

方案:
1、Kafka自带的镜像工具kafka-mirror-maker.sh

MirrorMaker是Kafka附带的一个用于在Kafka集群之间制作镜像数据的工具。该工具从源集群中消费并生产到目标群集。

step1:编写consumer-test.properties

bootstrap.servers=172.25.10.18:19092
# consumer group id
group.id=test-consumer-group

step2:编写producer-test.properties

bootstrap.servers=192.168.12.30:19092,192.168.12.30:29092,192.168.12.30:39092

# specify the compression codec for all data generated: none, gzip, snappy, lz4, zstd
compression.type=none

step3:启动kafka-mirror-maker

bin/kafka-mirror-maker.sh --consumer.config config/consumer-test.properties --producer.config config/producer-test.properties --num.streams 8 --whitelist 'customer.order*'

备注:kafka集群之间的borker数不一样可能会出现错误。

2、Flume配置agent

step1:编写配置文件

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = 172.25.10.18:19092
a1.sources.r1.kafka.topics=customer.logindata
a1.sources.r1.kafka.groupId = flume
a1.sources.r1.kafka.consumer.timeout.ms = 100

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers=192.168.12.30:19092
a1.sinks.k1.kafka.topic=customer.logindate
#a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
a1.sinks.k1.kafka.producer.acks=1
a1.sinks.k1.custom.encoding=UTF-8

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

step2:启动agent

bin/flume-ng agent -n a1 -c conf -f conf/flume-calls-kafka.properties -Dflume.root.logger=INFO,console

3、Flink硬编码

flink与flume有点类似,都是通过source获取数据然后通过sink将数据输出到指定环境,起到一个管道的作用。具体代码如下:

//获取运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "172.25.10.18:19092");
        properties.setProperty("group.id", "flink");
        String topic = "customer.logindata";

        DataStreamSource<String> student = env.addSource(new FlinkKafkaConsumer<>(
                topic,   
                new SimpleStringSchema(),
                properties)).setParallelism(1);
        student.print();
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "192.168.12.30:19092");
        prop.setProperty("group.id", "flink");

        student.addSink(new FlinkKafkaProducer<String>("customer.logindata1, new SimpleStringSchema(), prop));
        env.execute("flink kafka to kafka");
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐