使用canal同步mysql数据库到redis +实践策略模式
背景为提高系统性能、减小数据库压力,需要对频繁查询的数据进行缓存处理。为保证mysql和redis数据一致,网上博客一大堆缓存同步策略,最核心的问题其实还是数据修改时的数据一致性:先更新(删除)缓存再更新数据库、先更新数据库再更新(删除)缓存、延时双删等。以上所说的这些策略主要是考虑并发场景下db-redis之间的数据最终一致,如果业务场景没有多大的并发,或缓存的数据大多是配置之类不影响业务主流程
-
背景
为提高系统性能、减小数据库压力,需要对频繁查询的数据进行缓存处理。为保证mysql和redis数据一致,网上博客一大堆缓存同步策略,最核心的问题其实还是数据修改时的数据一致性:先更新(删除)缓存再更新数据库、先更新数据库再更新(删除)缓存、延时双删等。
以上所说的这些策略主要是考虑并发场景下db-redis之间的数据最终一致,如果业务场景没有多大的并发,或缓存的数据大多是配置之类不影响业务主流程的数据,其实以上策略基本也不会有问题。如果想做到数据强一致,那就只有一种方式就是串行,但是串行必然导致性能的降低,所以是要性能还是要数据强一致就需要根据实际业务来权衡了(CAP原理的体现)。
-
canal同步缓存策略
下面要说的是另外一种缓存同步策略,使用canal同步mysql数据库到redis。
-
什么是canal?
canal是阿里推出的,官方地址:https://github.com/alibaba/canal。
-
canal 工作原理
- canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
- canal 解析 binary log 对象(原始为 byte 流)
-
使用canal同步mysql到redis示例流程图:
由上图可知,使用canal同步mysql到redis,也是一种异步更新缓存的方案,保证数据的最终一致。为什么要使用canal?主要考虑以下几个原因:1. 项目不是新开发,使用canal对业务的代码的侵入性小;2.基于mysql binlog监听的同步实时性较高,官方给出的说法是可以准实时。
-
canal server搭建
canal server搭建主要包括以下几点:-
安装jdk,canal是由java开发,要运行canal需要安装有jdk,注意配置java环境变量
-
修改mysql配置文件 /etc/my.cnf 并重启mysql:
log-bin=mysql-bin #binlog文件名 binlog_format=row #选择row模式 server_id=1 #mysql实例id,不能和canal的slaveId重复
-
在mysql中创建canal使用的账号密码和权限(使用已有的账号密码也可以):
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;
-
安装canal server:
-
下载, 项目git地址:https://github.com/alibaba/canal/releases
-
解压
-
配置项目, conf/example/instance.properties
# position info canal.instance.master.address = 127.0.0.1:3306 # mysql连接 canal.instance.dbUsername = canal # mysql账号 canal.instance.dbPassword = canal # 密码 canal.instance.defaultDatabaseName = test # 需要同步的库名 canal.instance.connectionCharset = UTF-8 # mysql编码
-
启动项目: sh bin/startup.sh (即执行bin目录下的startup.sh 脚本即可)
-
查看日志:
vim logs/canal/canal.log #canal server端运行日志
vim logs/example/example.log #canal client端连接日志
vim logs/example/meta.log #实例binlog 读取记录文件(记录变更位置,默认为新增变更)
-
-
-
开发canal client
-
官方demo地址:https://github.com/alibaba/canal/wiki/ClientExample
-
引入 canal依赖:
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.5</version> </dependency> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.protocol</artifactId> <version>1.1.5</version> </dependency>
-
配置文件中配置:
#canal服务配置 canal-server: #canal服务地址 host: 192.168.1.1 #canal服务端口 port: 11111 #canal服务 desrination名称 desrination: example #要监控的数据库名 data-base: my_test #每批处理数量 batch-size: 1000
-
canal服务启动类:
说明:DTS是阿里云提供的服务,在同步缓存这方面来说,与canal实现的功能是一样的;为保证生产的稳定性,开发环境使用canal,生产环境使用DTS。(这里可以先忽略DTS,后续有时间再说DTS的使用)/** * 通过监听mysql binlog同步redis缓存:非生产环境使用阿里云DTS,非生产环境(开发、测试、预生产)使用自搭建canal服务 */ @Slf4j @Component public class MySQL2Redis { /** * canal服务配置<测试环境用canal> **/ @Value("${canal-server.host}") private String canalServerHost; @Value("${canal-server.port}") private int canalServerPort; @Value("${canal-server.desrination}") private String canalServerDesrination; @Value("${canal-server.data-base}") private String dataBase; @Value("${canal-server.batch-size}") private int batchSizeInit; /** * 阿里DTS服务配置<生产环境用DTS> **/ @Value("${dts.accessKey}") private String accessKey; @Value("${dts.accessSecret}") private String accessSecret; @Value("${dts.subscription_instance_id}") private String subscriptionInstanceId; @Autowired private SpringContextUtil springContextUtil; @Autowired private RedisUtil redisUtil; @Autowired private TableStrategyContext tableStrategyContext; @PostConstruct public void init() { log.info(" === start ==="); String activeProfile = springContextUtil.getActiveProfile(); boolean proEvn = ActiveEvnEnums.isProEvn(activeProfile); if (proEvn) { log.info(" === 生产环境,启动DTS监听 ==="); new Thread(new DTSMySQL2RedisTask(accessKey, accessSecret, subscriptionInstanceId, tableStrategyContext)).start(); } else { log.info(" === 非生产环境,启动canal-client监听 ==="); new Thread(new CanalMySQL2RedisTask(canalServerHost, canalServerPort, canalServerDesrination, dataBase, batchSizeInit, tableStrategyContext)). start(); } } }
-
canal任务执行类:
说明:因不同的表存入缓存时可能设置的key不同以及缓存查策略不同,因此这里使用策略模式,不同的表同步缓存时单独处理,策略的key定义为表名,有新的表要实现同步缓存时,只需要新增一个策略就行,不用改动其他代码,保证代码良好的扩展性。
/** * @author simon * @version 1.0.0 * @ClassName com.dhzhly.cache.client.synchronous.cache.CanalMySQL2RedisTask * @Description * @createTime 2022/2/23 */ @Slf4j public class CanalMySQL2RedisTask implements Runnable { private String canalServerHost; private int canalServerPort; private String canalServerDesrination; private String dataBase; private int batchSizeInit; //策略上下文 private TableStrategyContext tableStrategyContext; public CanalMySQL2RedisTask(String canalServerHost, int canalServerPort, String canalServerDesrination, String dataBase, int batchSizeInit, TableStrategyContext tableStrategyContext) { this.canalServerHost = canalServerHost; this.canalServerPort = canalServerPort; this.canalServerDesrination = canalServerDesrination; this.dataBase = dataBase; this.batchSizeInit = batchSizeInit; this.tableStrategyContext = tableStrategyContext; } @Override public void run() { //创建客户端 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalServerHost, canalServerPort), canalServerDesrination, Constants.BLANK_STR, Constants.BLANK_STR); try { //批量个数 int batchSize = batchSizeInit; //连接 connector.connect(); //过滤监听的库-表 connector.subscribe(dataBase + ".*"); while (true) { // 获取指定数量的数据 Message message = connector.getWithoutAck(batchSize); long batchId = message.getId(); int size = message.getEntries().size(); log.info("batchId = {},size = {}", batchId, size); List<CanalEntry.Entry> entries = message.getEntries(); if (CollectionUtils.isEmpty(entries)) { try { Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } } else { //处理数据 handleEntry(message.getEntries()); } // 提交确认 connector.ack(batchId); } } catch (Exception e) { log.error("exception e ," + e); //异常回滚:回滚到未进行 {@link #ack} 的地方,下次fetch的时候,可以从最后一个没有 {@link #ack} 的地方开始拿 connector.rollback(); } finally { //释放连接 connector.disconnect(); } } private void handleEntry(List<CanalEntry.Entry> entrys) { for (CanalEntry.Entry entry : entrys) { if (entry.getEntryType().equals(CanalEntry.EntryType.ROWDATA)) { CanalEntry.RowChange rowChage = null; try { rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } CanalEntry.EventType eventType = rowChage.getEventType(); String tableName = entry.getHeader().getTableName(); log.info("tableName :{}, eventType : {}", tableName, eventType); /** 后续通过表名 对不同的表数据分别进行处理 **/ for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) { if (eventType == CanalEntry.EventType.DELETE) { redisDelete(rowData.getBeforeColumnsList(), tableName); } else if (eventType == CanalEntry.EventType.INSERT) { redisInsert(rowData.getAfterColumnsList(), tableName); } else { log.info("-------> before"); printColumn(rowData.getBeforeColumnsList()); log.info("-------> after"); redisUpdate(rowData.getAfterColumnsList(), tableName); } } } } } private void printColumn(List<CanalEntry.Column> columns) { for (CanalEntry.Column column : columns) { log.info(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); } } private void redisInsert(List<CanalEntry.Column> columns, String tableName) { JSONObject json = new JSONObject(); for (CanalEntry.Column column : columns) { json.set(FieldConvertUtil.lineToHump(column.getName()), column.getValue()); } if (columns.size() > 0) { //通过表名获取策略 TableRedisOperationStrategy resource = tableStrategyContext.getResource(tableName); if (resource != null) { resource.insert(JSONUtil.toJsonStr(json)); } } } private void redisUpdate(List<CanalEntry.Column> columns, String tableName) { JSONObject json = new JSONObject(); for (CanalEntry.Column column : columns) { json.set(FieldConvertUtil.lineToHump(column.getName()), column.getValue()); } if (columns.size() > 0) { //通过表名获取策略 TableRedisOperationStrategy resource = tableStrategyContext.getResource(tableName); if (resource != null) { resource.update(JSONUtil.toJsonStr(json)); } } } private void redisDelete(List<CanalEntry.Column> columns, String tableName) { JSONObject json = new JSONObject(); for (CanalEntry.Column column : columns) { json.set(FieldConvertUtil.lineToHump(column.getName()), column.getValue()); } if (columns.size() > 0) { //通过表名获取策略 TableRedisOperationStrategy resource = tableStrategyContext.getResource(tableName); if (resource != null) { resource.delete(JSONUtil.toJsonStr(json)); } } } }
-
下划线装驼峰工具类
/** * @author simon * @version 1.0.0 * @ClassName com.dhzhly.cache.client.common.util.FieldConvertUtil * @Description * @createTime 2022/3/4 */ public class FieldConvertUtil { private static Pattern linePattern = Pattern.compile("_(\\w)"); /** 下划线转驼峰 */ public static String lineToHump(String str) { str = str.toLowerCase(); Matcher matcher = linePattern.matcher(str); StringBuffer sb = new StringBuffer(); while (matcher.find()) { matcher.appendReplacement(sb, matcher.group(1).toUpperCase()); } matcher.appendTail(sb); return sb.toString(); } }
-
策略接口
public interface TableRedisOperationStrategy { void insert(String value); void delete(String value); void update(String value); }
-
具体策略类
/** * @author simon * @version 1.0.0 * @ClassName com.dhzhly.cache.client.synchronous.table.AccountInfoRedisOpration * @Description * @createTime 2022/3/16 */ @Component("account_info") @Slf4j public class AccountInfoRedisOpration implements TableRedisOperationStrategy { private static final String REDIS_KEY_PREFIX = "account_info::"; private static final String LOGIN_NAME_FIELD = "loginName"; @Autowired private RedisUtil redisUtil; @Override public void insert(String value) { JSONObject jsonObject = JSONUtil.parseObj(value); String loginName = jsonObject.getStr(LOGIN_NAME_FIELD); redisUtil.set(Constants.generateKey(REDIS_KEY_PREFIX, loginName), value, Constants.REFIS_KEY_TIMEOUT); } @Override public void delete(String value) { JSONObject jsonObject = JSONUtil.parseObj(value); String loginName = jsonObject.getStr(LOGIN_NAME_FIELD); redisUtil.del(Constants.generateKey(REDIS_KEY_PREFIX, loginName)); } @Override public void update(String value) { JSONObject jsonObject = JSONUtil.parseObj(value); String loginName = jsonObject.getStr(LOGIN_NAME_FIELD); redisUtil.set(Constants.generateKey(REDIS_KEY_PREFIX, loginName), value, Constants.REFIS_KEY_TIMEOUT); } }
-
策略上下文
/** * @author simon * @version 1.0.0 * @ClassName com.dhzhly.cache.client.synchronous.OperationStrategyContext * @Description * @createTime 2022/3/3 */ @Component public class TableStrategyContext { @Autowired private final Map<String, TableRedisOperationStrategy> tableStrategyMap = new ConcurrentHashMap<>(); public TableStrategyContext(Map<String, TableRedisOperationStrategy> strategyMap) { this.tableStrategyMap.clear(); strategyMap.forEach((k, v)-> this.tableStrategyMap.put(k, v)); } public TableRedisOperationStrategy getResource(String tableName){ return tableStrategyMap.get(tableName); } }
-
常量类
/** * @author simon * @version 1.0.0 * @ClassName com.dhzhly.cache.client.common.constants * @Description * @createTime 2022/2/23 */ public class Constants { public static final String BLANK_STR = ""; //缓存超时时间 7天 public static final long REFIS_KEY_TIMEOUT = 60 * 60 * 24 * 7L; public static String generateKey(String redisKey,String param) { return new StringBuffer().append(redisKey).append(param).toString(); } }
-
代码结构目录
如上图所示,如果一张新表t_student要同步缓存,只需要再写一类 StudentRedisOpration,实现策略接口TableRedisOperationStrategy,StudentRedisOpration类上加注解@Component(“t_student”)即可,其他逻辑不用修改。
-
以上就是使用canal同步mysql到redis的简单用法。至于业务代码如何加缓存,如何无代码侵入性的加缓存,下篇再说。
更多推荐
所有评论(0)