redis键空间通知(keyspace notification)
目录redis键空间通知(keyspace notification)1 事件类型2 key-event 通知订阅机制3 过期事件的时间安排4 用代码实现发布订阅5 事件发布广播机制redis键空间通知(keyspace notification)Redis键空间通知redis 事件的影响del、expired 发布一个通知pub/sub发布/订阅1 事件类型键空间通知的实现是为每一个影响Redi
redis键空间通知(keyspace notification)
redis 事件的影响del、expired 发布一个通知
pub/sub发布/订阅
1 事件类型
键空间通知的实现是为每一个影响Redis数据空间的操作发送两个不同类型的事件。例如,在数据库0
中名为mykey
的键上执行DEL
操作,将触发两条消息的传递,完全等同于下面两个PUBLISH
命令:
PUBLISH __keyspace@0__:mykey del
PUBLISH __keyevent@0__:del mykey
以上很容易看到,一个频道允许监听所有以键mykey
为目标的所有事件,以及另一个频道允许获取有关所有DEL
操作目标键的信息。第一种事件,在频道中使用keyspace
前缀的被叫做键空间通知,第二种,使用keyevent
前缀的,被叫做键事件通知。在以上例子中,为键mykey
生成了一个del
事件。 会发生什么:
- 键空间频道接收到的消息是事件的名称。
- 键事件频道接收到的消息是键的名称。
可以只启用其中一种通知,以便只传递我们感兴趣的事件子集。
2 key-event 通知订阅机制
- 在redis配置文件中开启
键空间事件通知
默认情况下,键空间事件通知是不启用的,因为虽然不太明智,但该功能会消耗一些CPU。可以使用redis.conf中的notify-keyspace-events
或者使用CONFIG SET命令来开启通知。将参数设置为空字符串会禁用通知。
为了开启通知功能,使用了一个非空字符串,由多个字符组成,每一个字符都有其特殊的含义,具体参见下表:
K 键空间事件,以__keyspace@<db>__前缀发布。
E 键事件事件,以__keyevent@<db>__前缀发布。
g 通用命令(非类型特定),如DEL,EXPIRE,RENAME等等
$ 字符串命令
l 列表命令
s 集合命令
h 哈希命令
z 有序集合命令
x 过期事件(每次键到期时生成的事件)
e 被驱逐的事件(当一个键由于达到最大内存而被驱逐时产生的事件)
A g$lshzxe的别名,因此字符串AKE表示所有的事件。
注意:在windows下的配置中把多余的空格去掉,格式和官方保持一致。
- 在redis客户端中订阅通知
psubscribe __keyevent@0__:expired
psubscribe 关键词 订阅
'_ _key*_ _*'
表达式通配符
3 过期事件的时间安排
设置了生存时间的键由Redis以两种方式过期:
- 当命令访问键时,发现键已过期。
- 通过后台系统在后台逐步查找过期的键,以便能够收集那些从未被访问的键。
当通过以上系统之一访问键且发现键已经过期时,将生成expired
事件。因此无法保证Redis服务器在键过期的那一刻同时生成expired
事件。如果没有命令不断地访问键,并且有很多键都有关联的TTL,那么在键的生存时间降至零到生成expired
事件之间,将会有明显的延迟。基本上,expired
事件是在Redis服务器删除键的时候生成的,而不是在理论上生存时间达到零值时生成的。
4 用代码实现发布订阅
- redis依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
- 添加配置
spring:
redis:
host: localhost
port: 6379
# 7号库归还库存、优惠卷
database: 7
# 设置订阅哪种类型的通知(监听7号库所有键的过期通知)
listen-pattern: __keyevent@7__:expired
- Redis消息订阅配置类
package com.lin.missyou.manager.redis;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;
@Configuration
public class MessageListenerConfiguration {
@Value("${spring.redis.listen-pattern}")
public String pattern;
/**
* redis消息监听器容器
* 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
* 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
* @param redisConnectionFactory
* @param topicMessageListener
* @return
*/
@Bean
public RedisMessageListenerContainer listenerContainer(RedisConnectionFactory redisConnectionFactory,
TopicMessageListener topicMessageListener) {
// 创建redis消息监听器容器
RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
// 设置Redis 连接的线程安全工厂
listenerContainer.setConnectionFactory(redisConnectionFactory);
// 创建一个主题 (基于模式匹配)
Topic topic = new PatternTopic(pattern);
//订阅了一个主题
listenerContainer.addMessageListener(topicMessageListener, topic);
return listenerContainer;
}
}
- 监听消息
package com.lin.missyou.manager.redis;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
/**
* 监听发送的消息
* 一个回调接口,消息监听器,用于接收发送到 channel 的消息
*/
@Component
public class TopicMessageListener implements MessageListener {
@Override
public void onMessage(Message message, byte[] bytes) {
String body = new String(message.getBody());
String channel = new String(message.getChannel());
System.out.println(body);
System.out.println(channel);
}
}
5 事件发布广播机制
- 方案一:
通过事件的触发来调用方法执行
package com.lin.missyou.manager.redis;
import com.lin.missyou.bo.OrderMessageBO;
import com.lin.missyou.service.CouponBackService;
import com.lin.missyou.service.OrderCancelService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
/**
* 监听发送的消息
* 一个回调接口,消息监听器,用于接收发送到 channel 的消息
*/
@Component
public class TopicMessageListener implements MessageListener {
@Autowired
private OrderCancelService orderCancelService;
@Autowired
private CouponBackService couponBackService;
@Override
public void onMessage(Message message, byte[] bytes) {
String expiredKey = new String(message.getBody());
String topic = new String(message.getChannel());
System.out.println(expiredKey);
System.out.println(topic);
OrderMessageBO orderMessageBO = new OrderMessageBO(expiredKey);
//通过事件的触发来调用方法执行
orderCancelService.cancel(orderMessageBO);
couponBackService.returnBack(orderMessageBO);
}
}
需要调用的业务类
package com.lin.missyou.service;
import com.lin.missyou.bo.OrderMessageBO;
import com.lin.missyou.core.enumeration.OrderStatus;
import com.lin.missyou.exception.http.ServerErrorException;
import com.lin.missyou.model.Order;
import com.lin.missyou.model.UserCoupon;
import com.lin.missyou.repository.OrderRepository;
import com.lin.missyou.repository.UserCouponRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.Optional;
@Service
public class CouponBackService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private UserCouponRepository UserCouponRepository;
/**
* 先判断订单状态,在归还优惠卷
*
* @param orderMessageBO
*/
@EventListener
@Transactional
public void returnBack(OrderMessageBO orderMessageBO) {
Long orderId = orderMessageBO.getOrderId();
Long userId = orderMessageBO.getUserId();
Long couponId = orderMessageBO.getCouponId();
if (couponId == -1) {
return;
}
Order order = orderRepository.findById(orderId)
.orElseThrow(() -> new ServerErrorException(9999));
if (order.getStatusEnum().equals(OrderStatus.UNPAID)
|| order.getStatusEnum().equals(OrderStatus.CANCELED)) {
UserCouponRepository.returnBack(userId, couponId);
}
}
}
- 方案二:事件发布广播机制
解耦合,触发事件时不需要调用方法,运用ocp原则
package com.lin.missyou.manager.redis;
import com.lin.missyou.bo.OrderMessageBO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
/**
* 监听发送的消息
* 一个回调接口,消息监听器,用于接收发送到 channel 的消息
*/
@Component
public class TopicMessageListener implements MessageListener {
private static ApplicationEventPublisher publisher;
@Autowired
public void setPublisher(ApplicationEventPublisher publisher){
TopicMessageListener.publisher = publisher;
}
@Override
public void onMessage(Message message, byte[] bytes) {
String expiredKey = new String(message.getBody());
String topic = new String(message.getChannel());
System.out.println(expiredKey);
System.out.println(topic);
OrderMessageBO orderMessageBO = new OrderMessageBO(expiredKey);
TopicMessageListener.publisher.publishEvent(orderMessageBO);
}
}
当onMessage
触发时,发布一个事件,在方法加入@EventListener
注解来订阅事件,从而调用方法。
/**
* 先判断订单状态,在归还优惠卷
*
* @param orderMessageBO
*/
@EventListener
@Transactional
public void returnBack(OrderMessageBO orderMessageBO) {
Long orderId = orderMessageBO.getOrderId();
Long userId = orderMessageBO.getUserId();
Long couponId = orderMessageBO.getCouponId();
if (couponId == -1) {
return;
}
Order order = orderRepository.findById(orderId)
.orElseThrow(() -> new ServerErrorException(9999));
if (order.getStatusEnum().equals(OrderStatus.UNPAID)
|| order.getStatusEnum().equals(OrderStatus.CANCELED)) {
UserCouponRepository.returnBack(userId, couponId);
}
}
更多推荐
所有评论(0)