redis键空间通知(keyspace notification)

Redis键空间通知

redis 事件的影响del、expired 发布一个通知

pub/sub发布/订阅

1 事件类型

键空间通知的实现是为每一个影响Redis数据空间的操作发送两个不同类型的事件。例如,在数据库0中名为mykey的键上执行DEL操作,将触发两条消息的传递,完全等同于下面两个PUBLISH命令:

PUBLISH __keyspace@0__:mykey del
PUBLISH __keyevent@0__:del mykey

以上很容易看到,一个频道允许监听所有以键mykey为目标的所有事件,以及另一个频道允许获取有关所有DEL操作目标键的信息。第一种事件,在频道中使用keyspace前缀的被叫做键空间通知,第二种,使用keyevent前缀的,被叫做键事件通知。在以上例子中,为键mykey生成了一个del事件。 会发生什么:

  • 键空间频道接收到的消息是事件的名称。
  • 键事件频道接收到的消息是键的名称。

可以只启用其中一种通知,以便只传递我们感兴趣的事件子集。

2 key-event 通知订阅机制

  1. 在redis配置文件中开启键空间事件通知

默认情况下,键空间事件通知是不启用的,因为虽然不太明智,但该功能会消耗一些CPU。可以使用redis.conf中的notify-keyspace-events或者使用CONFIG SET命令来开启通知。将参数设置为空字符串会禁用通知。

为了开启通知功能,使用了一个非空字符串,由多个字符组成,每一个字符都有其特殊的含义,具体参见下表:

K     键空间事件,以__keyspace@<db>__前缀发布。
E     键事件事件,以__keyevent@<db>__前缀发布。
g     通用命令(非类型特定),如DEL,EXPIRE,RENAME等等
$     字符串命令
l     列表命令
s     集合命令
h     哈希命令
z     有序集合命令
x     过期事件(每次键到期时生成的事件)
e     被驱逐的事件(当一个键由于达到最大内存而被驱逐时产生的事件)
A     g$lshzxe的别名,因此字符串AKE表示所有的事件。

在这里插入图片描述

注意:在windows下的配置中把多余的空格去掉,格式和官方保持一致。

  1. 在redis客户端中订阅通知
psubscribe __keyevent@0__:expired

psubscribe 关键词 订阅

'_ _key*_ _*' 表达式通配符
在这里插入图片描述

3 过期事件的时间安排

设置了生存时间的键由Redis以两种方式过期:

  • 当命令访问键时,发现键已过期。
  • 通过后台系统在后台逐步查找过期的键,以便能够收集那些从未被访问的键。

当通过以上系统之一访问键且发现键已经过期时,将生成expired事件。因此无法保证Redis服务器在键过期的那一刻同时生成expired事件。如果没有命令不断地访问键,并且有很多键都有关联的TTL,那么在键的生存时间降至零到生成expired事件之间,将会有明显的延迟。基本上,expired事件是在Redis服务器删除键的时候生成的,而不是在理论上生存时间达到零值时生成的。

4 用代码实现发布订阅

  1. redis依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
  1. 添加配置
spring:
  redis:
    host: localhost
    port: 6379
    # 7号库归还库存、优惠卷
    database: 7
    # 设置订阅哪种类型的通知(监听7号库所有键的过期通知)
    listen-pattern: __keyevent@7__:expired
  1. Redis消息订阅配置类
package com.lin.missyou.manager.redis;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;

@Configuration
public class MessageListenerConfiguration {

    @Value("${spring.redis.listen-pattern}")
    public String pattern;


    /**
     * redis消息监听器容器
     * 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
     * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
     * @param redisConnectionFactory
     * @param topicMessageListener
     * @return
     */
    @Bean
    public RedisMessageListenerContainer listenerContainer(RedisConnectionFactory redisConnectionFactory,
                                                           TopicMessageListener topicMessageListener) {
        // 创建redis消息监听器容器
        RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
        // 设置Redis 连接的线程安全工厂
        listenerContainer.setConnectionFactory(redisConnectionFactory);
        // 创建一个主题  (基于模式匹配)
        Topic topic = new PatternTopic(pattern);
        //订阅了一个主题
        listenerContainer.addMessageListener(topicMessageListener, topic);

        return listenerContainer;

    }
}
  1. 监听消息
package com.lin.missyou.manager.redis;

import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;

/**
 * 监听发送的消息
 * 一个回调接口,消息监听器,用于接收发送到 channel 的消息
 */
@Component
public class TopicMessageListener implements MessageListener {
    @Override
    public void onMessage(Message message, byte[] bytes) {
        String body = new String(message.getBody());
        String channel = new String(message.getChannel());
        System.out.println(body);
        System.out.println(channel);
    }
}

5 事件发布广播机制

  • 方案一:

通过事件的触发来调用方法执行

package com.lin.missyou.manager.redis;

import com.lin.missyou.bo.OrderMessageBO;
import com.lin.missyou.service.CouponBackService;
import com.lin.missyou.service.OrderCancelService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;

/**
 * 监听发送的消息
 * 一个回调接口,消息监听器,用于接收发送到 channel 的消息
 */
@Component
public class TopicMessageListener implements MessageListener {

    @Autowired
    private OrderCancelService orderCancelService;
    @Autowired
    private CouponBackService couponBackService;


    @Override
    public void onMessage(Message message, byte[] bytes) {
        String expiredKey = new String(message.getBody());
        String topic = new String(message.getChannel());
        System.out.println(expiredKey);
        System.out.println(topic);

        OrderMessageBO orderMessageBO = new OrderMessageBO(expiredKey);
        //通过事件的触发来调用方法执行
        orderCancelService.cancel(orderMessageBO);
        couponBackService.returnBack(orderMessageBO);
    }
}

需要调用的业务类

package com.lin.missyou.service;

import com.lin.missyou.bo.OrderMessageBO;
import com.lin.missyou.core.enumeration.OrderStatus;
import com.lin.missyou.exception.http.ServerErrorException;
import com.lin.missyou.model.Order;
import com.lin.missyou.model.UserCoupon;
import com.lin.missyou.repository.OrderRepository;
import com.lin.missyou.repository.UserCouponRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.Optional;

@Service
public class CouponBackService {

    @Autowired
    private OrderRepository orderRepository;

    @Autowired
    private UserCouponRepository UserCouponRepository;

    /**
     * 先判断订单状态,在归还优惠卷
     *
     * @param orderMessageBO
     */
    @EventListener
    @Transactional
    public void returnBack(OrderMessageBO orderMessageBO) {
        Long orderId = orderMessageBO.getOrderId();
        Long userId = orderMessageBO.getUserId();
        Long couponId = orderMessageBO.getCouponId();
        if (couponId == -1) {
            return;
        }
        Order order = orderRepository.findById(orderId)
                .orElseThrow(() -> new ServerErrorException(9999));

        if (order.getStatusEnum().equals(OrderStatus.UNPAID)
                || order.getStatusEnum().equals(OrderStatus.CANCELED)) {
            UserCouponRepository.returnBack(userId, couponId);
        }
    }
}
  • 方案二:事件发布广播机制

解耦合,触发事件时不需要调用方法,运用ocp原则

package com.lin.missyou.manager.redis;

import com.lin.missyou.bo.OrderMessageBO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;

/**
 * 监听发送的消息
 * 一个回调接口,消息监听器,用于接收发送到 channel 的消息
 */
@Component
public class TopicMessageListener implements MessageListener {

    private static ApplicationEventPublisher publisher;

    @Autowired
    public void setPublisher(ApplicationEventPublisher publisher){
        TopicMessageListener.publisher = publisher;
    }


    @Override
    public void onMessage(Message message, byte[] bytes) {
        String expiredKey = new String(message.getBody());
        String topic = new String(message.getChannel());
        System.out.println(expiredKey);
        System.out.println(topic);


        OrderMessageBO orderMessageBO = new OrderMessageBO(expiredKey);
        TopicMessageListener.publisher.publishEvent(orderMessageBO);
    }
}

onMessage触发时,发布一个事件,在方法加入@EventListener注解来订阅事件,从而调用方法。

/**
 * 先判断订单状态,在归还优惠卷
 *
 * @param orderMessageBO
 */
@EventListener
@Transactional
public void returnBack(OrderMessageBO orderMessageBO) {
    Long orderId = orderMessageBO.getOrderId();
    Long userId = orderMessageBO.getUserId();
    Long couponId = orderMessageBO.getCouponId();
    if (couponId == -1) {
        return;
    }
    Order order = orderRepository.findById(orderId)
            .orElseThrow(() -> new ServerErrorException(9999));

    if (order.getStatusEnum().equals(OrderStatus.UNPAID)
            || order.getStatusEnum().equals(OrderStatus.CANCELED)) {
        UserCouponRepository.returnBack(userId, couponId);
    }
}
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐