使用canal监听binlog将数据发送到RocketMQ同步到es
写在前面今天不学习,明天变垃圾。最近在学习如何使用canal监听binlog并且将数据同步到es,俗话说好记性不如烂笔头。所以写一篇文章记录一下,一是为了健忘的自己,二是为了恰好有此需求的小可爱欢迎大家学习讨论。一、开启mysql的binlog写入功能1.在mysql的my.ini配置文件中加入下面的配置log-bin=mysql-binbinlog-format=ROWserver_id=12.
·
写在前面
今天不学习,明天变垃圾。最近在学习如何使用canal监听binlog并且将数据同步到es,俗话说好记性不如烂笔头,所以写一篇文章记录一下,一是为了健忘的自己,二是为了恰好有此需求的小可爱(程序员都是小可爱)欢迎大家学习讨论。
一、开启mysql的binlog写入功能
1.在mysql的my.ini配置文件中加入下面的配置
log-bin=mysql-bin
binlog-format=ROW
server_id=1
2.重启mysql服务
查看是否开启log_bin,在mysql中执行:show variables like '%log_bin%';
看到login_bin为ON代表已经开启
3.创建一个有相关权限的mysql slave账号
# 创建账号
CREATE USER canal IDENTIFIED WITH MYSQL_NATIVE_PASSWORD BY 'canal';
# 给账号赋权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
# 刷新
FLUSH PRIVILEGES;
二、下载canal.deployer
canal下载地址:Releases · alibaba/canal · GitHub
1.修改conf/canal.properties配置文件
# zookeeper集群的地址和端口
canal.zkServers = 127.0.0.1:2181
# 默认为tcp,我这里选择将监听到的消息发送到rocketMQ
canal.serverMode = rocketmq
# 当前server上部署的instance列表,默认为example
canal.destinations = example
# mq的地址和端口
canal.mq.servers = 127.0.0.1:9876
# 创建的mysql slave账号和密码
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
##################################################
######### RocketMQ #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
2.修改conf/example/instance.properties配置文件
# mysql从服务Id,未被其他mysql服务使用即可
canal.instance.mysql.slaveId = 1234
# mysql主服务的地址和端口
canal.instance.master.address = 127.0.0.1:3306
# 指定要监听的数据库
canal.instance.defaultDatabaseName = test
# username/password 数据库的用户名和密码
canal.instance.dbUsername=root
canal.instance.dbPassword=123456
# 需要监听的表的表达式,我这里只监听了test数据库下的binlog_test表
# 1. 所有表:.* or .*\\..*
# 2. canal schema下所有表: canal\\..*
# 3. canal下的以canal打头的表:canal\\.canal.*
# 4. canal schema下的一张表:canal.test1
# 5. 多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
canal.instance.filter.regex=test\\.binlog_test
# rocketMQ的topic,如果使用rabbitMQ此处配置rabbitMQ的routingkey
canal.mq.topic=ROCKET_TEST
三、启动canal服务
1.启动canal服务:\bin\startup.bat
2.启动zookeeper:\bin\zkServer.cmd
3.启动rocketMQ
这里给大家推荐一个rocketMQ的详细安装使用教程,我这里就不再赘述了,点击传送门
四、修改表中的数据,查看结果
表中原数据
修改其中一条数据然后保存
然后到rocketMQ的可视化界面查看,此时已经出现了一条消息,说明canal监听成功
点开消息详情可以看到具体的消息变更内容
五、编写rocketMQ的消费者,将数据添加到es
1.创建一个项目,添加相关依赖
<dependencies>
<!-- web依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- rocketMQ依赖 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.1</version>
</dependency>
<!-- lombok依赖 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- es依赖 -->
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-elasticsearch</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
<version>4.12</version>
</dependency>
</dependencies>
2.编写一个dto(数据传输对象)用来接收消息中的消息体,因为我们只需要拿到消息体
@Data
public class RocketConsumerDTO {
private JSONArray data;
private String database;
private Long es;
private JSONArray old;
private String table;
private Long ts;
private String type;
}
3.编写es的文档对象
@Document(indexName = "user",type = "docs")
@Data
public class UserDoc {
@Id
private Long id;
/** 用户名 **/
@Field(type = FieldType.Text,analyzer = "ik_smart",searchAnalyzer = "ik_smart")
private String name;
/** 年龄 **/
private Integer age;
/** 状态 **/
private Integer status;
}
4.编写rocketMQ的消费者
public void testRocketConsumer() throws MQClientException {
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("CANAL_TEST_GROUP");
//设置name server 地址
defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876");
//从开始位置消费
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//广播模式
defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);
//订阅
defaultMQPushConsumer.subscribe("canal_test", "");
//注册消息监听器
defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
try {
for (MessageExt message : list) {
// 获取消息体,将消息体转换为dto
RocketConsumerDTO rocketMq = JSON.parseObject(message.getBody(), RocketConsumerDTO.class);
// 获取data的数据,data中的数据就是数据库中修改过的新数据
JSONArray data = rocketMq.getData();
// 遍历data数组,拿到每一条数据
for (int i = 0; i < data.size(); i++) {
// 通过循环获取对象,一个对象对应数据库中的一条记录
JSONObject obj = data.getJSONObject(i);
// 将获取到的对象转换为文档对象
UserDoc userDoc = JSON.parseObject(JSON.toJSONString(obj), UserDoc.class);
// 通过repository添加到es中
esRepository.save(userDoc);
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}catch (Exception e){
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
});
defaultMQPushConsumer.start();
// 这里我使用的spring的单元测试,让程序睡眠一会儿,不然消费者一创建就会死亡,没有消费时间
try {
Thread.sleep(10*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
执行消费者后,可以在kibana中看到数据已经被同步到es中了
六、结语
千山万水总是情,给我点赞行不行,人间自有真情在,一键三连会更帅。初写文章,不善表达,多多包涵,本篇文章到这里就结束了,欢迎大家学习讨论。
更多推荐
已为社区贡献1条内容
所有评论(0)