文章基于已经搭建好的canal环境,如果没有搭建好canal环境可以移步https://blog.csdn.net/u012946310/article/details/109361685 搭建好 canal 初始环境 canal admin 与 service 都为1.1.4版本

一,运行环境:

1,两台虚拟机
192.168.2.196:zk、mysql、redis安装在本虚拟机上面,并且有启动 canal-admin 与 canal-service 服务
192.168.2.169:只启动 canal-service
zk只启动一台实例服务

2,在196机器上通过docker安装zk

#1,下载zk
docker pull zookeeper
#2,运行
docker run --name zookeeper -p 2181:2181 -d zookeeper
#3,查看运行状态
docker ps

在这里插入图片描述

二、canal-service集群配置

可以参考官方文档说明:https://github.com/alibaba/canal/wiki/Canal-Admin-Guide

进入canal-admin控制台,账号密码:admin/123456
在这里插入图片描述
1,创建集群
在这里插入图片描述
在这里插入图片描述
集群创建完成后,点击操作 - 主配置 - 载入配置,配置zk注册地址点击保存

各配置说明参考文档:https://github.com/alibaba/canal/wiki/AdminGuide
在这里插入图片描述

配置项:

  • 修改集群/删除集群,属于基本的集群信息维护和删除
  • 主配置,主要是指集群对应的canal.properties配置,设计上一个集群的所有server会共享一份全局canal.properties配置
  • (如果有个性化的配置需求,可以创建多个集群) 查看server,主要是指查看挂载在这个集群下的所有server列表

2,Server 管理,新增集群server
新增两个server,选择刚刚新建的 springboot_canal_demo 集群
Server Ip分别是 192.168.2.196、192.168.2.169两台server主机服务
在这里插入图片描述
新增完成后,就能够在列表看到新增的两台服务,此时状态是断开状态,是因为我们还没启动canal服务
在这里插入图片描述

配置项:

  • 所属集群,可以选择为单机 或者 集群。一般单机Server的模式主要用于一次性的任务或者测试任务
  • Server名称,唯一即可,方便自己记忆
  • Server Ip,机器ip
  • admin端口,canal 1.1.4版本新增的能力,会在canal-server上提供远程管理操作,默认值11110
  • tcp端口,canal提供netty数据订阅服务的端口
  • metric端口, promethues的exporter监控数据端口 (未来会对接监控)

3,配置启动 192.168.2.196、192.168.2.169 机器canal服务
进入 conf 目录下,编辑 canal_local.properties 配置文件

vim conf/canal_local.properties

在这里插入图片描述

# register ip,填写虚拟机ip,默认为空,建议填写,防止注册时读取网卡ip有误,导致ip注册错误
canal.register.ip =192.168.2.196

# canal admin config 管理端地址
canal.admin.manager = 192.168.2.196:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =

启动服务

sh bin/startup.sh local

启动完成后就能够在列表看到,服务已经是启动状态
在这里插入图片描述
并且也已经将服务注册进zk了
在这里插入图片描述

三、Instance管理

1,创建Instance
instance配置比较简单,主要关注:

  • 资源关联,比如挂载到具体的单机 或 集群
  • instance.properties配置维护,可以载入默认模板进行修改

点击载入模板,选择之前创建的集群 springboot_canal_demo ,修改自己的配置即可

各配置说明参考文档:https://github.com/alibaba/canal/wiki/AdminGuide
在这里插入图片描述
配置完成后,保存,默认是启动当前实例
在这里插入图片描述

配置项:

  • 修改,主要就是维护instance.properties配置,做了修改之后会触发对应单机或集群server上的instance做动态reload
  • 删除,相当于直接执行instance stop,并执行配置删除
  • 启动/停止,对instance进行状态变更,做了修改会触发对应单机或集群server上的instance做启动/停止操作
  • 日志,主要针对instance运行状态时,获取对应instance的最后100行日志,比如example/example.log

四,集群高可用测试

通过列表看到,当前运行的 instance 所属主机是 192.168.2.196
在这里插入图片描述
通过server管理,将 192.168.2.196 服务停止,看 instance 运行实例会不会自动迁移到 192.168.2.169 机器
在这里插入图片描述
在这里插入图片描述
可以看到 instance 运行的实例已经切换到 169 的机器上面了,说明calal-server集群高可用服务已经搭建成功了

五,通过java代码连接 canal-server 集群服务,做数据同步

示例官方demo:https://github.com/alibaba/canal/wiki/ClientAPI
本文demo git地址:https://gitee.com/hwm0717/springboot_canal_demo/tree/ZK_HA

项目结构:
在这里插入图片描述
数据库sql

CREATE TABLE `user` (
  `user_id` int(11) NOT NULL AUTO_INCREMENT COMMENT '用户id',
  `nickname` varchar(126) DEFAULT NULL COMMENT '昵称',
  `name` varchar(50) DEFAULT NULL COMMENT '姓名',
  `phone` varchar(11) DEFAULT NULL COMMENT '手机',
  `login_name` varchar(20) DEFAULT NULL COMMENT '登陆名',
  `login_pwd` varchar(100) DEFAULT NULL COMMENT '密码',
  PRIMARY KEY (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>springboot.canal.demo</groupId>
    <artifactId>springboot-canal-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot-canal-demo</name>
    <description>Spring Boot canal1.1.4 demo</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- https://hutool.cn/docs/#/ -->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.4.4</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.4</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

application.properties


#服务端口号
server.port=8080

#canal配置
#canal-server地址,单机时使用
alibaba.canal.ip=192.168.2.196
#zk地址,canal集群时使用
alibaba.canal.zkAddr=192.168.2.196:2181
alibaba.canal.port=11111
#instance名称
alibaba.canal.destination=springboot_canal_demo
alibaba.canal.username=canal
alibaba.canal.password=canal
#监听的数据库
alibaba.canal.subscribe=springboot_canal_demo\\..*

#redis配置
spring.redis.database=0
spring.redis.host=192.168.2.196
spring.redis.port=6379
spring.redis.password=123456
# 连接池最大连接数(使用负值表示没有限制)
spring.redis.jedis.pool.max-active=20
# 连接池最大阻塞等待时间(使用负值表示没有限制)
spring.redis.jedis.pool.max-wait=-1
# 连接池中的最大空闲连接
spring.redis.jedis.pool.max-idle=10
# 连接池中的最小空闲连接
spring.redis.jedis.pool.min-idle=5
# 连接超时时间(毫秒)
spring.redis.timeout=30000

RedisConfig.java

package springboot.canal.demo.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 * RedisTemplate 配置,重写key和value的序列化
 *
 * @Classname RedisConfig
 * @Description
 * @Date 2020/5/12 12:44
 * @Created by hwm
 */
@Configuration
public class RedisConfig {

    @Bean
    public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        // 配置redisTemplate
        RedisTemplate redisTemplate = new RedisTemplate();
        redisTemplate.setConnectionFactory(redisConnectionFactory);

        RedisSerializer stringSerializer = new StringRedisSerializer();

        redisTemplate.setKeySerializer(stringSerializer); // key序列化
        redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer()); // value序列化
        redisTemplate.setHashKeySerializer(stringSerializer); // Hash key序列化
        redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer()); // Hash value序列化
        redisTemplate.afterPropertiesSet();

        return redisTemplate;
    }

}

CanalInitBean.java

package springboot.canal.demo.canalinit;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * canal配置bean
 *
 * @description:
 * @author: dada
 * @date: 2020/10/29 10:42
 */
@ConfigurationProperties(prefix = "alibaba.canal")
@Configuration
@Data
public class CanalInitBean {

    // canal ip
    private String ip;

    // zk addr
    private String zkAddr;

    // canal port
    private Integer port;

    // 实例名称
    private String destination;

    private String username;

    private String password;

    /**
     * 过滤规则:
     * mysql 数据解析关注的表,Perl正则表达式.
     * 多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)
     * 常见例子:
     * 1.  所有表:.*   or  .*\\..*
     * 2.  canal schema下所有表: canal\\..*
     * 3.  canal下的以canal打头的表:canal\\.canal.*
     * 4.  canal schema下的一张表:canal.test1
     * 5.  多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
     * 注意:此过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解析sql,所以无法准确提取tableName进行过滤)
     */
    private String subscribe;
}

CanalInit.java

package springboot.canal.demo.canalinit;

import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * @description:
 * @author: dada
 * @date: 2020/10/29 10:36
 */
@Component
public class CanalInit implements ApplicationRunner {

    @Autowired
    private RedisTemplate redisTemplate;
    @Autowired
    private CanalInitBean canalInitBean;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        startCanal(canalInitBean);
    }

    private void startCanal(CanalInitBean canalInitBean) {
        // 创建SimpleCanalConnector,单机时使用 (直连ip,不支持server/client的failover机制)
//        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalInitBean.getIp(), canalInitBean.getPort()), canalInitBean.getDestination(), canalInitBean.getUsername(), canalInitBean.getPassword());

        // 创建ClusterCanalConnector (基于zookeeper获取canal server ip,支持server/client的failover机制)
        CanalConnector connector = CanalConnectors.newClusterConnector(canalInitBean.getZkAddr(), canalInitBean.getDestination(), canalInitBean.getUsername(), canalInitBean.getPassword());
        int batchSize = 1000;
        try {
            connector.connect();
//            connector.subscribe(".*\\..*");
            connector.subscribe(canalInitBean.getSubscribe());
            connector.rollback();
            while (true) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }
        } finally {
            connector.disconnect();
        }

    }

    private void printEntry(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChage = null;
            try {
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            CanalEntry.EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            // 变动的表名称
            String tableName = entry.getHeader().getTableName();
            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    delete(rowData.getBeforeColumnsList(), tableName);

                } else if (eventType == CanalEntry.EventType.INSERT) {
                    insert(rowData.getAfterColumnsList(), tableName);

                } else {
                    System.out.println("-------&gt; before");
                    System.out.println("-------&gt; after");
                    updateAfter(rowData.getAfterColumnsList(), tableName);
                }
            }
        }
    }

    private void updateAfter(List<CanalEntry.Column> columns, String tableName) {
        // 循环将字段put到json对象里面
        insert(columns, tableName);
    }

    private void delete(List<CanalEntry.Column> columns, String tableName) {

        String key = "";
        for (CanalEntry.Column column : columns) {
            if (StrUtil.isEmpty(key) && column.getIsKey()) {
                key = column.getValue();
            }
        }

        if (StrUtil.isNotEmpty(key)) {
            redisTemplate.opsForHash().delete(tableName, key);
        }
    }

    private void insert(List<CanalEntry.Column> columns, String tableName) {

        // 循环将字段put到json对象里面
        JSONObject jsonObject = new JSONObject();
        String key = "";
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            jsonObject.putOpt(column.getName(), column.getValue());
            if (StrUtil.isEmpty(key) && column.getIsKey()) {
                key = column.getValue();
            }
        }
        redisTemplate.opsForHash().put(tableName, key, jsonObject);
    }
}

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐