JetCache :实现二级缓存准实时刷新
JetCache简介及特性JetCache是一个基于Java的缓存系统封装,提供统一的API和注解来简化缓存的使用。 JetCache可以原生的支持TTL、两级缓存、分布式自动刷新,还提供了Cache接口用于手工缓存操作。 当前有四个实现,RedisCache、TairCache、CaffeineCache(in memory)和一个简易的LinkedHashMapCache(in m
JetCache简介及特性
JetCache是一个基于Java的缓存系统封装,提供统一的API和注解来简化缓存的使用。 JetCache可以原生的支持TTL、两级缓存、分布式自动刷新,还提供了Cache接口用于手工缓存操作。 当前有四个实现,RedisCache、TairCache、CaffeineCache(in memory)和一个简易的LinkedHashMapCache(in memory)。
简单用法
JetCache提供了简单易用的注解。相较于springframework.cache提供的注解,JetCache增加了一些新的功能性注解:
@CacheRefresh:用于定时刷新缓存。
@CacheUpdate:用于更新缓存
@CachePenetrationProtect:当缓存访问未命中的情况下,对并发进行的加载行为进行保护。 当前版本实现的是单JVM内的保护,即同一个JVM中同一个key只有一个线程去加载,其它线程等待结果。
通过JetCache注解实现的方法缓存,缓存value的数据类型统一为String。这块想想也没什么问题。比从复杂数据结构存取数据的效率还高。
(RedissonSpringCacheManager实现的spring cache方法级别缓存,是通过Hash的结构来缓存方法的返回值的)。
JetCache本地缓存有最大元素限制,默认是100个。可配置。基于LRU淘汰
JetCache存在问题:
二级缓存结构下:local cache的一致性问题
扩展实现
JetCache设计了缓存更新消息的发布机制。保障了架构的可扩展性。新版jetcache提供了操作缓存的监听接口,如下图
基于redis 订阅发布机制实现了 CacheMessagePublisher (如果项目又消息中间件的需求,可以使用MQ)
一下是jetcache配置,上代码
remoteCache: &remoteCache
type: redis
keyConvertor: fastjson
valueEncoder: kryo
valueDecoder: kryo
host: ${spring.redis.host:localhost}
port: ${spring.redis.port:6379}
password: ${spring.redis.password}
database: ${spring.redis.database}
poolConfig:
minIdle: 5
maxIdle: 20
maxTotal: 50
jetcache:
statIntervalMinutes: 15
areaInCacheName: false
hidePackages: com.springboot.cloud
local:
# 默认1小时本地缓存
default:
type: caffeine
keyConvertor: fastjson
expireAfterWriteInMillis: 3600000
expireAfterAccessInMillis: 1800000
# 長時本地緩存,主要用于要求时效一般
longTime:
type: caffeine
keyConvertor: fastjson
expireAfterWriteInMillis: 300000
expireAfterAccessInMillis: 180000
# 短時本地緩存,主要用于要求时效较高的配置
shortTime:
type: caffeine
keyConvertor: fastjson
expireAfterWriteInMillis: 60000
expireAfterAccessInMillis: 40000
remote:
# 默认1小时的远程缓存
default:
expireAfterWriteInMillis: 3600000
<<: *remoteCache
# uri格式:redis://密码@ip:端口/redis库名?timeout=5s url[0]
#uri: redis://${spring.redis.password}@${spring.redis.password}:${spring.redis.port:6379}/7?timeout=5s
# 长时远程緩存,主要用于要求时效要求一般的集中式缓存
longTime:
expireAfterWriteInMillis: 7200000
<<: *remoteCache
# 短時远程緩存,主要用于要求时效较高的集中式缓存
shortTime:
expireAfterWriteInMillis: 300000
<<: *remoteCache
cacheMessageTopic:
synCache: synCache
2.消息发布实现,只有本地缓存或者二级缓存再发送消息。通过ConfigProvider 获取 getCacheManager().getCache获取Cache实例
package com.cc.core.jetcache.core.publisher;
import com.alicp.jetcache.Cache;
import com.alicp.jetcache.anno.support.ConfigProvider;
import com.alicp.jetcache.redis.RedisCacheConfig;
import com.alicp.jetcache.support.CacheMessage;
import com.alicp.jetcache.support.CacheMessagePublisher;
import com.cc.core.jetcache.core.entity.LocalCacheMessageEntity;
import com.cc.core.redis.core.RedisUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;
/**
* @Author: jerry.chen
* @Date: 2021/4/16 15:04
*/
@Component
@Primary
public class LocalCacheRedisPublisher implements CacheMessagePublisher {
@Autowired
ConfigProvider configProvider;
@Autowired
private RedisUtil redisUtil;
@Value("${jetcache.cacheMessageTopic.synCache}")
String topicName;
@Override
public void publish(String area, String cacheName, CacheMessage cacheMessage) {
int type = cacheMessage.getType();
/*写入不发送消息*/
if (type == CacheMessage.TYPE_PUT || type == CacheMessage.TYPE_PUT_ALL) {
return;
}
Cache cache = configProvider.getCacheManager().getCache(area, cacheName);
/*如果是远程缓存 不处理*/
if (cache.config() instanceof RedisCacheConfig) {
return;
}
LocalCacheMessageEntity entity = new LocalCacheMessageEntity();
entity.setCacheMessage(cacheMessage);
entity.setArea(area);
entity.setCacheName(cacheName);
redisUtil.publish(topicName, entity);
}
}
3.redis中的发布和订阅实现(使用RedisMessageListenerContainer 进行订阅)
/**
* 发布信息
*
* @param channel 通道
* @param message 信息
*/
public void publish(String channel, Object message) {
redisTemplate.convertAndSend(channel, message);
}
@Autowired
RedisMessageListenerContainer redisMessageListenerContainer;
public void subscriber(String channel, MessageListener messageListener){
redisMessageListenerContainer.addMessageListener(messageListener,new ChannelTopic(channel));//订阅指定频道
}
redis配置
package com.cc.core.redis.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
/**
* @Author: jerry.chen
* @Date: 2018/12/12 14:26
*/
@Configuration
public class RedisConfig {
@Bean(name = "redisTemplate")
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
RedisSerializer<String> redisSerializer = new StringRedisSerializer();
template.setConnectionFactory(factory);
//key序列化方式
template.setKeySerializer(redisSerializer);
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
template.setValueSerializer(jackson2JsonRedisSerializer);
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.setHashKeySerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
@Autowired
private RedisTemplate redisTemplate;
/*监听容器*/
@Bean
RedisMessageListenerContainer container() {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisTemplate.getConnectionFactory());
return container;
}
}
4.实现监听者,通过ConfigProvider 获取 getCacheManager().getCache获取Cache实例
package com.cc.core.jetcache.core.listener;
import com.alicp.jetcache.anno.support.ConfigProvider;
import com.alicp.jetcache.support.CacheMessage;
import com.cc.core.jetcache.core.entity.LocalCacheMessageEntity;
import com.github.benmanes.caffeine.cache.Cache;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
/**
* Caffeine本地缓存过期策略
*
* @Author: jerry.chen
* @Date: 2021/4/19 16:01
*/
@Component
public class LocalCacheRedisMessageListener implements MessageListener {
@Autowired
private ConfigProvider configProvider;
@Autowired
private RedisTemplate redisTemplate;
@Override
public void onMessage(Message message, byte[] bytes) {
/*redis 配置的时候 对象序列化使用的是 Jackson2JsonRedisSerializer*/
LocalCacheMessageEntity entity = (LocalCacheMessageEntity) redisTemplate.getValueSerializer().deserialize(message.getBody());
CacheMessage cacheMessage = entity.getCacheMessage();
if (cacheMessage == null) {
return;
}
Cache localCache = (Cache) configProvider.getCacheManager().getCache(entity.getArea(), entity.getCacheName()).unwrap(Cache.class);
if (localCache == null) {
return;
}
switch (cacheMessage.getType()) {
case CacheMessage.TYPE_REMOVE: {
invalidateLocalCaches(localCache, cacheMessage.getKeys());
}
break;
case CacheMessage.TYPE_REMOVE_ALL: {
localCache.invalidateAll();
}
break;
}
}
private void invalidateLocalCaches(Cache localCache, Object[] keys) {
for (Object key : keys) {
Object value = localCache.getIfPresent(key);
if (null != value) {
localCache.invalidate(key);
}
}
}
}
5.实现消息订阅功能,监听者通过通道监听消息
package com.cc.core.jetcache.core.subscription;
import com.cc.core.jetcache.core.listener.LocalCacheRedisMessageListener;
import com.cc.core.redis.core.RedisUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* @Author: jerry.chen
* @Date: 2021/4/19 16:09
*/
@Component
public class LocalCacheSyncSubscription {
@Value("${jetcache.cacheMessageTopic.synCache}")
String topicName;
@Autowired
private RedisUtil redisUtil;
@Autowired
private LocalCacheRedisMessageListener localCacheRedisMessageListener;
@PostConstruct
public void initConsumer() {
redisUtil.subscriber(topicName, localCacheRedisMessageListener);
}
}
删除缓存的时候自动进程间同步删除
更多推荐
所有评论(0)