一、canal 简介

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

alibaba/canal: 阿里巴巴 MySQL binlog 增量订阅&消费组件 (github.com)

二、下载安装使用

下载链接地址 :Release v1.1.5 · alibaba/canal (github.com)

1.准备阶段

  • 对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

    [mysqld]
    log-bin=mysql-bin # 开启 binlog
    binlog-format=ROW # 选择 ROW 模式
    server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
    
    • 注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步
  • 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

    CREATE USER canal IDENTIFIED BY 'canal';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    FLUSH PRIVILEGES;

 请务必查看基本操作官网地址说明:QuickStart · alibaba/canal Wiki (github.com)

2.选择版本后下载相应的软件

分别下载 adapter、depyer 、admin(按需下载) 

创建文件夹canal-adapter 、canal-admin、canal-deployer 上传到服务器后在相应文件夹解压即可

1.修改canal-deployer 配置文件

$ cd /data/canal-deployer/conf/example
$ vim instance.properties

为什么会自带有example 文件夹,请参考官网:AdminGuide · alibaba/canal Wiki (github.com)

修改数据库链接地址找到配置信息

#数据库地址
canal.instance.master.address=127.0.0.1:3306
#要同步的数据库,可空,空的话就是同步全库
#canal.instance.defaultDatabaseName =
canal.instance.defaultDatabaseName = shop
#上面初始化的用户信息
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal

 保存退出即可,启动命令在/bin文件夹下  执行 

$ cd /data/canal-deployer
$ ./bin/startup.sh 

关闭命令

$ $ ./bin/stop.sh 

2.配置canal-admin 

参考文档 :Canal Admin QuickStart · alibaba/canal Wiki (github.com)

1.首先下载初始化SQL文件地址:canal/admin/admin-web/src/main/resources at master · alibaba/canal (github.com)https://github.com/alibaba/canal/tree/master/admin/admin-web/src/main/resources

2.然后修改配置文件

$ cd data/canal-admin/conf
$ vim application.yml
server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  address:127.0.0.1:3306
  database: canal_manager
  username: canal
  password: canal
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: admin

3.启动命令

$ cd /data/canal-admin
$  ./bin/startup.sh

可以通过 http://127.0.0.1:8089/ 访问,默认密码:admin/123456

4.关闭命令

$ ./bin/stop.sh

5.配置 canal-deployer 连接admin,启动命令 后面 加 local 即可

$ cd /data/canal-deployer
$ ./bin/startup.sh local

6.启动 admin、然后启动 canal-deployer

如果登录admin 发现 没有初始化实例和服务,点击依次创建即可,载入模板-修改配置

 3.canal-adapter 配置同步数据增量操作

1.修改配置文件指向自己想要的数据存储连接,我这边用的是es7版本。

$ cd /data/canal-adapter/conf
$ vim application.yml
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:

  srcDataSources:
    defaultDS:
#数据库地址
      url: jdbc:mysql://127.0.0.1:3306/shop?useUnicode=true
      username: root
      password: root
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
#es7 文件夹下的配置文件
      - name: es7
#es 的地址
        hosts: localhost:9200 #9300 是transport 
        properties:
#          mode: transport # or rest
          mode: rest
#          # security.auth: test:123456 #  only used for rest mode
          cluster.name: elasticsearch
#        - name: kudu
#          key: kudu
#          properties:
#            kudu.master.address: 127.0.0.1 # ',' split multi address

2.进入es7文件夹下修改要同步的数据,然后进行配置

$ cd /data/canal-adapter/conf/es7
$ cat mytest_user.yml

dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
  _index: mytest_user
  _id: _id
#  upsert: true
#  pk: id
#修改成自己想要同步的sql 就可以了
  sql: "select a.id as _id, a.name, a.role_id, b.role_name,
        a.c_time from user a
        left join role b on b.id=a.role_id"
#  objFields:
#    _labels: array:;
#我没有这个操作所以注释掉了
#  etlCondition: "where a.c_time>={}"
#批量提交条数
  commitBatch: 3000

4.启动可能会出现数据源类转换异常,需要下载源代码然后解决冲突,找到escore项目修改数据源依赖范围,重新打包找到 client-adapter.es7x-1.1.5-jar-with-dependencies.jar 上传到plugin文件夹

$ cd /data/canal-adapter/plugin
$ 上传jar包 删除旧的 上传新的

5.配置完成后启动即可,因为是增量所以要修改数据库数据,去数据库更新要更新的语句。

4.去es查询信息是否更新成功,上面的sql一定要配置成自己数据库的sql,而且一定要提前创建索引!!!提前创建索引!!!提前创建索引!!!

详情请参考:canal/client-adapter at master · alibaba/canal (github.com)https://github.com/alibaba/canal/tree/master/client-adapter

6.如果是全量操作,编写好yml文件然后调用接口,把下面的canal-adapter的地址和yml文件修改一下就可以了。借鉴博客:全量同步Elasticsearch方案之Canal - 知乎 (zhihu.com)

curl -X POST http://127.0.0.1:8081/etl/es7/sys_user.yml

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐