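I am currently working on a human-resources project in which most of the business logic is closely tied to the user table. The user service and the other services are independent of each other, so cross-service join queries are not possible. Yet many business queries do need to be associated with the user table, which left me torn between the following options: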

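  • Option 1:

Copy the frequently used user-table fields redundantly into each business table. After user information is modified, the user service sends a message to MQ; every service that holds redundant user fields subscribes to that queue and updates its redundant data. This approach is actually not bad either.
Pros: other services that need user data do not have to run join queries, so it is efficient.
Cons: redundant fields are hard to maintain, the redundancy grows as the business expands, and when a field is added it is easy to miss one of the copies.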
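
To make Option 1 concrete, here is a minimal in-memory sketch of what one subscribing service would do when a "user changed" message arrives. No real MQ client is used; `UserChangedEvent`, `LeaveRequest`, and `RedundantFieldUpdater` are hypothetical names I made up for illustration, and the list stands in for a business table.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical message published by the user service after a user is modified.
class UserChangedEvent {
    final int userId;
    final String newName;

    UserChangedEvent(int userId, String newName) {
        this.userId = userId;
        this.newName = newName;
    }
}

// Hypothetical business row that carries a redundant copy of the user name.
class LeaveRequest {
    final int id;
    final int userId;
    String userName;

    LeaveRequest(int id, int userId, String userName) {
        this.id = id;
        this.userId = userId;
        this.userName = userName;
    }
}

// Stand-in for the MQ consumer inside one business service.
class RedundantFieldUpdater {
    private final List<LeaveRequest> table = new ArrayList<>();

    void insert(LeaveRequest row) {
        table.add(row);
    }

    List<LeaveRequest> rows() {
        return table;
    }

    // Called once per consumed message; returns how many rows were rewritten.
    int onUserChanged(UserChangedEvent event) {
        int updated = 0;
        for (LeaveRequest row : table) {
            if (row.userId == event.userId) {
                row.userName = event.newName;
                updated++;
            }
        }
        return updated;
    }

    public static void main(String[] args) {
        RedundantFieldUpdater updater = new RedundantFieldUpdater();
        updater.insert(new LeaveRequest(1, 7, "old name"));
        updater.insert(new LeaveRequest(2, 8, "someone else"));
        System.out.println(updater.onUserChanged(new UserChangedEvent(7, "new name")));
    }
}
```

Every service that copies user fields needs its own version of this consumer, which is exactly the maintenance cost listed under the cons.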

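  • Option 2:

Use canal and split out a separate service whose only job is to synchronize user data from the user service to the databases of the other services. This service is configured with multiple data sources and loops over them to maintain the user table in each database.
Pros: a dedicated service is easy to maintain and manage and scales well; when another service with its own database is added, only one more data source is needed and the code barely changes.
Cons: every database needs its own copy of the user table, which wastes some resources, and business queries then rely on join queries against that copy.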
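
The core idea of Option 2 can be sketched in a few lines: one service applies every user change to each registered target in a loop. This is only an in-memory illustration under my own assumptions; `UserSyncSketch` is a hypothetical class, and each inner map stands in for the user table of one target database.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class UserSyncSketch {
    // One in-memory "user table" per target database, keyed by data source name.
    private final Map<String, Map<Integer, String>> targets = new LinkedHashMap<>();

    // Adding a new target database only requires registering one more entry.
    void addTarget(String dataSourceName) {
        targets.put(dataSourceName, new LinkedHashMap<>());
    }

    // Apply an insert or update to every target.
    void upsert(int userId, String userName) {
        for (Map<Integer, String> table : targets.values()) {
            table.put(userId, userName);
        }
    }

    // Apply a delete to every target.
    void delete(int userId) {
        for (Map<Integer, String> table : targets.values()) {
            table.remove(userId);
        }
    }

    Map<Integer, String> table(String dataSourceName) {
        return targets.get(dataSourceName);
    }

    public static void main(String[] args) {
        UserSyncSketch sync = new UserSyncSketch();
        sync.addTarget("workServer");
        sync.addTarget("partnerServer");
        sync.upsert(1, "Alice");
        System.out.println(sync.table("workServer"));
        System.out.println(sync.table("partnerServer"));
    }
}
```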

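Weighing the extensibility and maintainability of the code, I finally chose canal data synchronization. (I am lazy and dislike repetitive work, so I did not want to write the same code n times. Haha.)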

Installation reference: installation quick guide

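1. Install canal first

Download page: https://github.com/alibaba/canal/releases
I chose version 1.1.4 here.
Click the release package to download it directly.
If the address is not reachable, add me on WeChat: osm164502, and I can send you the file privately.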

2. Unzip the package and edit the configuration file

Edit the following entries in <install directory>/conf/example/instance.properties:

# position info
canal.instance.master.address=127.0.0.1:3306

# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal

Then modify the startup.bat launch script.

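3. Configure MySQL
  • Locate the my.ini file

On my Windows machine the configuration file is at C:\ProgramData\MySQL\MySQL Server 5.7\my.ini. If you also run 5.7, it should be in the same place.
Open the file and find the "Binary Logging" section.
Add the following settings: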

log-bin=mysql-bin     # base name of the binlog files
binlog_format=ROW     # use ROW format

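Find the "Server Id" section and set server-id:

server-id=1 # MySQL instance id; must not be the same as canal's slaveId

Restart the MySQL service.

  • Create the canal user
    You can pick any username you like, but the username and password here must match the ones in instance.properties!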

-- Log in with: mysql -u root -p
-- Create the user (username: canal, password: canal)
create user 'canal'@'%' identified by 'canal';
-- Grant privileges; *.* means all databases and tables
grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%';

4. Start canal by double-clicking startup.bat

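The canal server is now installed. What remains is handling the change events according to our actual business needs.

Split out a separate Spring Boot service to do this.
1. Create a new Spring Boot project with the following pom file: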
	<parent>
	    <groupId>org.springframework.boot</groupId>
	    <artifactId>spring-boot-starter-parent</artifactId>
	    <version>2.3.1.RELEASE</version>
	</parent>

    <dependencies>
    	<dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.4</version>
        </dependency>
        
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>


        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.10</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.18</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.3.1</version>
            <exclusions>
                <exclusion>
                    <groupId>com.baomidou</groupId>
                    <artifactId>mybatis-plus-generator</artifactId>
                </exclusion>
            </exclusions>
        </dependency>


    </dependencies>
2. Configuration file
server:
  port: 9999
spring:
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    druid:
      work-server:
        # MySQL configuration
        driverClassName: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://localhost:3306/db1?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai
        username: root
        password: root
      partner-server:
        # MySQL configuration
        driverClassName: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://localhost:3306/db2?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai
        username: root
        password: root

# mybatis-plus configuration
mybatis-plus:
  mapperLocations: classpath:mapper/*Mapper.xml
  configuration:
    lazy-loading-enabled: true
    aggressive-lazy-loading: false
    map-underscore-to-camel-case: true
  type-aliases-package: com.xxx.entity

xfr:
  canal:
    host: 127.0.0.1
    port: 11111
    subscribe: db.user_info
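
A note on the `subscribe` value: as far as I understand the canal documentation, the filter is a comma-separated list of regular expressions matched against `schema.table`, so an unescaped `.` also acts as a wildcard. The sketch below uses `java.util.regex` as a stand-in to show the difference; canal's own matcher may differ in details, and `SubscribeFilterSketch` is a hypothetical helper, not part of canal.

```java
import java.util.regex.Pattern;

class SubscribeFilterSketch {
    // Returns true if any comma-separated pattern matches the full "schema.table" name.
    static boolean matches(String filter, String schemaDotTable) {
        for (String part : filter.split(",")) {
            if (Pattern.matches(part.trim(), schemaDotTable)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // The unescaped dot matches the literal dot ...
        System.out.println(matches("db.user_info", "db.user_info"));    // true
        // ... but it also matches any other single character.
        System.out.println(matches("db.user_info", "dbXuser_info"));    // true
        // Escaping the dot restricts the match to the exact table name.
        System.out.println(matches("db\\.user_info", "dbXuser_info"));  // false
    }
}
```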
3. Main class
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;

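@MapperScan("com.xxx.mapper") // MyBatis mapper package scan
@SpringBootApplication(exclude= {DataSourceAutoConfiguration.class})  // exclude data source auto-configuration: several data sources are configured manually because data is synchronized to multiple databases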
public class CanalApplication {

    public static void main(String[] args) {
        SpringApplication.run(CanalApplication.class, args);
    }

}
4. CanalConfig
package com.xfr.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;
import com.xfr.handler.DataHandler;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.net.InetSocketAddress;
import java.util.List;

/**
 * @Author osy
 * @Date 2022/1/26
 * @Description: TODO
 **/
@Configuration
@ConfigurationProperties(prefix = "xfr.canal")
public class CanalConfig implements InitializingBean {

    /**
     * Custom interface; programming against the interface keeps handlers pluggable
     */
    @Autowired
    private List<DataHandler> handlerList;

    @Value("${xfr.canal.host}")
    private String host;
    @Value("${xfr.canal.port}")
    private Integer port;
    @Value("${xfr.canal.subscribe}")
    private String subscribe;

    @Override
    public void afterPropertiesSet() throws Exception {
        // Poll on a separate daemon thread; an endless loop in this method would block Spring startup
        Thread worker = new Thread(this::pollLoop, "canal-sync");
        worker.setDaemon(true);
        worker.start();
    }

    private void pollLoop() {
        // Create the connector; "example" is the canal instance (destination) name
        InetSocketAddress inetSocketAddress = new InetSocketAddress(host, port);
        CanalConnector connector = CanalConnectors.newSingleConnector(inetSocketAddress, "example", "", "");
        try {
            // Open the connection
            connector.connect();
            // Subscribe to the tables matched by the configured filter
            connector.subscribe(subscribe);
            // Roll back to the last un-acked position so the next fetch starts from there
            connector.rollback();
            while (true) {
                // Fetch up to 200 entries without acknowledging them
                Message message = connector.getWithoutAck(200);
                // Batch id
                long batchId = message.getId();
                // Number of entries in the batch
                int size = message.getEntries().size();
                // No data available
                if (batchId == -1 || size == 0) {
                    try {
                        // Sleep for 2 seconds
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    // Data available: pass it to every handler
                    for (DataHandler dataHandler : handlerList) {
                        dataHandler.doHandler(message.getEntries());
                    }
                }
                // Acknowledge the batch id; all messages with an id less than or equal to it are confirmed
                connector.ack(batchId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }

}
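
The fetch/ack/rollback cycle used in CanalConfig can be illustrated without a canal server. The following is a simplified in-memory model under my own assumptions: `FakeConnector` is a hypothetical stand-in (not the real CanalConnector API) that supports one outstanding batch at a time, and `consumeOnce` shows a variant that rolls back when a handler fails so the batch is delivered again.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical in-memory stand-in for a connector; one outstanding batch at a time.
class FakeConnector {
    private final Deque<String> pending = new ArrayDeque<>(); // events waiting to be fetched
    private List<String> inFlight = new ArrayList<>();        // fetched but not yet acknowledged

    void publish(String event) {
        pending.addLast(event);
    }

    // Hands out up to batchSize events and remembers them until ack() or rollback().
    List<String> getWithoutAck(int batchSize) {
        inFlight = new ArrayList<>();
        while (inFlight.size() < batchSize && !pending.isEmpty()) {
            inFlight.add(pending.pollFirst());
        }
        return new ArrayList<>(inFlight);
    }

    // Confirms the outstanding batch; it will never be delivered again.
    void ack() {
        inFlight = new ArrayList<>();
    }

    // Puts the outstanding batch back at the front, preserving the original order.
    void rollback() {
        for (int i = inFlight.size() - 1; i >= 0; i--) {
            pending.addFirst(inFlight.get(i));
        }
        inFlight = new ArrayList<>();
    }
}

class AckRollbackSketch {
    interface Handler {
        void handle(List<String> batch) throws Exception;
    }

    // Processes one batch: ack on success, rollback on failure so it is redelivered.
    static boolean consumeOnce(FakeConnector connector, Handler handler) {
        List<String> batch = connector.getWithoutAck(200);
        if (batch.isEmpty()) {
            return false;
        }
        try {
            handler.handle(batch);
            connector.ack();
            return true;
        } catch (Exception e) {
            connector.rollback();
            return false;
        }
    }

    public static void main(String[] args) {
        FakeConnector connector = new FakeConnector();
        connector.publish("insert user 1");
        connector.publish("update user 1");
        System.out.println(consumeOnce(connector, batch -> { throw new Exception("target database down"); }));
        System.out.println(consumeOnce(connector, batch -> System.out.println(batch)));
    }
}
```

Whether to roll back and retry or to ack and move on after a handler error is a design choice; retrying keeps the copies consistent but needs idempotent handlers.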

5. Dynamic data source configuration

import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;


public class DynamicDataSource  extends AbstractRoutingDataSource {

    private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();

    public static Map<Object, Object> allDataSources = new HashMap<>();

    public DynamicDataSource(DataSource defaultTargetDataSource, Map<Object, Object> targetDataSources) {
        super.setDefaultTargetDataSource(defaultTargetDataSource);
        super.setTargetDataSources(targetDataSources);
        super.afterPropertiesSet();
        allDataSources = targetDataSources;
    }

    @Override
    protected Object determineCurrentLookupKey() {
        return getDataSource();
    }

    public static void setDataSource(String dataSource) {
        contextHolder.set(dataSource);
    }

    public static String getDataSource() {
        return contextHolder.get();
    }

    public static void clearDataSource() {
        contextHolder.remove();
    }
}
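
The routing idea behind DynamicDataSource is small enough to show in plain Java: a ThreadLocal holds the lookup key for the current thread, and the lookup falls back to the default target when no key (or an unknown key) is set. In this sketch `RoutingSketch` is a hypothetical class and plain strings stand in for real DataSource objects; the fallback mirrors what I understand to be the default lenient behavior of Spring's AbstractRoutingDataSource.

```java
import java.util.HashMap;
import java.util.Map;

class RoutingSketch {
    // Lookup key chosen by the current thread; other threads are unaffected.
    private static final ThreadLocal<String> CURRENT_KEY = new ThreadLocal<>();

    private final Map<String, String> targets; // key -> stand-in for a DataSource
    private final String defaultTarget;

    RoutingSketch(String defaultTarget, Map<String, String> targets) {
        this.defaultTarget = defaultTarget;
        this.targets = new HashMap<>(targets);
    }

    static void use(String key) {
        CURRENT_KEY.set(key);
    }

    static void clear() {
        CURRENT_KEY.remove();
    }

    // A known key set on this thread wins; otherwise the default target is used.
    String resolve() {
        String key = CURRENT_KEY.get();
        String target = (key == null) ? null : targets.get(key);
        return (target != null) ? target : defaultTarget;
    }

    public static void main(String[] args) {
        Map<String, String> targets = new HashMap<>();
        targets.put("workServer", "db1");
        targets.put("partnerServer", "db2");
        RoutingSketch routing = new RoutingSketch("db1", targets);

        System.out.println(routing.resolve()); // db1
        RoutingSketch.use("partnerServer");
        try {
            System.out.println(routing.resolve()); // db2
        } finally {
            // Always clear, otherwise the key leaks into later work on this thread.
            RoutingSketch.clear();
        }
        System.out.println(routing.resolve()); // db1
    }
}
```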


import com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceBuilder;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

@Configuration
@Component
public class DynamicDataSourceConfig {

    @Bean
    @ConfigurationProperties("spring.datasource.druid.work-server")
    public DataSource workServerDataSource(){
        return DruidDataSourceBuilder.create().build();
    }

    @Bean
    @ConfigurationProperties("spring.datasource.druid.partner-server")
    public DataSource partnerServerDataSource(){
        return DruidDataSourceBuilder.create().build();
    }

    @Bean
    @Primary
    public DynamicDataSource dataSource(DataSource workServerDataSource, DataSource partnerServerDataSource) {
        Map<Object, Object> targetDataSources = new HashMap<>();
        targetDataSources.put("workServer",workServerDataSource);
        targetDataSources.put("partnerServer", partnerServerDataSource);
        return new DynamicDataSource(workServerDataSource, targetDataSources);
    }
}
6. Data handler
import com.alibaba.otter.canal.protocol.CanalEntry;

import java.util.List;

/**
 * @Author osy
 * @Date 2022/1/27
 * @Description: TODO
 **/
public interface DataHandler {
    /**
     * Handle the changed data
     *
     * @param entrys entries fetched from canal in one batch
     */
    void doHandler(List<CanalEntry.Entry> entrys) throws Exception;
}


import com.alibaba.otter.canal.protocol.CanalEntry;
import com.google.common.base.Objects;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.xfr.datasource.DynamicDataSource;
import com.xfr.entity.UserInfoCopy;
import com.xfr.service.impl.UserInfoServiceImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

import java.util.ArrayList;
import java.util.List;


@Service
public class SyncUserInfoHandler implements DataHandler {

    @Autowired
    private UserInfoServiceImpl userInfoService;

    @Override
    public void doHandler(List<CanalEntry.Entry> entrys) throws InvalidProtocolBufferException {
        for (CanalEntry.Entry entry : entrys) {
            String tableName = entry.getHeader().getTableName();
            CanalEntry.EntryType entryType = entry.getEntryType();
            ByteString storeValue = entry.getStoreValue();
            if (Objects.equal(entryType, CanalEntry.EntryType.ROWDATA)) {
                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
                List<UserInfoCopy> userInfoList = new ArrayList<>();
                if (Objects.equal(rowChange.getEventType(), CanalEntry.EventType.DELETE)) {
                    // DELETE: collect the primary keys and remove them from every target database
                    List<Integer> delIds = new ArrayList<>();
                    genDelIds(rowChange, delIds);
                    if (!CollectionUtils.isEmpty(delIds)) {
                        forEachDataSource(() -> userInfoService.removeByIds(delIds));
                    }
                } else if (Objects.equal(rowChange.getEventType(), CanalEntry.EventType.INSERT)) {
                    genUserInfoList(rowChange, rowChange.getEventType(), userInfoList);
                    saveBatch(userInfoList);
                } else if (Objects.equal(rowChange.getEventType(), CanalEntry.EventType.UPDATE)) {
                    genUserInfoList(rowChange, rowChange.getEventType(), userInfoList);
                    if (!CollectionUtils.isEmpty(userInfoList)) {
                        // UPDATE: apply the changed columns to every target database
                        forEachDataSource(() -> userInfoService.updateBatchById(userInfoList));
                    }
                }
                userInfoList.forEach(item -> {
                    System.out.println(item);
                });
            } else {
                System.out.println("Current entry type: " + entryType);
            }
        }
    }

    private void saveBatch(List<UserInfoCopy> userInfoList) {
        if (!CollectionUtils.isEmpty(userInfoList)) {
            forEachDataSource(() -> userInfoService.saveBatch(userInfoList));
        }
    }

    /**
     * Runs the action once against every configured target data source and
     * always clears the routing key afterwards, even if the action fails.
     */
    private void forEachDataSource(Runnable action) {
        try {
            for (Object key : DynamicDataSource.allDataSources.keySet()) {
                System.out.println("Key = " + key);
                DynamicDataSource.setDataSource((String) key);
                action.run();
            }
        } finally {
            DynamicDataSource.clearDataSource();
        }
    }

    private void genDelIds(CanalEntry.RowChange rowChange, List<Integer> delIds) {
        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            // For a DELETE the row values are only available in the "before" columns
            for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
                if (Objects.equal(column.getName(), "id")) {
                    delIds.add(Integer.valueOf(column.getValue()));
                    // The id was found; move on to the next row
                    break;
                }
            }
        }
    }

    private void genUserInfoList(CanalEntry.RowChange rowChange, CanalEntry.EventType eventType, List<UserInfoCopy> userInfoList) {
        if (eventType.equals(CanalEntry.EventType.UPDATE)) {
            genUpdate(rowChange, userInfoList);
        } else if (eventType.equals(CanalEntry.EventType.INSERT)) {
            genInsert(rowChange, userInfoList);
        }
    }

    public void genUpdate(CanalEntry.RowChange rowChange, List<UserInfoCopy> userInfoList) {
        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            UserInfoCopy userInfo = new UserInfoCopy();
            for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
                if (Objects.equal(column.getName(), "id")) {
                    userInfo.setId(Integer.valueOf(column.getValue()));
                } else if (Objects.equal(column.getName(), "name") && column.getUpdated()) {
                    userInfo.setName(column.getValue());
                } else if (Objects.equal(column.getName(), "account") && column.getUpdated()) {
                    userInfo.setAccount(column.getValue());
                } else if (Objects.equal(column.getName(), "password") && column.getUpdated()) {
                    userInfo.setPassword(column.getValue());
                } else if (Objects.equal(column.getName(), "phone") && column.getUpdated()) {
                    userInfo.setPhone(column.getValue());
                } else if (Objects.equal(column.getName(), "status") && column.getUpdated()) {
                    userInfo.setStatus(Integer.valueOf(column.getValue()));
                }
            }
            userInfoList.add(userInfo);
        }
    }

    public void genInsert(CanalEntry.RowChange rowChange, List<UserInfoCopy> userInfoList) {
        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            UserInfoCopy userInfo = new UserInfoCopy();
            for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
                if (Objects.equal(column.getName(), "id")) {
                    userInfo.setId(Integer.valueOf(column.getValue()));
                } else if (Objects.equal(column.getName(), "name")) {
                    userInfo.setName(column.getValue());
                } else if (Objects.equal(column.getName(), "account")) {
                    userInfo.setAccount(column.getValue());
                } else if (Objects.equal(column.getName(), "password")) {
                    userInfo.setPassword(column.getValue());
                } else if (Objects.equal(column.getName(), "phone")) {
                    userInfo.setPhone(column.getValue());
                }
            }
            userInfoList.add(userInfo);
        }
    }
}

The actual business processing lives in the SyncUserInfoHandler class. To add another extension, just implement the DataHandler interface and its doHandler method.
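
As a rough illustration of this extension point, the sketch below dispatches one batch to every registered handler. It deliberately avoids canal and Spring so it runs on its own: `SimpleHandler` is a simplified stand-in for DataHandler with plain strings instead of CanalEntry.Entry, and `AuditLogHandler` and `CountingHandler` are hypothetical handlers invented for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified stand-in for the DataHandler interface; entries are plain strings here.
interface SimpleHandler {
    void doHandler(List<String> entries) throws Exception;
}

// Hypothetical handler that records every entry it sees.
class AuditLogHandler implements SimpleHandler {
    final List<String> seen = new ArrayList<>();

    @Override
    public void doHandler(List<String> entries) {
        seen.addAll(entries);
    }
}

// Hypothetical handler that only counts entries.
class CountingHandler implements SimpleHandler {
    int count;

    @Override
    public void doHandler(List<String> entries) {
        count += entries.size();
    }
}

class HandlerDispatchSketch {
    // Same shape as the loop in CanalConfig: every handler receives every batch.
    static void dispatch(List<SimpleHandler> handlers, List<String> entries) throws Exception {
        for (SimpleHandler handler : handlers) {
            handler.doHandler(entries);
        }
    }

    public static void main(String[] args) throws Exception {
        AuditLogHandler audit = new AuditLogHandler();
        CountingHandler counter = new CountingHandler();
        dispatch(Arrays.asList(audit, counter), Arrays.asList("insert user 1", "update user 1"));
        System.out.println(audit.seen + " / " + counter.count);
    }
}
```

In the real service, Spring collects all DataHandler beans into the injected list, so a new handler only needs to be registered as a bean.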
Source code link
