背景:

项目中业务数据量比较大,每类业务表都达到千万级别,虽然做了分库分表和读写分离,每张表数据控制在500W一下,但是效率还是达不到要求,为了提高查询效率,我们使用ES查询。
而将mysql实时同步到es中保证数据一致性就成了我们的工作之下。

环境:

jdk1.8(依赖jdk环境,需要先安装这个)
mysql 5.7
canal1.1.5
(也有一个坑,多张表公用一个es索引,但是多表有字段同名的时候,你更新一个表的同名字段,es会把数据表同名的所有字段都更新,虽然你在es索引中的字段名称不一样,也会导致,cannl开发者修复了这个问题,但是并没有在1.1.5版本中更新,我们是自己下载源码手动改cannl才搞定)
问题地址:https://github.com/alibaba/canal/issues/2023
es:7.1.6
(推荐使用6.8.22版本(cannl对7.X版本的支持感觉不是很好,我们遇到不少坑,尽量用6.X版本,最新的6.8.22版本修复了log4j2的漏洞))
官方文档地址: https://www.elastic.co/guide/en/elasticsearch/reference/6.8/release-notes-6.8.22.html
官方下载地址:https://www.elastic.co/cn/downloads/elasticsearch
腾讯云镜像地址:https://mirrors.cloud.tencent.com/elasticstack/6.x/yum/6.8.22/

服务器 centos, 采用docker容器

资料:

canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。
简单来说canal的原理是根据mysql的主从复制原理实现的,canal伪装成slave库从而向master库读取binlog日志获取增量日志。

canal-admin

(非必须但推荐使用):为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作。

canal-server:

服务端,从mysql读取binlog日志获取增量日志,可以通过tcp、kafka、RocketMQ等方式与客户端通信;通过zookeeper搭建集群。

canal-adapter

:客户端,根据canal-server获取的增量日志执行适配到其他诸如elasticsearch、redis、mysql等端,实现数据同步。

部署:

1、开启mysql的binlog

使用canal-server需要先准备mysql,对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,/mysql.conf.d/mysqld.cnf中配置如下:

window的mysql配置文件可能有所不同,默认是C:\Program Files\MySQL\MySQL Server 5.7下的my-default.ini文件

[mysqld]
# 开启 binlog
log-bin=mysql-bin 
# 选择 ROW 模式
binlog-format=ROW 
# 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
server_id=1 

配置完成后重启mysql,并查询是否配置生效:ON就是开启

SHOW VARIABLES LIKE 'log_bin%';
SHOW VARIABLES LIKE 'binlog_format%';

在这里插入图片描述

2、mysql创建cannl用户并授权

授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限。
如果报错
“The MySQL server is running with the --skip-grant-tables option so it cannot execute this statement”
则先进行一下刷新操作:FLUSH PRIVILEGES;

CREATE USER canal IDENTIFIED BY '123456';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

3、安装运行canal-admin

docker run -d -p 8089:8089 -e server.port=8089 --name canal-admin canal/canal-admin

管理页面地址:http://127.0.0.1:8089/
用户名:admin
密码:123456

4、安装运行canal-server

几个端口号作用:

  1. 11110:这个端口是 Canal Server 的默认 admin 端口,主要用于 Canal Server 的管理和监控。

  2. 11111:这个端口是 Canal Server 的默认 client 端口,主要用于数据同步。Canal Server 通过此端口与 Canal Client 进行通信,实现数据的同步。

  3. 11112:这个端口通常用于 Canal Server 的高可用性配置,例如在多个 Canal Server 之间进行切换。

  4. 9100:这个端口是 Canal Server 的 metrics 端口,主要用于暴露 Prometheus metrics,用于监控 Canal Server 的运行状态和性能。

配置不同模式下,端口号会受影响

当我们将 canal.serverMode 设置为 rocketMQ 时,Canal Server 将使用 RocketMQ
作为消息队列,而不再使用 TCP 端口(默认为 11111)进行数据传输。这就是为什么你发现 11111 端口无法连接的原因。

在这种模式下,Canal Server 将 binlog 事件发送到 RocketMQ,然后 Canal Client 从 RocketMQ
订阅并消费这些事件。因此,你需要确保你的 RocketMQ 服务已经正确配置并运行。

如果想要同时使用 RocketMQ 和 TCP 端口进行数据传输,需要启动两个 Canal Server 实例,一个设置为 rocketMQ 模式,另一个设置为 tcp 模式。目前,Canal Server 的设计不支持在单个实例中同时启用多种模式。每个 Canal Server 实例只能使用一种模式。

(下面的IP和端口需要和第三部中配置的一致,需要改成自己服务器的)
安装启动命令:

docker run -d -p 9100:9100 -p 11110:11110 -p 11111:11111 -p 11112:11112 \
-v /home/docker/canal-server/logs:/home/admin/canal-server/logs \
-e canal.admin.manager=127.0.0.1:8089 \
-e canal.admin.port=11110 \
-e canal.admin.user=admin \
-e canal.admin.passwd=4ACFE3202A5FF5CF467898FC58AAB1D615029441 \
--name=canal-server \
--restart=always \
canal/canal-server

或者使用–linik绑定canal-admin ,命令如下:

docker run -d -p 9100:9100 -p 11110:11110 -p 11111:11111 -p 11112:11112 \
-v /home/docker/canal-server/logs:/home/admin/canal-server/logs \
--link canal-admin:canal-admin \
-e canal.admin.manager=canal-admin:8089 \
-e canal.admin.port=11110 \
-e canal.admin.user=admin \
-e canal.admin.passwd=4ACFE3202A5FF5CF467898FC58AAB1D615029441 \
--name=canal-server \
--restart=always \
canal/canal-server

5、应用配置

canal-server启动完成后访问canal-admin链接(IP:8089)因为docke启动的时候配置admin地址,canal-admin服务管理里会自动识别到canal-server的服务
访问地址:http://IP:8089/#/ 账号密码:admin/123456
在这里插入图片描述
第一步:现在 Instance 管理里添加一个实例,先输入实例名称(用英文,这个实例名下面要用!我这叫做demo)和选择服务,点击载入模板修改数据库配置

注意:如果需要同步多个数据源的数据,那就在此步创建多个实例,名称取不一样的

...
# 配置数据库IP和端口,改成自己的,下面说的都要改成自己环境的
canal.instance.master.address=127.0.0.1:3306
...
# 数据库中配置的cannl账号和密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=密码
# 数据库匹配,如果这个不配置,他会扫描所有的库的binlog,语法规则如下图
canal.instance.filter.regex=数据库名\\..*

在这里插入图片描述

第二步:在Server 管理中选择操作下拉框配置,在修改添加刚配置的实例名称,多个用,(逗号)隔开

# 多个实例用逗号隔开
canal.destinations = demo

6、安装运行canal-adapter

说明:canal官方提供了canal-server和canal-admin的docker镜像包,偏偏没有canal-adapter,其他又怎么放心,所以可以自己去制作canal-adapter的docker镜像包,也比较简单
参考方式https://blog.csdn.net/qiaodaima0/article/details/125561823?spm=1001.2014.3001.5501

docker run -d -p 8081:8081 \
--name canal-adapter \
canal/canal-adapter:v1.1.5

复制配置文件到容器外

docker cp canal-adapter:/opt/canal-adapter/conf /home/docker/canal-adapter/conf

停止删除canal-adapter容器,

docker stop canal-adapter
docker rm canal-adapter

再执行运行命令,因为dokcer挂载配置目录会导致配置不生成配置文件,所以我们再以挂载配置文件方式运行它。

docker run -d -p 8081:8081 \
-v /home/docker/canal-adapter/conf:/opt/canal-adapter/conf \
-v /home/docker/canal-adapter/logs:/opt/canal-adapter/logs \
--name canal-adapter \
canal/canal-adapter:v1.1.5

7、canal-adapter配置

修改application.yml配置文件中的canalServerHost、数据源及instance和ES服务器地址

 vi /home/docker/canal-adapter/conf/application.yml

文件仅做参考,需要改成自己的,配置好后把所有的中文注释删除!!!

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # 配置cannl 地址 把127.0.0.1换成自己的
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
  srcDataSources:
   # 自定义数据源名称,支持多个数据源,设置为名称不一样的即可,如下面的defaultDS和defaultDS2
    defaultDS:
      # 数据库信息配置 换成自己的
      url: jdbc:mysql://127.0.0.1:3306/test?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true
      username: canal
      password: 123456
    #defaultDS2:
       #url: jdbc:mysql://127.0.0.1:3306/test?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true
       #username: canal
       #password: 123456	      
  canalAdapters:
   # canal instance Name or mq topic name
   # 实例名配置,上面第5步取的,我配置的叫做demo,如果是多个实例,从 - instance行到cluster.name最后一行重复即可,就像上面的defaultDS、defaultDS2多数据源
  - instance: demo
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
		# 数据文件位置,我这设置es7,等下需要在这个conf目录下创建一个新的文件夹 名称es7,这里设置的啥名,文件夹就叫什么名
      - name: es7
      # es信息配置,127.0.0.1换成自己的,es的数据端口是9300,集群情况下多个用逗号隔开
        hosts: 127.0.0.1:9300
        properties:
          mode: transport # or rest (transport模式,hosts中的端口要用9300;用rest模式,hosts中的端口要用9200,阿里云的es有些只支持rest)
        # security.auth: test:123456 # 账号密码 用于rest模式
		# ES的cluster.name集群名称通过访问es所在服务器http://IP:9200/可以看到
          cluster.name: elasticsearch

注意: 如果canal-adapter启动日志报错,报连接问题,请注意配置的端口防火墙有没有开放!!!

防火墙相关操作:

# 1、查看防火墙状态
systemctl status firewalld

# 2、开放指定端口
firewall-cmd --zone=public --add-port=9200/tcp --permanent
firewall-cmd --zone=public --add-port=9300/tcp --permanent

# 3、重启防火墙
firewall-cmd --reload 

也可以直接选择关闭防火墙

# 临时关闭防火墙
systemctl stop firewalld 
# 永久关闭防火墙,enable开启
systemctl disable firewalld

8、创建数据库同步脚本文件夹和文件

在/home/docker/canal-adapter/conf/下新建,名为es7的文件夹

mkdir es7

在es7文件夹下创建数据库同步脚本,脚本不一致,需要自己创建,每个索引都需要一个yml脚本,我以其中一个脚本为例,
数据库表名为 person_1,表结构如下
在这里插入图片描述
那么我需要把person_1和es中的索引数据同步(es索引自己百度),脚本如下

 cd es7
 vi  person_1.yml

person_1.yml内容 注意给表设置别名,比如我下面给表person_1 设置别名为t

dataSourceKey: defaultDS
# 实例名 第5步取名的
destination: demo
groupId: g1
esMapping:
   # es创建的索引名
  _index: dev_person
  _type: _doc
  _id: _id
  sql: "
  # 查询sql语句,别名就是es的文档名,我这是一样,多表公用一个es索引的话,正常的left join写法就行
SELECT
  t.id as _id,
  t.id as id,
  t.name as name,
  t.sex as sex,
  t.age as age,
  t.des as des
  FROM
  person_1 t
"
  #etlCondition: "where s.c_time>={}" 这个etlCondition是全量同步的,有坑,别用,全量同步用步骤9
  commitBatch: 3000

重启canal-adapter,mysql新增数据就会自动增量同步到es

9、初始化数据同步

如果mysql中有数据就需要调用一次全量同步,如果mysql没数据,或者数据没用,就不需要调用此步骤

canal-adapter提供一个REST接口可全量同步数据到ES

### 全量同步person_1表数据到es的person索引中,路径中的es7是上一步的文件夹名
curl http://127.0.0.1:8081/etl/es7/person_1.yml -X POST

10、如果是生产环境,推荐采用mq模式而非tcp模式同步数据

mq具有重试补偿机制,我们上面采用canal的tcp模式进行同步,如果出现问题,需要我们手动运维处理,如果采用mq的方式会好一些。

对于canal同步的mq的topic最好不要和业务线采用一样的,单独设置一个,不然业务线mq的处理数量一旦起来了会造成同步的延期时间增加。理论上tcp模式的同步延期度应该是比mq模式比较低的。

如果是比较小的项目,其实tcp模式也够用了,一般不会出问题,有利有弊吧,看哪种模式适合自己的项目,mq的模式扩展性也比较强,自己可以写代码监听他的topic(消费组group名称别和实例的一样就行),去做一些其他的操作,比如实现redis和mysql的缓存一致性。

步骤是一样的,只需要改几个地方(我们以rocketMQ为例,以上面配置好的文件进行修改)

(1)server配置调整

把模式由tcp改称rocketMQ
配置mq的连接信息和消息生产的分组group名称
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

(2)实例配置调整

设置mq消费的topic名称
在这里插入图片描述
在这里插入图片描述

(3)canal-adapter配置调整

canal-adapter的配置文件application.yml也需要把模式改为mq
配置mq连接信息
实例名改为topic名,同步yml文件也把实例名改为topic名即可
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

参考:
https://www.csdn.net/tags/MtTaEgzsNzI2OTkzLWJsb2cO0O0O.html

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐