elasticsearch (五)canal 全量和增量自定义sql同步es
一、canal简介早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。基于日志增量订阅和消费的业务包括数据库镜像数据库实时备份索引构建和实时维护(拆分异构索引、倒排索引等)业务 cache 刷新带业务逻辑的增量数据处理当
一、canal 简介
早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。
基于日志增量订阅和消费的业务包括
- 数据库镜像
- 数据库实时备份
- 索引构建和实时维护(拆分异构索引、倒排索引等)
- 业务 cache 刷新
- 带业务逻辑的增量数据处理
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
alibaba/canal: 阿里巴巴 MySQL binlog 增量订阅&消费组件 (github.com)
二、下载安装使用
下载链接地址 :Release v1.1.5 · alibaba/canal (github.com)
1.准备阶段
-
对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下
[mysqld] log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
- 注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步
-
授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;
请务必查看基本操作官网地址说明:QuickStart · alibaba/canal Wiki (github.com)
2.选择版本后下载相应的软件
分别下载 adapter、depyer 、admin(按需下载)
创建文件夹canal-adapter 、canal-admin、canal-deployer 上传到服务器后在相应文件夹解压即可
1.修改canal-deployer 配置文件
$ cd /data/canal-deployer/conf/example
$ vim instance.properties
为什么会自带有example 文件夹,请参考官网:AdminGuide · alibaba/canal Wiki (github.com)
修改数据库链接地址找到配置信息
#数据库地址
canal.instance.master.address=127.0.0.1:3306
#要同步的数据库,可空,空的话就是同步全库
#canal.instance.defaultDatabaseName =
canal.instance.defaultDatabaseName = shop
#上面初始化的用户信息
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
保存退出即可,启动命令在/bin文件夹下 执行
$ cd /data/canal-deployer
$ ./bin/startup.sh
关闭命令
$ $ ./bin/stop.sh
2.配置canal-admin
参考文档 :Canal Admin QuickStart · alibaba/canal Wiki (github.com)
2.然后修改配置文件
$ cd data/canal-admin/conf
$ vim application.yml
server:
port: 8089
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
spring.datasource:
address:127.0.0.1:3306
database: canal_manager
username: canal
password: canal
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
hikari:
maximum-pool-size: 30
minimum-idle: 1
canal:
adminUser: admin
adminPasswd: admin
3.启动命令
$ cd /data/canal-admin
$ ./bin/startup.sh
可以通过 http://127.0.0.1:8089/ 访问,默认密码:admin/123456
4.关闭命令
$ ./bin/stop.sh
5.配置 canal-deployer 连接admin,启动命令 后面 加 local 即可
$ cd /data/canal-deployer
$ ./bin/startup.sh local
6.启动 admin、然后启动 canal-deployer
如果登录admin 发现 没有初始化实例和服务,点击依次创建即可,载入模板-修改配置
3.canal-adapter 配置同步数据增量操作
1.修改配置文件指向自己想要的数据存储连接,我这边用的是es7版本。
$ cd /data/canal-adapter/conf
$ vim application.yml
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: tcp #tcp kafka rocketMQ rabbitMQ
flatMessage: true
zookeeperHosts:
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
consumerProperties:
# canal tcp consumer
canal.tcp.server.host: 127.0.0.1:11111
canal.tcp.zookeeper.hosts:
canal.tcp.batch.size: 500
canal.tcp.username:
canal.tcp.password:
# kafka consumer
kafka.bootstrap.servers: 127.0.0.1:9092
kafka.enable.auto.commit: false
kafka.auto.commit.interval.ms: 1000
kafka.auto.offset.reset: latest
kafka.request.timeout.ms: 40000
kafka.session.timeout.ms: 30000
kafka.isolation.level: read_committed
kafka.max.poll.records: 1000
# rocketMQ consumer
rocketmq.namespace:
rocketmq.namesrv.addr: 127.0.0.1:9876
rocketmq.batch.size: 1000
rocketmq.enable.message.trace: false
rocketmq.customized.trace.topic:
rocketmq.access.channel:
rocketmq.subscribe.filter:
# rabbitMQ consumer
rabbitmq.host:
rabbitmq.virtual.host:
rabbitmq.username:
rabbitmq.password:
rabbitmq.resource.ownerId:
srcDataSources:
defaultDS:
#数据库地址
url: jdbc:mysql://127.0.0.1:3306/shop?useUnicode=true
username: root
password: root
canalAdapters:
- instance: example # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: logger
#es7 文件夹下的配置文件
- name: es7
#es 的地址
hosts: localhost:9200 #9300 是transport
properties:
# mode: transport # or rest
mode: rest
# # security.auth: test:123456 # only used for rest mode
cluster.name: elasticsearch
# - name: kudu
# key: kudu
# properties:
# kudu.master.address: 127.0.0.1 # ',' split multi address
2.进入es7文件夹下修改要同步的数据,然后进行配置
$ cd /data/canal-adapter/conf/es7
$ cat mytest_user.yml
dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
_index: mytest_user
_id: _id
# upsert: true
# pk: id
#修改成自己想要同步的sql 就可以了
sql: "select a.id as _id, a.name, a.role_id, b.role_name,
a.c_time from user a
left join role b on b.id=a.role_id"
# objFields:
# _labels: array:;
#我没有这个操作所以注释掉了
# etlCondition: "where a.c_time>={}"
#批量提交条数
commitBatch: 3000
4.启动可能会出现数据源类转换异常,需要下载源代码然后解决冲突,找到escore项目修改数据源依赖范围,重新打包找到 client-adapter.es7x-1.1.5-jar-with-dependencies.jar 上传到plugin文件夹
$ cd /data/canal-adapter/plugin
$ 上传jar包 删除旧的 上传新的
5.配置完成后启动即可,因为是增量所以要修改数据库数据,去数据库更新要更新的语句。
4.去es查询信息是否更新成功,上面的sql一定要配置成自己数据库的sql,而且一定要提前创建索引!!!提前创建索引!!!提前创建索引!!!
6.如果是全量操作,编写好yml文件然后调用接口,把下面的canal-adapter的地址和yml文件修改一下就可以了。借鉴博客:全量同步Elasticsearch方案之Canal - 知乎 (zhihu.com)
curl -X POST http://127.0.0.1:8081/etl/es7/sys_user.yml
更多推荐
所有评论(0)