前言

最近对接之前集成的支付组件,以前是通过推送websocket信息到前端,前端页面收到信息后回调接口,实现充值结果同步。但是如果用户支付了,关掉了H5页面,那还回调个鬼,数据就不同步了。所以这次堵上这个漏洞,但是又不想上消息中间件,依赖太多了额,以后私有化部署、运维成本太高,而且下周就要上线功能,时间太紧,所以就想到用redis的消息订阅发布来处理。
另外我这边是微服务的,现在的场景是支付模块收到微信支付的通知后,推送websocket信息到前端,同时发布redis信息,其他一个模块收到redis信息后,进行解析内容做充值后的回调。

一、前置环境

这就是要集成redis,这里就不说了,可以见之前的博文。

二、redis发布订阅使用步骤

1.信息发布

这个其实非常简单,本质上其实就是调用的redisTemplate的convertAndSend()方法。
我这里封装了一个redis的工具类,一些常用的方法都在里面,今天就是只在最下面加入这个方法的封装即可。

import ch.qos.logback.core.net.ssl.ConfigurableSSLServerSocketFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.util.CollectionUtils;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
 * @author zhengwen
 */
@Slf4j
public class RedisUtil {

  private RedisTemplate<String, Object> redisTemplate;

  public void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) {
    this.redisTemplate = redisTemplate;
  }
  //=============================common============================

  /**
   * 指定缓存失效时间
   *
   * @param key  键
   * @param time 时间(秒)
   * @return
   */
  public boolean expire(String key, long time) {
    try {
      if (time > 0) {
        redisTemplate.expire(key, time, TimeUnit.SECONDS);
      }
      return true;
    } catch (Exception e) {
      e.printStackTrace();
      return false;
    }
  }

  /**
   * 根据key 获取过期时间
   *
   * @param key 键 不能为null
   * @return 时间(秒) 返回0代表为永久有效
   */
  public long getExpire(String key) {
    return redisTemplate.getExpire(key, TimeUnit.SECONDS);
  }

  /**
   * 判断key是否存在
   *
   * @param key 键
   * @return true 存在 false不存在
   */
  public boolean hasKey(String key) {
    try {
      return redisTemplate.hasKey(key);
    } catch (Exception e) {
      e.printStackTrace();
      return false;
    }
  }

  /**
   * 删除缓存
   *
   * @param key 可以传一个值 或多个
   */
  @SuppressWarnings("unchecked")
  public void del(String... key) {
    if (key != null && key.length > 0) {
      if (key.length == 1) {
        redisTemplate.delete(key[0]);
      } else {
        List list = CollectionUtils.arrayToList(key);
        redisTemplate.delete(list);
      }
    }
  }

  //============================String=============================

  /**
   * 普通缓存获取
   *
   * @param key 键
   * @return 值
   */
  public Object get(String key) {
    if (key == null) {
      log.info("获取redis缓存数据时,key值为null");
      return null;
    } else {
      return redisTemplate.opsForValue().get(key);
    }
  }

  /**
   * 普通缓存获取
   *
   * @param key 键
   * @return 值
   */
  public String getString(String key) {
    Object val = this.get(key);
    if (val != null) {
      String valStr = val.toString();
      if (valStr.startsWith("\"")) {
        return valStr.replaceAll("\"", "");
      } else {
        return valStr;
      }
    }
    return null;
  }

  /**
   * 普通缓存放入
   *
   * @param key   键
   * @param value 值
   * @return true成功 false失败
   */
  public boolean set(String key, Object value) {
    try {
      redisTemplate.opsForValue().set(key, value);
      return true;
    } catch (Exception e) {
      e.printStackTrace();
      return false;
    }

  }

  /**
   * 普通缓存放入并设置时间
   *
   * @param key   键
   * @param value 值
   * @param time  时间(秒) time要大于0 如果time小于等于0 将设置无限期
   * @return true成功 false 失败
   */
  public boolean set(String key, Object value, long time) {
    try {
      if (time > 0) {
        redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
      } else {
        set(key, value);
      }
      return true;
    } catch (Exception e) {
      e.printStackTrace();
      return false;
    }
  }

  /**
   * 递增
   *
   * @param key 键
   * @param by  要增加几(大于0)
   * @return
   */
  public long incr(String key, long delta) {
    if (delta < 0) {
      throw new RuntimeException("递增因子必须大于0");
    }
    return redisTemplate.opsForValue().increment(key, delta);
  }

  /**
   * 递减
   *
   * @param key   键
   * @param delta 要减少几(小于0)
   * @return
   */
  public long decr(String key, long delta) {
    if (delta < 0) {
      throw new RuntimeException("递减因子必须大于0");
    }
    return redisTemplate.opsForValue().increment(key, -delta);
  }

  //================================Map=================================

  /**
   * HashGet
   *
   * @param key  键 不能为null
   * @param item 项 不能为null
   * @return 值
   */
  public Object hget(String key, String item) {
    return redisTemplate.opsForHash().get(key, item);

  }

  /**
   * 获取hashKey对应的所有键值
   *
   * @param key 键
   * @return 对应的多个键值
   */
  public Map<Object, Object> hmget(String key) {
    return redisTemplate.opsForHash().entries(key);
  }

  /**
   * HashSet
   *
   * @param key 键
   * @param map 对应多个键值
   * @return true 成功 false 失败
   */
  public boolean hmset(String key, Map<String, Object> map) {
    try {
      redisTemplate.opsForHash().putAll(key, map);
      return true;
    } catch (Exception e) {
      e.printStackTrace();
      return false;
    }
  }

  /**
   * HashSet 并设置时间
   *
   * @param key  键
   * @param map  对应多个键值
   * @param time 时间(秒)
   * @return true成功 false失败
   */
  public boolean hmset(String key, Map<String, Object> map, long time) {
    try {
      redisTemplate.opsForHash().putAll(key, map);
      if (time > 0) {
        expire(key, time);
      }
      return true;
    } catch (Exception e) {
      e.printStackTrace();
      return false;
    }
  }

  /**
   * 向一张hash表中放入数据,如果不存在将创建
   *
   * @param key   键
   * @param item  项
   * @param value 值
   * @return true 成功 false失败
   */
  public boolean hset(String key, String item, Object value) {
    try {
      redisTemplate.opsForHash().put(key, item, value);
      return true;
    } catch (Exception e) {
      e.printStackTrace();
      return false;
    }
  }

  /**
   * 向一张hash表中放入数据,如果不存在将创建
   *
   * @param key   键
   * @param item  项
   * @param value 值
   * @param time  时间(秒)  注意:如果已存在的hash表有时间,这里将会替换原有的时间
   * @return true 成功 false失败
   */
  public boolean hset(String key, String item, Object value, long time) {
    try {
      redisTemplate.opsForHash().put(key, item, value);
      if (time > 0) {
        expire(key, time);
      }
      return true;
    } catch (Exception e) {
      e.printStackTrace();
      return false;
    }
  }

  /**
   * 删除hash表中的值
   *
   * @param key  键 不能为null
   * @param item 项 可以使多个 不能为null
   */
  public void hdel(String key, Object... item) {
    redisTemplate.opsForHash().delete(key, item);
  }

  /**
   * 判断hash表中是否有该项的值
   *
   * @param key  键 不能为null
   * @param item 项 不能为null
   * @return true 存在 false不存在
   */
  public boolean hHasKey(String key, String item) {
    return redisTemplate.opsForHash().hasKey(key, item);
  }

  /**
   * hash递增 如果不存在,就会创建一个 并把新增后的值返回
   *
   * @param key  键
   * @param item 项
   * @param by   要增加几(大于0)
   * @return
   */
  public double hincr(String key, String item, double by) {
    return redisTemplate.opsForHash().increment(key, item, by);
  }

  /**
   * hash递减
   *
   * @param key  键
   * @param item 项
   * @param by   要减少记(小于0)
   * @return
   */
  public double hdecr(String key, String item, double by) {
    return redisTemplate.opsForHash().increment(key, item, -by);
  }

  //============================set=============================

  /**
   * 根据key获取Set中的所有值
   *
   * @param key 键
   * @return
   */
  public Set<Object> sGet(String key) {
    try {
      return redisTemplate.opsForSet().members(key);
    } catch (Exception e) {
      e.printStackTrace();
      return null;
    }
  }

  /**
   * 根据value从一个set中查询,是否存在
   *
   * @param key   键
   * @param value 值
   * @return true 存在 false不存在
   */
  public boolean sHasKey(String key, Object value) {
    try {
      return redisTemplate.opsForSet().isMember(key, value);
    } catch (Exception e) {
      e.printStackTrace();
      return false;
    }
  }

  /**
   * 将数据放入set缓存
   *
   * @param key    键
   * @param values 值 可以是多个
   * @return 成功个数
   */
  public long sSet(String key, Object... values) {
    try {
      return redisTemplate.opsForSet().add(key, values);
    } catch (Exception e) {
      e.printStackTrace();
      return 0;
    }
  }

  /**
   * 将set数据放入缓存
   *
   * @param key    键
   * @param time   时间(秒)
   * @param values 值 可以是多个
   * @return 成功个数
   */
  public long sSetAndTime(String key, long time, Object... values) {
    try {
      Long count = redisTemplate.opsForSet().add(key, values);
      if (time > 0) {
        expire(key, time);
      }
      return count;
    } catch (Exception e) {
      e.printStackTrace();
      return 0;
    }
  }

  /**
   * 获取set缓存的长度
   *
   * @param key 键
   * @return
   */
  public long sGetSetSize(String key) {
    try {
      return redisTemplate.opsForSet().size(key);
    } catch (Exception e) {
      e.printStackTrace();
      return 0;
    }
  }

  /**
   * 移除值为value的
   *
   * @param key    键
   * @param values 值 可以是多个
   * @return 移除的个数
   */
  public long setRemove(String key, Object... values) {
    try {
      Long count = redisTemplate.opsForSet().remove(key, values);
      return count;
    } catch (Exception e) {
      e.printStackTrace();
      return 0;
    }
  }
  //===============================list=================================

  /**
   * 获取list缓存的内容
   *
   * @param key   键
   * @param start 开始
   * @param end   结束  0 到 -1代表所有值
   * @return
   */
  public List<Object> lGet(String key, long start, long end) {
    try {
      return redisTemplate.opsForList().range(key, start, end);
    } catch (Exception e) {
      e.printStackTrace();
      return null;
    }
  }

  /**
   * 获取list缓存的内容
   *
   * @param key 键
   * @return
   */
  public List<Object> lGet(String key) {
    try {
      return redisTemplate.opsForList().range(key, 0, -1);
    } catch (Exception e) {
      e.printStackTrace();
      return null;
    }
  }

  /**
   * 获取list缓存的长度
   *
   * @param key 键
   * @return
   */
  public long lGetListSize(String key) {
    try {
      return redisTemplate.opsForList().size(key);
    } catch (Exception e) {
      e.printStackTrace();
      return 0;
    }
  }

  /**
   * 通过索引 获取list中的值
   *
   * @param key   键
   * @param index 索引  index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推
   * @return
   */
  public Object lGetIndex(String key, long index) {
    try {
      return redisTemplate.opsForList().index(key, index);
    } catch (Exception e) {
      e.printStackTrace();
      return null;
    }
  }

  /**
   * 将list放入缓存
   *
   * @param key   键
   * @param value 值
   * @return
   */
  public boolean lSet(String key, Object value) {
    try {
      redisTemplate.opsForList().rightPush(key, value);
      return true;
    } catch (Exception e) {
      e.printStackTrace();
      return false;
    }
  }

  /**
   * 将list放入缓存
   *
   * @param key   键
   * @param value 值
   * @param time  时间(秒)
   * @return
   */
  public boolean lSet(String key, Object value, long time) {
    try {
      redisTemplate.opsForList().rightPush(key, value);
      if (time > 0) {
        expire(key, time);
      }
      return true;
    } catch (Exception e) {
      e.printStackTrace();
      return false;
    }
  }

  /**
   * 将list放入缓存
   *
   * @param key   键
   * @param value 值
   * @param time  时间(秒)
   * @return
   */
  public boolean lSet(String key, List<Object> value) {
    try {
      redisTemplate.opsForList().rightPushAll(key, value);
      return true;
    } catch (Exception e) {
      e.printStackTrace();
      return false;
    }
  }

  /**
   * 将list放入缓存
   *
   * @param key   键
   * @param value 值
   * @param time  时间(秒)
   * @return
   */
  public boolean lSet(String key, List<Object> value, long time) {
    try {
      redisTemplate.opsForList().rightPushAll(key, value);
      if (time > 0) {
        expire(key, time);
      }
      return true;
    } catch (Exception e) {
      e.printStackTrace();
      return false;
    }
  }

  /**
   * 根据索引修改list中的某条数据
   *
   * @param key   键
   * @param index 索引
   * @param value 值
   * @return
   */
  public boolean lUpdateIndex(String key, long index, Object value) {
    try {
      redisTemplate.opsForList().set(key, index, value);
      return true;
    } catch (Exception e) {
      e.printStackTrace();
      return false;
    }
  }

  /**
   * 移除N个值为value
   *
   * @param key   键
   * @param count 移除多少个
   * @param value 值
   * @return 移除的个数
   */
  public long lRemove(String key, long count, Object value) {
    try {
      Long remove = redisTemplate.opsForList().remove(key, count, value);
      return remove;
    } catch (Exception e) {
      e.printStackTrace();
      return 0;
    }
  }

  /**
   * 发布信息到队列
   *
   * @param channel 队列名
   * @param message 信息
   */
  public void convertAndSend(String channel, Object message) {
    redisTemplate.convertAndSend(channel, message);
  }
}

service实现方法

@Override
    public Result<?> sendRedisMeassage(RedisMsgVo redisMsgVo) {
        Result<?> res = MsgPushRecordUtil.checkSendRedisMessageParam(redisMsgVo);
        if (res != null) {
            return res;
        }
        redisUtil.convertAndSend(redisMsgVo.getQueueName(),redisMsgVo);

        return ResultGenerator.genSuccessResult();
    }

这是我service实现类里调用的方法,对外就提供了一个redis消息发布接口,扩充到信息推送服务模块。
实际上你们不是微服务,直接就可以注入这个工具类直接调用。(为什么我的这个工具类可以注入,这还是要看原先的博文redis的集成)
对了还有这个RedisMsgVo对象

import lombok.Data;

/**
 * redis消息实体
 *
 * @author zhengwen
 **/
@Data
public class RedisMsgVo {

  /**
   * 信息id
   */
  private Long id;

  /**
   * 消息队列名称
   */
  private String queueName;

  /**
   * 发送人id
   */
  private Long senderId;
  /**
   * 接收人id
   */
  private Long receiverId;

  /**
   * 信息内容
   */
  private String content;
}

2.信息订阅

这个才是重点,这里使用监听器的方法,有博友直接用一个线程间隔1s处理,配合redis的rightPop、leftPush方法实现消息队列。感觉那样太粗糙了,还是贴合springBoot做实现。
配置监听器

import cn.hutool.core.util.ArrayUtil;
import com.fillersmart.fsihouse.commonservice.component.RedisReceiver;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;

/**
 * redis订阅监听配置
 *
 * @author zhengwen
 **/
@Component
public class RedisSubListenerConfig {

  @Value("${redis.msg.topics}")
  private String topics;

  /**
   * 初始化监听器
   *
   * @param connectionFactory
   * @param listenerAdapter
   * @return
   */
  @Bean
  RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
      MessageListenerAdapter listenerAdapter) {
    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    // new PatternTopic("这里是监听的通道的名字") 通道要和发布者发布消息的通道一致
    //我这里配置文件读取多topic配置,多个拆解成多个topic添加监听
    if (StringUtils.isNotBlank(topics)) {
      String[] topisArr = topics.split(",");
      List<PatternTopic> topicList = new ArrayList<>();
      if (ArrayUtil.isNotEmpty(topisArr)) {
        Arrays.stream(topisArr).forEach(c -> {
          PatternTopic topic = new PatternTopic(c);
          topicList.add(topic);
        });
      }
      container.addMessageListener(listenerAdapter, topicList);
    }
    return container;
  }

  /**
   * 绑定消息监听者和接收监听的方法
   *
   * @param redisReceiver
   * @return
   */
  @Bean
  MessageListenerAdapter listenerAdapter(RedisReceiver redisReceiver) {
    // redisReceiver 消息接收者
    // receiveMessage 消息接收后的方法
    return new MessageListenerAdapter(redisReceiver, "receiveMessage");
  }


  @Bean
  StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
    return new StringRedisTemplate(connectionFactory);
  }

  /**
   * 注册订阅者
   *
   * @param latch
   * @return
   */
  @Bean
  RedisReceiver receiver(CountDownLatch latch) {
    return new RedisReceiver(latch);
  }

  /**
   * 计数器,用来控制线程
   *
   * @return
   */
  @Bean
  CountDownLatch latch() {
    //指定了计数的次数 1
    return new CountDownLatch(1);
  }

}

信息订阅处理

package com.fillersmart.fsihouse.commonservice.component;

import cn.hutool.json.JSONUtil;
import com.fillersmart.fsihouse.commonservice.service.CommonService;
import com.fillersmart.fsihouse.data.core.Result;
import com.fillersmart.fsihouse.data.vo.msgpush.RedisMsgVo;
import java.util.concurrent.CountDownLatch;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


/***
 * 消息接收者(订阅者)  需要注入到springboot中
 * @author zhengwen
 */
@Slf4j
@Component
public class RedisReceiver {

  private CountDownLatch latch;

  @Resource
  private CommonService commonService;

  @Autowired
  public RedisReceiver(CountDownLatch latch) {
    this.latch = latch;
  }


  /**
   * 收到通道的消息之后执行的方法
   *
   * @param message
   */
  public void receiveMessage(String message) {
    //这里是收到通道的消息之后执行的方法
    log.info("common通用服务收到redis信息:" + message);
    if (JSONUtil.isJson(message)) {
      RedisMsgVo redisMsgVo = JSONUtil.toBean(message,RedisMsgVo.class);
      Result<?> res = commonService.dealRedisMsg(redisMsgVo);
      log.info("--redis消息处理结果:{}",JSONUtil.toJsonStr(res));
    }
    latch.countDown();
  }
}

忽略commonService,这个是我定义的一个service,专门用于解析收到的信息做差异化业务处理的。我们今天的目的是在信息发布方发送消息后,这里能收到消息。

总结

1、集成使用确实挺简单的
2、这里目前实现的就是一个广播,其实还可以做单点发生信息
3、这里的监听器还有复杂点的stream监听器StreamMessageListenerContainer,但是这个据说不支持jedis,下次亲测了分享。官方原话这样说的:
Redis Stream support is currently only available through the Lettuce client as it is not yet supported by Jedis.
4、用redis配合StreamMessageListenerContainer做信息发布订阅,最好redis装最新版本,至少6+,以前的版本不稳定。
5、其实我们这个场景是模块间通讯,我们也可以用Spring Cloud Bus,这个配合消息中间件也是贼牛
6、目前这个还没有信息抛弃策略,也得下次了
好了,就到这里,希望能帮到大家。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐