Springboot -- 整合Canal实现数据库更改的实时监听
文章目录Canal介绍Canal安装SpringBoot 整合CanalCanal介绍Canal安装SpringBoot 整合Canal
Canal介绍
查看Canal官网信息
Canal译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
基于日志增量订阅和消费的业务包括:
- 数据库镜像
- 数据库实时备份
- 索引构建和实时维护(拆分异构索引、倒排索引等)
- 业务 cache 刷新
- 带业务逻辑的增量数据处理
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
Canal原理
官网地址 https://gitee.com/bwaylon/canal
原理在官网的介绍很清晰明了,总之就是把自己伪装成Mysql集群中的从节点,向主节点发送dump协议,从而接收来自主节点推送的binary log,并通过解析binary log实现增量数据的实时获取。
Canal安装
下载
https://github.com/alibaba/canal/releases
从官网下载最新的 应用文件
解压并修改配置文件 conf/example/instance.properties
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .*\\…*
数据库开启 binary log 修改my.cnf ,并重启Mysql服务
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant
运行下面的命令
CREATE USER canal IDENTIFIED BY ‘canal’;
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO ‘canal’@’%’;
– GRANT ALL PRIVILEGES ON . TO ‘canal’@’%’ ;
FLUSH PRIVILEGES;
启动
windows 运行startup.cmd
linux 运行startup.sh
SpringBoot 整合Canal
代码
这里主要是利用 ApplicationRunner创建一个在Springboot应用启动成功之后的初始化操作
添加依赖,其自带的日志框架可能会导致冲突,如有冲突,排除掉即可
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.0.12</version>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</exclusion>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
</exclusions>
</dependency>
配置属性
@Data
@ConfigurationProperties(prefix = "custom.canal")
public class CanalProperties {
private String ip;//Canal IP
private Integer port;//Canal Port
// canal.properties 配置canal.destinations后,需要创建对应的文件夹,并各自有一份instance.properties
private String destination;// 目标配置 即放目标instance.properties的文件夹名字
private String username;//用户名
private String password;//密码
}
@Slf4j
@Configuration
public class CanalListener implements ApplicationRunner {
private static CanalProperties properties;
@Autowired
public void setProperties(CanalProperties properties) {
this.properties = properties;
}
@Override
public void run(ApplicationArguments args) throws Exception {
//开始干活
dowork();
}
//官方示例代码
public void dowork(){
// 创建链接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(properties.getIp(),
properties.getPort()), properties.getDestination(), properties.getUsername(), properties.getPassword());
int batchSize = 1000;
try {
connector.connect();
connector.subscribe(properties.getSubscribe());
connector.rollback();
while (true) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
log.error("cannal wait error");
}
} else {
run(message.getEntries());
}
connector.ack(batchId); // 提交确认接收成功
// connector.rollback(batchId); // 处理失败, 回滚数据 ,下次还能接收到这条数据
}
} finally {
connector.disconnect();
}
}
public void run(List<CanalEntry.Entry> entrys) {
for (CanalEntry.Entry entry : entrys) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
CanalEntry.RowChange rowChage = null;
try {
rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
CanalEntry.EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
if (eventType == CanalEntry.EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == CanalEntry.EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private void printColumn(List<CanalEntry.Column> columns) {
for (CanalEntry.Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
启动应用之后,我们在配置的数据中作操作时,就会看到控制台打印的内容。
常用配置以及可能遇到的问题、
canal.instance.mysql.slaveId
模仿slave而使用的ID ,不能和数据库my.ini中的配置冲突
canal.instance.filter.regex 过滤设置
instanse.properties中的该配置可以设置canal对哪些格式的表进行监听,格式如下:
- .\… 监听全部库全部表
- dbname\…* dbname数据库下的所有表
- dbname\.aa.* dbname数据库下 aa 开头的表、
- dbname.table1 指定数据库及表
- dbname\.aa.* ,dbname.table1 多个规则用 , 隔开
在配置过滤器时,要十分注意以下几点:
- 在代码中不要配置CanalConnector.subscribe(”…“),否则配置文件中的过滤配置会被覆盖掉
- 代码中没有配置subscribe但是过滤器仍不生效
我遇到了这个问题但是不知奥原因是什么,可以根据配置文件查找此配置 1、查看 conf/canal.properties 中的canal.instance.global.spring.xml 属性 一般默认为classpath:spring/file-instance.xml 2、打开上面找到的文件 spring/file-instance.xml并修改此处 <!-- 解析过滤处理 --> <property name="eventFilter"> <bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter" > <constructor-arg index="0" value="${canal.instance.filter.regex:.*\..*}" /> 更改为 <constructor-arg index="0" value="${canal.instance.filter.regex}" /> </bean> </property> 删掉缺省值配置并重启canal就可以了,后来我又把该配置还原,再次重启还仍然生效。。。懵了
canal.instance.filter.black.regex 黑名单设置
设置类名单的表将不被监听,要注意如果表的规则既满足过滤正则,又满足黑名单的正则,那么他将不被监听
更多推荐
所有评论(0)