spring boot+kafka+canal实现监听MySQL数据库

一、zookeeper安装

kafka依赖于zookeeper,安装kafka前先安装zookeeper

下载地址:Apache ZooKeeper

本次示例使用的是 3.5.9版本,下载后将压缩文件上传至linux环境并且解压

解压后bin目录下有zoo_sample.cfg文件,zookeeper使用的配置文件是zoo.cfg,所以复制一份zoo_sample.cfg重命名为zoo.cfg。配置里面有端口等信息,默认端口为2181

然后启动zookeeper,切换至bin目录下,执行命令: ./zkServer.sh start

停止:./zkServer.sh stop

二、kafka安装

下载地址:Apache Kafka

这里选择的版本为 kafka_2.12-2.8.1.tgz

上传至linux然后解压如下图

修改配置:

1、server.properties 主要修改kafka的端口,以及listeners 两个ip换成安装的服务器ip,zookeeper所在服务器的ip和端口,日志记录目录等。

broker.id=0 port=9092 listeners=PLAINTEXT://ip:9092 advertised.listeners=PLAINTEXT://ip:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/opt/mq/kafka/tmp/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=localhost:2181 zookeeper.connection.timeout.ms=18000 group.initial.rebalance.delay.ms=0

2、consumer.properties 主要配置group.id,和springboot配置的群组id一致(我没试过不一致会不会失败)

bootstrap.servers=localhost:9080

group.id=default_consumer_group

3、zoopeeker.properties zookeeper的相关配置,不改zookeeper默认端口可以不用管。

启动kafka,,执行命令:bin/kafka-server-start.sh config/server.properties &

停止服务命令:bin/kafka-server-stop.sh

三、MySQL数据库支持canal监控配置

1、查看是否开启binlog模式

show binary logs; 如下为开启。

2、如果没有开启,需要更改mysql的my.cnf文件。文件所在目录不记得了,我的测试环境mysql是装在docker容器中,目录为etc/mysql下

配置如下:

[mysqld] lower_case_table_names=1 max_connections=1000 max_allowed_packet=512M skip-name-resolve 加入以下三行配置开启binlog

server_id=1 log-bin=mysql-bin binlog-format=ROW

四、安装canal

下载地址:Releases · alibaba/canal · GitHub

同样的解压到服务器,使用的版本为1.1.5

1、修改配置:conf目录下canal.properties 主要修改端口以及zookeeper的地址,消息中间件的类型配置为kafka

canal.port = 11111

canal.zkServers = ip:2181

canal.serverMode = kafka

2、conf/example目录下的instance.properties

#数据库地址

canal.instance.master.address=ip:3306

#日志名称,上面数据库语句查询出来的log_name

canal.instance.master.journal.name=mysql-bin.000001

#数据库账号

canal.instance.dbUsername=username

#数据库密码

canal.instance.dbPassword=password

#监听数据库表(可以配置多个、全数据库)

canal.instance.filter.regex=database.tablename

#kafka主题名称

canal.mq.topic=test

3、启动canal

sh startup.sh

五、springboot集成配置,监听来自kafka test主题的消息

添加依赖项:
​​​​​​​
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.3</version>
</dependency>
yml文件配置:
spring:
    kafka:
    listener:
      missing-topics-fatal: false
    bootstrap-servers: ip:9080 #指定kafka server的地址,集群配多个,中间,逗号隔开
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: default_consumer_group #群组ID
      #enable-auto-commit: true
      #auto-commit-interval: 1000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
@Component
public class ConsumerSms {
    /**
     * 定义此消费者接收topics = "demo"的消息
     * @param record 变量代表消息本身,可以通过ConsumerRecord<?,?>类型的record变量来打印接收的消息的各种信息
     */
    @KafkaListener(topics = "test")
    public void listen (ConsumerRecord<?, ?> record){
        System.out.printf(""+record.value());
    }
}

数据库中,给监听的表插入一条数据,可以看到监听程序输出:

{"data":[{"id":"4","bsm":"444"}],"database":"nmjf","es":1650967508000,"id":2316,"isDdl":false,"mysqlType":{"id":"bigint(0)","bsm":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"bsm":12},"table":"t_sms","ts":1650967508207,"type":"INSERT"}

到此集成成功

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐