Redis 发布订阅功能

        常见小型项目中由于引入队列(MQ)等工具实现消息的分发和订阅模式,使项目比较臃肿,且组件较多,维护困难。所以对于比如数据中数据发生变更或首页通知信息变更时可考虑使用redis的channel来实现消息的通知。

        使用思路为先使用命令行测试redis的channel是否满足实际需要,因为命令行如果可以实现,那么只要切换为代码,然后串接过程就可以了。

       常见的消息发布订阅过程如下:

  • 发布者和订阅者都是Redis客户端,Channel则为Redis服务器端,发布者将消息发送到某个的频道,订阅了这个频道的订阅者就能接收到这条消息。
  • Redis的这种发布订阅机制与基于主题的发布订阅类似,Channel相当于主题。

需要注意以下几点:

  • 消息的发送者与接收者之间通过 channel 绑定:channel 可以是确定的字符串,也可以基于模式匹配
  • 客户端可以订阅任意多个 channel
  • 发送者发送的消息无法持久化,所以可能会造成消息丢失
  • 由于消息无法持久化,所以,消费者无法收到在订阅 channel 之间发送的消息
  • 发送者与客户端之间的消息发送与接收不存在 ACK 机制

Redis 发布订阅功能的适用场景

由于没有消息持久化与 ACK 的保证,所以,Redis 的发布订阅功能并不可靠。这也就导致了它的应用场景很有限,建议用于实时与可靠性要求不高的场景。例如:

  • 消息推送
  • 内网环境的消息通知
  • ...

总之,Redis 发布订阅功能足够简单,如果没有过多的要求,且不想搭建 Kafka、RabbitMQ 这样的可靠型消息系统时,可以考虑尝试使用 Redis。

redis订阅代码案例:

1、添加依赖及配置:

2、编写配置读取和加载;

3、生产者和消费者创建;

4、启动加载并实现订阅;

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-data-redis</artifactId>
	<version>1.5.3.RELEASE</version>
</dependency>

使用JedisPool做连接池管理;

JedisPubSub中有较多的消息接收模式。可酌情使用。



import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Component
@ConfigurationProperties(prefix = "spring.redis")
@Data
public class RedisBean {

    @Value("${spring.redis.host}")
    private String host;

    @Value("${spring.redis.port}")
    private int port;

    @Value("${spring.redis.password}")
    private String password;

    @Value("${spring.redis.pool.max-idle:100}")
    private int maxIdle;

    @Value("${spring.redis.pool.max-active:100}")
    private int maxActive;

    @Value("${spring.redis.pool.min-idle:1}")
    private int minIdle;

    @Value("${spring.redis.pool.max-wait:-1}")
    private int maxWait;

}

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

@Configuration
public class RedisConfig {

    @Autowired
    private RedisBean redisBean;

    @Bean
    public JedisPool getJedisPool() {
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxIdle(redisBean.getMaxIdle());
        config.setMaxTotal(redisBean.getMaxActive());
        config.setMaxWaitMillis(redisBean.getMaxWait());

        String password = redisBean.getPassword();
        if (password == null || "".equals(password)) {
            password = null;
        }
        //10000为超时时间
        JedisPool pool = new JedisPool(config, redisBean.getHost(), redisBean.getPort(),
                10000,
                password);
        System.out.println(">> redis初始化");
        return pool;
    }

}

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

@Component
public class ActiveThread implements ApplicationRunner {

    @Autowired
    private RedisMqThread mqThread;

    @Override
    public void run(ApplicationArguments args) {
        mqThread.start();
    }
}


import org.springframework.stereotype.Component;
import redis.clients.jedis.JedisPubSub;
@Component
public class RedisMessageHandler extends JedisPubSub {

    @Override
    public void onMessage(String channel, String message) {
        System.out.println(">> 接收到了来自 " + channel + " 的消息: " + message);
    }

    @Override
    public void onPMessage(String pattern, String channel, String message) {
        System.out.println(">> 接收到了来自 " + pattern + channel + " 的消息: " + message);
    }

}


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

@Component
public class RedisMqThread extends Thread {

    @Autowired
    private JedisPool jedisPool;

    @Autowired
    private RedisMessageHandler mqHandler;

    @Override
    public void run() {
        Jedis jedis = jedisPool.getResource();
        jedis.subscribe(mqHandler, "channel1", "channel2");
    }

}

list队列模式:

简单来说就是使用redis的lpush和rpop命令或rpush或lpop

1、添加依赖;

2、添加配置;

3、MQ接口实现

前两步省略,直接实现MQ的接口

public interface MQService {
  
  void produce(String string);
  
  void consume();

//lpop阻塞队列实现
  void blockingConsume()
}

import com.sb.service.MQService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.lang.Nullable;
import org.springframework.stereotype.Service;
  
import javax.annotation.Resource;
import java.util.List;
  
@Service
public class MQServiceImpl implements MQService {
  
  private static Logger log = LoggerFactory.getLogger(MQServiceImpl.class);
  
  private static final String MESSAGE_KEY = "message:queue";
  
  @Resource
  private RedisTemplate redisTemplate;
  
  @Override
  public void produce(String string) {
    redisTemplate.opsForList().leftPush(MESSAGE_KEY, string);
  }
  
  @Override
  public void consume() {
    String string = (String) redisTemplate.opsForList().rightPop(MESSAGE_KEY);
    log.info("consume : {}", string);
  }


public void blockingConsume() {
  List<Object> obj = redisTemplate.executePipelined(new RedisCallback<Object>() {
    @Nullable
    @Override
    public Object doInRedis(RedisConnection connection) throws DataAccessException {
      //队列没有元素会阻塞操作,直到队列获取新的元素或超时
      return connection.bLPop(TIME_OUT, MESSAGE_KEY.getBytes());
    }
  },new StringRedisSerializer());
  
  for (Object str: obj) {
    log.info("blockingConsume : {}", str);
  }
}

  
}


import com.sb.service.MQService;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
  
import javax.annotation.Resource;
  
@RestController
@RequestMapping(value="/api")
public class MQController {
  
  @Resource
  private MQService mQService;
  
  @RequestMapping(value = "/produce", method=RequestMethod.GET)
  public void produce(@RequestParam(name = "key") String key) {
    mQService.produce(key);
  }
  
  @RequestMapping(value="/consume", method=RequestMethod.GET)
  public void consume() {
    while (true) {
      mQService.consume();
    }
  }
  
}

 

 

 

其他复杂方式参考:

http://www.voidcn.com/article/p-sawoewdm-bqk.html

另外一种自定义实现方法

https://zhuanlan.zhihu.com/p/59065399

https://www.cnblogs.com/laoxia/p/11759226.html

 

redis常用命令:

http://redisdoc.com/

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐