写在前面 

今天不学习,明天变垃圾。最近在学习如何使用canal监听binlog并且将数据同步到es,俗话说好记性不如烂笔头,所以写一篇文章记录一下,一是为了健忘的自己,二是为了恰好有此需求的小可爱(程序员都是小可爱)欢迎大家学习讨论。

一、开启mysql的binlog写入功能

1.在mysql的my.ini配置文件中加入下面的配置

log-bin=mysql-bin 
binlog-format=ROW
server_id=1

2.重启mysql服务

查看是否开启log_bin,在mysql中执行:show variables like '%log_bin%';

看到login_bin为ON代表已经开启

3.创建一个有相关权限的mysql slave账号

# 创建账号
CREATE USER canal IDENTIFIED WITH MYSQL_NATIVE_PASSWORD BY 'canal';
# 给账号赋权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; 
# 刷新
FLUSH PRIVILEGES;

二、下载canal.deployer

canal下载地址:Releases · alibaba/canal · GitHub

1.修改conf/canal.properties配置文件

# zookeeper集群的地址和端口
canal.zkServers = 127.0.0.1:2181
# 默认为tcp,我这里选择将监听到的消息发送到rocketMQ
canal.serverMode = rocketmq
# 当前server上部署的instance列表,默认为example
canal.destinations = example
# mq的地址和端口
canal.mq.servers = 127.0.0.1:9876
# 创建的mysql slave账号和密码
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
##################################################
#########           RocketMQ         #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false

2.修改conf/example/instance.properties配置文件

# mysql从服务Id,未被其他mysql服务使用即可
canal.instance.mysql.slaveId = 1234
# mysql主服务的地址和端口
canal.instance.master.address = 127.0.0.1:3306
# 指定要监听的数据库
canal.instance.defaultDatabaseName = test
# username/password 数据库的用户名和密码
canal.instance.dbUsername=root
canal.instance.dbPassword=123456
# 需要监听的表的表达式,我这里只监听了test数据库下的binlog_test表
# 1.  所有表:.*   or  .*\\..*
# 2.  canal schema下所有表: canal\\..*
# 3.  canal下的以canal打头的表:canal\\.canal.*
# 4.  canal schema下的一张表:canal.test1
# 5.  多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
canal.instance.filter.regex=test\\.binlog_test
# rocketMQ的topic,如果使用rabbitMQ此处配置rabbitMQ的routingkey
canal.mq.topic=ROCKET_TEST

三、启动canal服务

1.启动canal服务:\bin\startup.bat

2.启动zookeeper:\bin\zkServer.cmd

3.启动rocketMQ

 这里给大家推荐一个rocketMQ的详细安装使用教程,我这里就不再赘述了,点击传送门

四、修改表中的数据,查看结果

表中原数据

  

修改其中一条数据然后保存

然后到rocketMQ的可视化界面查看,此时已经出现了一条消息,说明canal监听成功

点开消息详情可以看到具体的消息变更内容

五、编写rocketMQ的消费者,将数据添加到es

1.创建一个项目,添加相关依赖

<dependencies>
    <!-- web依赖 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
​
    <!-- rocketMQ依赖 -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.9.1</version>
    </dependency>
​
    <!-- lombok依赖 -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
​
    <!-- es依赖 -->
    <dependency>
        <groupId>org.springframework.data</groupId>
        <artifactId>spring-data-elasticsearch</artifactId>
    </dependency>
​
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <scope>test</scope>
        <version>4.12</version>
    </dependency>
</dependencies>

2.编写一个dto(数据传输对象)用来接收消息中的消息体,因为我们只需要拿到消息体

@Data
public class RocketConsumerDTO {
    private JSONArray data;
    private String database;
    private Long es;
    private JSONArray old;
    private String table;
    private Long ts;
    private String type;
}

3.编写es的文档对象

@Document(indexName = "user",type = "docs")
@Data
public class UserDoc {
    @Id
    private Long id;
    /** 用户名 **/
    @Field(type = FieldType.Text,analyzer = "ik_smart",searchAnalyzer = "ik_smart")
    private String name;
    /** 年龄 **/
    private Integer age;
    /** 状态 **/
    private Integer status;
}

4.编写rocketMQ的消费者

public void testRocketConsumer() throws MQClientException {
    DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("CANAL_TEST_GROUP");
    //设置name server 地址
    defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876");
    //从开始位置消费
    defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    //广播模式
    defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);
    //订阅
    defaultMQPushConsumer.subscribe("canal_test", "");
    //注册消息监听器
    defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext           consumeConcurrentlyContext) {
            try {
                for (MessageExt message : list) {
                    // 获取消息体,将消息体转换为dto
                    RocketConsumerDTO rocketMq = JSON.parseObject(message.getBody(),                                            RocketConsumerDTO.class);
                    // 获取data的数据,data中的数据就是数据库中修改过的新数据
                    JSONArray data = rocketMq.getData();
                    // 遍历data数组,拿到每一条数据
                    for (int i = 0; i < data.size(); i++) {
                        // 通过循环获取对象,一个对象对应数据库中的一条记录
                        JSONObject obj = data.getJSONObject(i);
                        // 将获取到的对象转换为文档对象
                        UserDoc userDoc = JSON.parseObject(JSON.toJSONString(obj), UserDoc.class);
                        // 通过repository添加到es中
                        esRepository.save(userDoc);
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }catch (Exception e){
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
    });
    defaultMQPushConsumer.start();
    // 这里我使用的spring的单元测试,让程序睡眠一会儿,不然消费者一创建就会死亡,没有消费时间
    try {
        Thread.sleep(10*1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

执行消费者后,可以在kibana中看到数据已经被同步到es中了

六、结语

千山万水总是情,给我点赞行不行,人间自有真情在,一键三连会更帅。初写文章,不善表达,多多包涵,本篇文章到这里就结束了,欢迎大家学习讨论。

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐