Canal介绍

查看Canal官网信息
Canal译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
基于日志增量订阅和消费的业务包括:

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

Canal原理

官网地址 https://gitee.com/bwaylon/canal

原理在官网的介绍很清晰明了,总之就是把自己伪装成Mysql集群中的从节点,向主节点发送dump协议,从而接收来自主节点推送的binary log,并通过解析binary log实现增量数据的实时获取。

Canal安装

下载

https://github.com/alibaba/canal/releases

从官网下载最新的 应用文件

解压并修改配置文件 conf/example/instance.properties

canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .*\\…*

数据库开启 binary log 修改my.cnf ,并重启Mysql服务

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

运行下面的命令
CREATE USER canal IDENTIFIED BY ‘canal’;
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO ‘canal’@’%’;
– GRANT ALL PRIVILEGES ON . TO ‘canal’@’%’ ;
FLUSH PRIVILEGES;

启动

windows 运行startup.cmd
linux 运行startup.sh

SpringBoot 整合Canal

代码

这里主要是利用 ApplicationRunner创建一个在Springboot应用启动成功之后的初始化操作

添加依赖,其自带的日志框架可能会导致冲突,如有冲突,排除掉即可

        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.0.12</version>
            <exclusions>
                <exclusion>
                    <groupId>ch.qos.logback</groupId>
                    <artifactId>logback-core</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>ch.qos.logback</groupId>
                    <artifactId>logback-classic</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

配置属性

@Data
@ConfigurationProperties(prefix = "custom.canal")
public class CanalProperties {

    private String  ip;//Canal IP
    private Integer port;//Canal Port
    // canal.properties 配置canal.destinations后,需要创建对应的文件夹,并各自有一份instance.properties
    private String  destination;// 目标配置 即放目标instance.properties的文件夹名字
    private String  username;//用户名
    private String  password;//密码
}  
@Slf4j
@Configuration
public class CanalListener implements ApplicationRunner {

    private static CanalProperties properties;

    @Autowired
    public void setProperties(CanalProperties properties) {
        this.properties = properties;
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        //开始干活 
        dowork();
    }

    //官方示例代码
    public void dowork(){
        // 创建链接  
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(properties.getIp(),
                properties.getPort()), properties.getDestination(), properties.getUsername(), properties.getPassword());
        int batchSize = 1000;
        try {  
            connector.connect();  
            connector.subscribe(properties.getSubscribe());
            connector.rollback();
            while (true) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        log.error("cannal wait error");
                    }
                } else {
                    run(message.getEntries());
                }
                connector.ack(batchId); // 提交确认接收成功
//                    connector.rollback(batchId); // 处理失败, 回滚数据 ,下次还能接收到这条数据
            }
        } finally {  
            connector.disconnect();  
        }  
    }
    
    public void run(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChage = null;
            try {
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            CanalEntry.EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}  

启动应用之后,我们在配置的数据中作操作时,就会看到控制台打印的内容。

常用配置以及可能遇到的问题、

canal.instance.mysql.slaveId

模仿slave而使用的ID ,不能和数据库my.ini中的配置冲突

canal.instance.filter.regex 过滤设置

instanse.properties中的该配置可以设置canal对哪些格式的表进行监听,格式如下:

  • .\… 监听全部库全部表
  • dbname\…* dbname数据库下的所有表
  • dbname\.aa.* dbname数据库下 aa 开头的表、
  • dbname.table1 指定数据库及表
  • dbname\.aa.* ,dbname.table1 多个规则用 , 隔开

在配置过滤器时,要十分注意以下几点:

  • 在代码中不要配置CanalConnector.subscribe(”…“),否则配置文件中的过滤配置会被覆盖掉
  • 代码中没有配置subscribe但是过滤器仍不生效
    我遇到了这个问题但是不知奥原因是什么,可以根据配置文件查找此配置
    1、查看 conf/canal.properties 中的canal.instance.global.spring.xml 属性 
       一般默认为classpath:spring/file-instance.xml
    2、打开上面找到的文件 spring/file-instance.xml并修改此处
       <!-- 解析过滤处理 -->
    	<property name="eventFilter">
    		<bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter" >
    			<constructor-arg index="0" value="${canal.instance.filter.regex:.*\..*}" />
    			更改为
    			<constructor-arg index="0" value="${canal.instance.filter.regex}" />
    		</bean>
    	</property>
    删掉缺省值配置并重启canal就可以了,后来我又把该配置还原,再次重启还仍然生效。。。懵了
    

canal.instance.filter.black.regex 黑名单设置

设置类名单的表将不被监听,要注意如果表的规则既满足过滤正则,又满足黑名单的正则,那么他将不被监听

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐