springboot+数据同步框架canal,将mysql的数据同步到多个地方,比如其他库,redis,es,mq等
现在正在做一个项目,人力资源方面的,有个各个业务都用用户表息息相关,用户服务和其他各个服务相互独立,所以无法进行关联查询,并且业务需要,很多都需要去关联到用户表,所以比较纠结,故有如下方案:方案一:将用户表经常使用到的字段冗余到各个业务表,用户信息修改之后,发送一个消息到mq,然后涉及到冗余用户字段的服务去订阅这个队列,然后进行修改冗余字段数据,其实这个方法也不错。方案二:利用canal,各个服务
目前正在做一个项目,人力资源方面的,大部分业务都与用户表息息相关,用户服务和其他各个服务相互独立,所以无法进行关联查询。并且实际业务需要,很多都需要去关联到用户表,所以比较纠结,故有如下方案:
- 方案一:
将用户表经常使用到的字段冗余到各个业务表,用户信息修改之后,发送一个消息到mq,然后涉及到冗余用户字段的服务去订阅这个队列,然后进行修改冗余字段数据,其实这个方法也不错。
优点:此方案其他业务涉及到用户数据的不用进行关联查询,效率高
缺点:冗余字段很难维护,冗余度会随着业务不断的扩张而扩张,如果增加某个字段,很容易漏掉
- 方案二:
利用canal,单独抽离出一个服务,此服务只用户将用户服务的用户数据,同步到其他服务对应所拆分的库,此服务采用多数据源的循环的去维护各个库中的用户表。
优点:单独抽离出一个服务,方便维护与管理,可扩展性较高,如果增加分库的服务,则增加一个数据源即可,代码基本无需改动。
缺点:每一个库都要建立一个用户表,比较浪费资源,并且会使用的关联查询。
结合到代码的可扩展性和可维护性,最终考虑使用canal数据同步来实现。(本人比较懒,不想干那种重复性的工作,所以不想一样的代码,写n次。哈哈)
安装参照:安装直通车
一、首先安装canal
下载地址:https://github.com/alibaba/canal/releases
这里我选择的是1.1.4版本
点击直接下载。
如果地址无法访问,请加微信:osm164502,我可私发。
2、解压,修改配置文件
修改:安装目录/conf/example/instance.properties文件对应的位置
# position info
canal.instance.master.address=127.0.0.1:3306
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
修改startup.bat的启动脚本
3、配置mysql
- 找到my.ini文件
我的配置文件windows在:C:\ProgramData\MySQL\MySQL Server 5.7\my.ini,如果你的也是5.7,也是在这个位置。
打开文件,找到:Binary Logging.
新增如下配置:
log-bin=mysql-bin #binlog文件名
binlog_format=ROW #选择row模式
找到Server Id,设置server-id,
server-id=1 # mysql实例id,不能和canal的slaveId重复
重启mysql服务
- 新增用户canal
这个用户名可以改成你想改的,这里的用户密码需要和instance.properties里面的用户名密码对应!
-- 使用命令登录:mysql -u root -p
-- 创建用户 用户名:canal 密码:canal
create user 'canal'@'%' identified by 'canal';
-- 授权 *.*表示所有库
grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%' identified by 'canal';
4、启动,双击startup.bat
到此canal服务就安装好了,然后根据我们的实际业务处理来了。
单独抽离一个springboot服务,来处理即可
1、新建一个springboot项目,pom文件如下:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.1.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.10</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.18</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.3.1</version>
<exclusions>
<exclusion>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-generator</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
2、配置文件
server:
port: 9999
spring:
datasource:
type: com.alibaba.druid.pool.DruidDataSource
druid:
work-server:
#MySQL配置
driverClassName: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/db1?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai
username: root
password: root
partner-server:
#MySQL配置
driverClassName: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/db2?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai
username: root
password: root
# mybatis-plus配置
mybatis-plus:
mapperLocations: classpath:mapper/*Mapper.xml
configuration:
lazy-loading-enabled: true
aggressive-lazy-loading: false
map-underscore-to-camel-case: true
type-aliases-package: com.xxx.entity
xfr:
canal:
host: 127.0.0.1
port: 11111
subscribe: db.user_info
3、启动类
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
@MapperScan("com.xxx.mapper") // mybatis的包扫描配置
@SpringBootApplication(exclude= {DataSourceAutoConfiguration.class}) // 排除数据源自动配置,因为我们要手动配置多个数据源,需要同步到多个库中
public class CanalApplication {
public static void main(String[] args) {
SpringApplication.run(CanalApplication.class, args);
}
}
4、CanalConfig
package com.xfr.canal;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;
import com.xfr.handler.DataHandler;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import java.net.InetSocketAddress;
import java.util.List;
/**
* @Author osy
* @Date 2022/1/26
* @Description: TODO
**/
@Configuration
@ConfigurationProperties(prefix = "xfr.canal")
public class CanalConfig implements InitializingBean {
/**
* 自定义接口,面向接口编程
*/
@Autowired
private List<DataHandler> handlerList;
@Value("${xfr.canal.host}")
private String host;
@Value("${xfr.canal.port}")
private Integer port;
@Value("${xfr.canal.subscribe}")
private String subscribe;
@Override
public void afterPropertiesSet() throws Exception {
// 创建链接
InetSocketAddress inetSocketAddress = new InetSocketAddress(host, port);
CanalConnector connector = CanalConnectors.newSingleConnector(inetSocketAddress, "example", "", "");
try {
//打开连接
connector.connect();
//订阅数据库表,全部表
connector.subscribe(subscribe);
//回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
connector.rollback();
while (true) {
// 获取指定数量的数据
Message message = connector.getWithoutAck(200);
//获取批量ID
long batchId = message.getId();
//获取批量的数量
int size = message.getEntries().size();
//如果没有数据
if (batchId == -1 || size == 0) {
try {
//线程休眠2秒
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
//如果有数据,处理数据
for (DataHandler dataHandler : handlerList) {
dataHandler.doHandler(message.getEntries());
}
}
//进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。
connector.ack(batchId);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
connector.disconnect();
}
}
}
4、动态数据源配置
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
public class DynamicDataSource extends AbstractRoutingDataSource {
private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();
public static Map<Object, Object> allDataSources = new HashMap<>();
public DynamicDataSource(DataSource defaultTargetDataSource, Map<Object, Object> targetDataSources) {
super.setDefaultTargetDataSource(defaultTargetDataSource);
super.setTargetDataSources(targetDataSources);
super.afterPropertiesSet();
allDataSources = targetDataSources;
}
@Override
protected Object determineCurrentLookupKey() {
return getDataSource();
}
public static void setDataSource(String dataSource) {
contextHolder.set(dataSource);
}
public static String getDataSource() {
return contextHolder.get();
}
public static void clearDataSource() {
contextHolder.remove();
}
}
import com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceBuilder;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
@Configuration
@Component
public class DynamicDataSourceConfig {
@Bean
@ConfigurationProperties("spring.datasource.druid.work-server")
public DataSource workServerDataSource(){
return DruidDataSourceBuilder.create().build();
}
@Bean
@ConfigurationProperties("spring.datasource.druid.partner-server")
public DataSource partnerServerDataSource(){
return DruidDataSourceBuilder.create().build();
}
@Bean
@Primary
public DynamicDataSource dataSource(DataSource workServerDataSource, DataSource partnerServerDataSource) {
Map<Object, Object> targetDataSources = new HashMap<>();
targetDataSources.put("workServer",workServerDataSource);
targetDataSources.put("partnerServer", partnerServerDataSource);
return new DynamicDataSource(workServerDataSource, targetDataSources);
}
}
4、数据处理器
import com.alibaba.otter.canal.protocol.CanalEntry;
import java.util.List;
/**
* @Author osy
* @Date 2022/1/27
* @Description: TODO
**/
public interface DataHandler {
/**
* 处理更改的数据
*
* @param entrys
*/
void doHandler(List<CanalEntry.Entry> entrys) throws Exception;
}
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.google.common.base.Objects;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.xfr.datasource.DynamicDataSource;
import com.xfr.entity.UserInfoCopy;
import com.xfr.service.impl.UserInfoServiceImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
@Service
public class SyncUserInfoHandler implements DataHandler {
@Autowired
private UserInfoServiceImpl userInfoService;
@Override
public void doHandler(List<CanalEntry.Entry> entrys) throws InvalidProtocolBufferException {
for (CanalEntry.Entry entry : entrys) {
String tableName = entry.getHeader().getTableName();
CanalEntry.EntryType entryType = entry.getEntryType();
ByteString storeValue = entry.getStoreValue();
if (Objects.equal(entryType, CanalEntry.EntryType.ROWDATA)) {
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
List<UserInfoCopy> userInfoList = new ArrayList<>();
// 删除语句
if (Objects.equal(rowChange.getEventType(), CanalEntry.EventType.DELETE)) {
List<Integer> delIds = new ArrayList<>();
genDelIds(rowChange, delIds);
userInfoService.removeByIds(delIds);
} else if (Objects.equal(rowChange.getEventType(), CanalEntry.EventType.INSERT)) {
genUserInfoList(rowChange, rowChange.getEventType(), userInfoList);
saveBatch(userInfoList);
} else if (Objects.equal(rowChange.getEventType(), CanalEntry.EventType.UPDATE)) {
genUserInfoList(rowChange, rowChange.getEventType(), userInfoList);
if (!CollectionUtils.isEmpty(userInfoList)) {
userInfoService.updateBatchById(userInfoList);
}
}
userInfoList.forEach(item -> {
System.out.println(item);
});
} else {
System.out.println("当前操作类型为: " + entryType);
}
}
}
private void saveBatch(List<UserInfoCopy> userInfoList) {
for (Object key : DynamicDataSource.allDataSources.keySet()) {
System.out.println("Key = " + key);
DynamicDataSource.setDataSource((String) key);
if (!CollectionUtils.isEmpty(userInfoList)) {
userInfoService.saveBatch(userInfoList);
}
}
DynamicDataSource.clearDataSource();
}
private void genDelIds(CanalEntry.RowChange rowChange, List<Integer> delIds) {
row:for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
column:for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
if (Objects.equal(column.getName(), "id")) {
delIds.add(Integer.valueOf(column.getValue()));
continue column;
}
}
}
}
private void genUserInfoList(CanalEntry.RowChange rowChange, CanalEntry.EventType eventType, List<UserInfoCopy> userInfoList) {
if (eventType.equals(CanalEntry.EventType.UPDATE)) {
genUpdate(rowChange, userInfoList);
} else if (eventType.equals(CanalEntry.EventType.INSERT)) {
genInsert(rowChange, userInfoList);
}
}
public void genUpdate(CanalEntry.RowChange rowChange, List<UserInfoCopy> userInfoList) {
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
UserInfoCopy userInfo = new UserInfoCopy();
for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
if (Objects.equal(column.getName(), "id")) {
userInfo.setId(Integer.valueOf(column.getValue()));
} else if (Objects.equal(column.getName(), "name") && column.getUpdated()) {
userInfo.setName(column.getValue());
} else if (Objects.equal(column.getName(), "account") && column.getUpdated()) {
userInfo.setAccount(column.getValue());
} else if (Objects.equal(column.getName(), "password") && column.getUpdated()) {
userInfo.setPassword(column.getValue());
} else if (Objects.equal(column.getName(), "phone") && column.getUpdated()) {
userInfo.setPhone(column.getValue());
} else if (Objects.equal(column.getName(), "status") && column.getUpdated()) {
userInfo.setStatus(Integer.valueOf(column.getValue()));
}
}
userInfoList.add(userInfo);
}
}
public void genInsert(CanalEntry.RowChange rowChange, List<UserInfoCopy> userInfoList) {
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
UserInfoCopy userInfo = new UserInfoCopy();
for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
if (Objects.equal(column.getName(), "id")) {
userInfo.setId(Integer.valueOf(column.getValue()));
} else if (Objects.equal(column.getName(), "name")) {
userInfo.setName(column.getValue());
} else if (Objects.equal(column.getName(), "account")) {
userInfo.setAccount(column.getValue());
} else if (Objects.equal(column.getName(), "password")) {
userInfo.setPassword(column.getValue());
} else if (Objects.equal(column.getName(), "phone")) {
userInfo.setPhone(column.getValue());
}
}
userInfoList.add(userInfo);
}
}
}
实际真实业务处理在SyncUserInfoHandler类,如果需要增加其他扩展,只需要实现DataHandler接口,并实现doHandler方法即可。
源码地址
更多推荐
所有评论(0)