canal同步mysql数据到es中
背景:项目中业务数据量比较大,每类业务表都达到千万级别,虽然做了分库分表和读写分离,每张表数据控制在500W一下,但是效率还是达不到要求,为了提高查询效率,我们使用ES查询。而将mysql实时同步到es中保证数据一致性就成了我们的工作之下。环境:mysql 5.7caanl 1.1.5(也有一个坑,多张表公用一个es索引,但是多表有字段同名的时候,你更新一个表的同名字段,es会把数据表同名的所有字
背景:
项目中业务数据量比较大,每类业务表都达到千万级别,虽然做了分库分表和读写分离,每张表数据控制在500W一下,但是效率还是达不到要求,为了提高查询效率,我们使用ES查询。
而将mysql实时同步到es中保证数据一致性就成了我们的工作之下。
环境:
jdk1.8(依赖jdk环境,需要先安装这个)
mysql 5.7
canal1.1.5
(也有一个坑,多张表公用一个es索引,但是多表有字段同名的时候,你更新一个表的同名字段,es会把数据表同名的所有字段都更新,虽然你在es索引中的字段名称不一样,也会导致,cannl开发者修复了这个问题,但是并没有在1.1.5版本中更新,我们是自己下载源码手动改cannl才搞定)
问题地址:https://github.com/alibaba/canal/issues/2023
es:7.1.6
(推荐使用6.8.22版本(cannl对7.X版本的支持感觉不是很好,我们遇到不少坑,尽量用6.X版本,最新的6.8.22版本修复了log4j2的漏洞))
官方文档地址: https://www.elastic.co/guide/en/elasticsearch/reference/6.8/release-notes-6.8.22.html
官方下载地址:https://www.elastic.co/cn/downloads/elasticsearch
腾讯云镜像地址:https://mirrors.cloud.tencent.com/elasticstack/6.x/yum/6.8.22/
服务器 centos, 采用docker容器
资料:
canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。
简单来说canal的原理是根据mysql的主从复制原理实现的,canal伪装成slave库从而向master库读取binlog日志获取增量日志。
canal-admin
(非必须但推荐使用):为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作。
canal-server:
服务端,从mysql读取binlog日志获取增量日志,可以通过tcp、kafka、RocketMQ等方式与客户端通信;通过zookeeper搭建集群。
canal-adapter
:客户端,根据canal-server获取的增量日志执行适配到其他诸如elasticsearch、redis、mysql等端,实现数据同步。
部署:
1、开启mysql的binlog
使用canal-server需要先准备mysql,对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,/mysql.conf.d/mysqld.cnf中配置如下:
window的mysql配置文件可能有所不同,默认是C:\Program Files\MySQL\MySQL Server 5.7下的my-default.ini文件
[mysqld]
# 开启 binlog
log-bin=mysql-bin
# 选择 ROW 模式
binlog-format=ROW
# 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
server_id=1
配置完成后重启mysql,并查询是否配置生效:ON就是开启
SHOW VARIABLES LIKE 'log_bin%';
SHOW VARIABLES LIKE 'binlog_format%';
2、mysql创建cannl用户并授权
授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限。
如果报错
“The MySQL server is running with the --skip-grant-tables option so it cannot execute this statement”
则先进行一下刷新操作:FLUSH PRIVILEGES;
CREATE USER canal IDENTIFIED BY '123456';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
3、安装运行canal-admin
docker run -d -p 8089:8089 -e server.port=8089 --name canal-admin canal/canal-admin
管理页面地址:http://127.0.0.1:8089/
用户名:admin
密码:123456
4、安装运行canal-server
几个端口号作用:
11110:这个端口是 Canal Server 的默认 admin 端口,主要用于 Canal Server 的管理和监控。
11111:这个端口是 Canal Server 的默认 client 端口,主要用于数据同步。Canal Server 通过此端口与 Canal Client 进行通信,实现数据的同步。
11112:这个端口通常用于 Canal Server 的高可用性配置,例如在多个 Canal Server 之间进行切换。
9100:这个端口是 Canal Server 的 metrics 端口,主要用于暴露 Prometheus metrics,用于监控 Canal Server 的运行状态和性能。
配置不同模式下,端口号会受影响
当我们将 canal.serverMode 设置为 rocketMQ 时,Canal Server 将使用 RocketMQ
作为消息队列,而不再使用 TCP 端口(默认为 11111)进行数据传输。这就是为什么你发现 11111 端口无法连接的原因。
在这种模式下,Canal Server 将 binlog 事件发送到 RocketMQ,然后 Canal Client 从 RocketMQ
订阅并消费这些事件。因此,你需要确保你的 RocketMQ 服务已经正确配置并运行。
如果想要同时使用 RocketMQ 和 TCP 端口进行数据传输,需要启动两个 Canal Server 实例,一个设置为 rocketMQ 模式,另一个设置为 tcp 模式。目前,Canal Server 的设计不支持在单个实例中同时启用多种模式。每个 Canal Server 实例只能使用一种模式。
(下面的IP和端口需要和第三部中配置的一致,需要改成自己服务器的)
安装启动命令:
docker run -d -p 9100:9100 -p 11110:11110 -p 11111:11111 -p 11112:11112 \
-v /home/docker/canal-server/logs:/home/admin/canal-server/logs \
-e canal.admin.manager=127.0.0.1:8089 \
-e canal.admin.port=11110 \
-e canal.admin.user=admin \
-e canal.admin.passwd=4ACFE3202A5FF5CF467898FC58AAB1D615029441 \
--name=canal-server \
--restart=always \
canal/canal-server
或者使用–linik绑定canal-admin ,命令如下:
docker run -d -p 9100:9100 -p 11110:11110 -p 11111:11111 -p 11112:11112 \
-v /home/docker/canal-server/logs:/home/admin/canal-server/logs \
--link canal-admin:canal-admin \
-e canal.admin.manager=canal-admin:8089 \
-e canal.admin.port=11110 \
-e canal.admin.user=admin \
-e canal.admin.passwd=4ACFE3202A5FF5CF467898FC58AAB1D615029441 \
--name=canal-server \
--restart=always \
canal/canal-server
5、应用配置
canal-server启动完成后访问canal-admin链接(IP:8089)因为docke启动的时候配置admin地址,canal-admin服务管理里会自动识别到canal-server的服务
访问地址:http://IP:8089/#/ 账号密码:admin/123456
第一步:现在 Instance 管理里添加一个实例,先输入实例名称(用英文,这个实例名下面要用!我这叫做demo)和选择服务,点击载入模板修改数据库配置
注意:如果需要同步多个数据源的数据,那就在此步创建多个实例,名称取不一样的
...
# 配置数据库IP和端口,改成自己的,下面说的都要改成自己环境的
canal.instance.master.address=127.0.0.1:3306
...
# 数据库中配置的cannl账号和密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=密码
# 数据库匹配,如果这个不配置,他会扫描所有的库的binlog,语法规则如下图
canal.instance.filter.regex=数据库名\\..*
第二步:在Server 管理中选择操作下拉框配置,在修改添加刚配置的实例名称,多个用,(逗号)隔开
# 多个实例用逗号隔开
canal.destinations = demo
6、安装运行canal-adapter
说明:canal官方提供了canal-server和canal-admin的docker镜像包,偏偏没有canal-adapter,其他又怎么放心,所以可以自己去制作canal-adapter的docker镜像包,也比较简单
参考方式https://blog.csdn.net/qiaodaima0/article/details/125561823?spm=1001.2014.3001.5501
docker run -d -p 8081:8081 \
--name canal-adapter \
canal/canal-adapter:v1.1.5
复制配置文件到容器外
docker cp canal-adapter:/opt/canal-adapter/conf /home/docker/canal-adapter/conf
停止删除canal-adapter容器,
docker stop canal-adapter
docker rm canal-adapter
再执行运行命令,因为dokcer挂载配置目录会导致配置不生成配置文件,所以我们再以挂载配置文件方式运行它。
docker run -d -p 8081:8081 \
-v /home/docker/canal-adapter/conf:/opt/canal-adapter/conf \
-v /home/docker/canal-adapter/logs:/opt/canal-adapter/logs \
--name canal-adapter \
canal/canal-adapter:v1.1.5
7、canal-adapter配置
修改application.yml配置文件中的canalServerHost、数据源及instance和ES服务器地址
vi /home/docker/canal-adapter/conf/application.yml
文件仅做参考,需要改成自己的,配置好后把所有的中文注释删除!!!
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: tcp #tcp kafka rocketMQ rabbitMQ
flatMessage: true
zookeeperHosts:
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
consumerProperties:
# 配置cannl 地址 把127.0.0.1换成自己的
canal.tcp.server.host: 127.0.0.1:11111
canal.tcp.zookeeper.hosts:
canal.tcp.batch.size: 500
canal.tcp.username:
canal.tcp.password:
srcDataSources:
# 自定义数据源名称,支持多个数据源,设置为名称不一样的即可,如下面的defaultDS和defaultDS2
defaultDS:
# 数据库信息配置 换成自己的
url: jdbc:mysql://127.0.0.1:3306/test?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true
username: canal
password: 123456
#defaultDS2:
#url: jdbc:mysql://127.0.0.1:3306/test?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true
#username: canal
#password: 123456
canalAdapters:
# canal instance Name or mq topic name
# 实例名配置,上面第5步取的,我配置的叫做demo,如果是多个实例,从 - instance行到cluster.name最后一行重复即可,就像上面的defaultDS、defaultDS2多数据源
- instance: demo
groups:
- groupId: g1
outerAdapters:
- name: logger
# 数据文件位置,我这设置es7,等下需要在这个conf目录下创建一个新的文件夹 名称es7,这里设置的啥名,文件夹就叫什么名
- name: es7
# es信息配置,127.0.0.1换成自己的,es的数据端口是9300,集群情况下多个用逗号隔开
hosts: 127.0.0.1:9300
properties:
mode: transport # or rest (transport模式,hosts中的端口要用9300;用rest模式,hosts中的端口要用9200,阿里云的es有些只支持rest)
# security.auth: test:123456 # 账号密码 用于rest模式
# ES的cluster.name集群名称通过访问es所在服务器http://IP:9200/可以看到
cluster.name: elasticsearch
注意: 如果canal-adapter启动日志报错,报连接问题,请注意配置的端口防火墙有没有开放!!!
防火墙相关操作:
# 1、查看防火墙状态
systemctl status firewalld
# 2、开放指定端口
firewall-cmd --zone=public --add-port=9200/tcp --permanent
firewall-cmd --zone=public --add-port=9300/tcp --permanent
# 3、重启防火墙
firewall-cmd --reload
也可以直接选择关闭防火墙
# 临时关闭防火墙
systemctl stop firewalld
# 永久关闭防火墙,enable开启
systemctl disable firewalld
8、创建数据库同步脚本文件夹和文件
在/home/docker/canal-adapter/conf/下新建,名为es7的文件夹
mkdir es7
在es7文件夹下创建数据库同步脚本,脚本不一致,需要自己创建,每个索引都需要一个yml脚本,我以其中一个脚本为例,
数据库表名为 person_1,表结构如下
那么我需要把person_1和es中的索引数据同步(es索引自己百度),脚本如下
cd es7
vi person_1.yml
person_1.yml内容 注意给表设置别名,比如我下面给表person_1 设置别名为t
dataSourceKey: defaultDS
# 实例名 第5步取名的
destination: demo
groupId: g1
esMapping:
# es创建的索引名
_index: dev_person
_type: _doc
_id: _id
sql: "
# 查询sql语句,别名就是es的文档名,我这是一样,多表公用一个es索引的话,正常的left join写法就行
SELECT
t.id as _id,
t.id as id,
t.name as name,
t.sex as sex,
t.age as age,
t.des as des
FROM
person_1 t
"
#etlCondition: "where s.c_time>={}" 这个etlCondition是全量同步的,有坑,别用,全量同步用步骤9
commitBatch: 3000
重启canal-adapter,mysql新增数据就会自动增量同步到es
9、初始化数据同步
如果mysql中有数据就需要调用一次全量同步,如果mysql没数据,或者数据没用,就不需要调用此步骤
canal-adapter提供一个REST接口可全量同步数据到ES
### 全量同步person_1表数据到es的person索引中,路径中的es7是上一步的文件夹名
curl http://127.0.0.1:8081/etl/es7/person_1.yml -X POST
10、如果是生产环境,推荐采用mq模式而非tcp模式同步数据
mq具有重试补偿机制,我们上面采用canal的tcp模式进行同步,如果出现问题,需要我们手动运维处理,如果采用mq的方式会好一些。
对于canal同步的mq的topic最好不要和业务线采用一样的,单独设置一个,不然业务线mq的处理数量一旦起来了会造成同步的延期时间增加。理论上tcp模式的同步延期度应该是比mq模式比较低的。
如果是比较小的项目,其实tcp模式也够用了,一般不会出问题,有利有弊吧,看哪种模式适合自己的项目,mq的模式扩展性也比较强,自己可以写代码监听他的topic(消费组group名称别和实例的一样就行),去做一些其他的操作,比如实现redis和mysql的缓存一致性。
步骤是一样的,只需要改几个地方(我们以rocketMQ为例,以上面配置好的文件进行修改)
(1)server配置调整
把模式由tcp改称rocketMQ
配置mq的连接信息和消息生产的分组group名称
(2)实例配置调整
设置mq消费的topic名称
(3)canal-adapter配置调整
canal-adapter的配置文件application.yml也需要把模式改为mq
配置mq连接信息
实例名改为topic名,同步yml文件也把实例名改为topic名即可
参考:
https://www.csdn.net/tags/MtTaEgzsNzI2OTkzLWJsb2cO0O0O.html
更多推荐
所有评论(0)