写在前面

消息队列的实现,主要有三个要点:

  • 消息保序:保证消息的顺序消费。
  • 重复消费:对重复消息的有效处理。
  • 消息可靠性:保证消息不会因为服务器宕机而消失。

对于以上三个要点,其实redis的list和stream是可以做到的。

但是很多小伙伴也会使用发布/订阅模式来实现redis的消息队列,这样真的好吗?

下面咱们就一起分析一下,list、stream、发布/订阅模式做消息队列的可行性。

基于list的消息队列解决方案

使用list实现消息队列其实有一些坑的,这里我们慢慢剖析里面的坑并加以改进。

使用list基本实现消息队列

List 本身就是按先进先出的顺序对数据进行存取的,所以,如果使用 List 作为消息队列保存消息的话,就已经能满足消息保序的需求了。

生产者可以使用 LPUSH 命令把要发送的消息依次写入 List,而消费者则可以使用 RPOP 命令,从 List 的另一端按照消息的写入顺序,依次读取消息并进行处理。

1.消费者使用while(true)不断的尝试获取消息。

while(true) {
    Object o = redisTemplate.opsForList().rightPop("list:topic");
    if(o != null){
        System.out.println(o);
    }
    System.out.println("消费者尝试获取消息");
}

2.生产者发送消息。

redisTemplate.opsForList().leftPush("list:topic", "我是订阅消息");

3.提出问题

在生产者往 List 中写入数据时,List 并不会主动地通知消费者有新消息写入,如果消费者想要及时处理消息,就需要在程序中不停地调用 RPOP 命令(比如使用一个 while(true) 循环)。如果有新消息写入,RPOP 命令就会返回结果,否则,RPOP 命令返回空值,再继续循环。

所以,即使没有新消息写入 List,消费者也要不停地调用 RPOP 命令,这就会导致消费者程序的 CPU 一直消耗在执行 RPOP 命令上,带来不必要的性能损失。

解决方案请继续往下看。

阻塞式消费,避免性能损失

Redis 提供了 BRPOP 命令。BRPOP 命令也称为阻塞式读取,客户端在没有读到队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据。和消费者程序自己不停地调用 RPOP 命令相比,这种方式能节省 CPU 开销。

1.消费者优化:

使用list阻塞式读取可以阻塞式读取list中的消息,避免while(true)不断的访问redis。

while(true) {
    // 阻塞3秒钟
    Object o = redisTemplate.opsForList().rightPop("list:topic", 3000, TimeUnit.MINUTES);
    if(o != null){
        System.out.println(o);
    }
    System.out.println("消费者尝试获取消息");
}

2.提出问题

使用while(true)总感觉并不是很优雅,有什么更好的方式可以替换while(true)呢?

解决方案请继续往下看。

替换while(true)

可以使用线程池来替换while(true)。

1.消费者优化

// 带有定时功能的线程池
ScheduledExecutorService scheduler =  Executors.newScheduledThreadPool(1);

ScheduledFuture<?> scheduleTask =  scheduler.scheduleWithFixedDelay(() -> {
    // 阻塞3秒钟
    Object o = redisTemplate.opsForList().rightPop("list:topic", 3, TimeUnit.SECONDS);
    if(o != null){
        System.out.println(o);
    }
    System.out.println("消费者尝试获取消息");
    System.out.println(Thread.currentThread().getName() + "111" + new Date());
}, 1, 1,  TimeUnit.SECONDS);//1秒初始化之后执行一次,以后每1秒执行一次(频率可以适当调节)

2.提出问题

list如何解决消息重复读取问题?

虽然说list只能读取一次,但是谁都无法保证生产者因为某种原因 会不会重复的生产相同的消息。

所以,基于redis的list,消费者只能自己本身来实现消息的幂等。

解决方案请继续往下看。

实现消息幂等

消息队列要能给每一个消息提供全局唯一的 ID 号;另一方面,消费者程序要把已经处理过的消息的 ID 号记录下来。

当收到一条消息后,消费者程序就可以对比收到的消息 ID 和记录的已处理过的消息 ID,来判断当前收到的消息有没有经过处理。如果已经处理过,那么,消费者程序就不再进行处理了。这种处理特性也称为幂等性,幂等性就是指,对于同一条消息,消费者收到一次的处理结果和收到多次的处理结果是一致的。

所以,该设计与redis本身无关,需要生产者与消费者达成一致协议,每一条消息生成一个唯一ID,用来判断重复消费问题。

保证消息可靠性

设想这样一个场景:
消费者收到消息之后,还没有处理完毕,消费者宕机了怎么办?

如果是Rabbitmq、Kafka这种消息队列,是有ack机制的,但是redis的list是没有这种机制的,怎么处理?

List 类型提供了 BRPOPLPUSH 命令,这个命令的作用是让消费者程序从一个 List 中读取消息,同时,Redis 会把这个消息再插入到另一个 List(可以叫作备份 List)留存。这样一来,如果消费者程序读了消息但没能正常处理,等它重启后,就可以从备份 List 中重新读取消息并进行处理了。

// 带有定时功能的线程池
ScheduledExecutorService scheduler =  Executors.newScheduledThreadPool(1);

ScheduledFuture<?> scheduleTask =  scheduler.scheduleWithFixedDelay(() -> {
    // 阻塞3秒钟,并生成备份
    Object o = redisTemplate.opsForList().rightPopAndLeftPush("list:topic", "list:topic:back",  3, TimeUnit.SECONDS);
    if(o != null){
        System.out.println(o);
    }
    System.out.println("消费者尝试获取消息");
    System.out.println(Thread.currentThread().getName() + "111" + new Date());
}, 1, 1,  TimeUnit.SECONDS);//1秒初始化之后执行一次,以后每1秒执行一次(频率可以适当调节)

但是以上要手动实现备份中哪些已经消费,哪些未被消费,实现起来还是比较麻烦的。。。

redis中有一种数据类型——stream,可以完美实现消息队列。

基于stream的消息队列解决方案(redis消息队列终极解决方案)

关于stream基本用法请移步:redis中stream数据结构使用详解——redis最适合做消息队列的数据结构

stream可以完美实现消息保序、自动生成消息唯一id、同时对消息的可靠性也有保障,提供了ack机制。

springboot使用stream实现消息队列

1.生产者不断生产消息

Random random = new Random();
Map<String, String> content = new HashMap<>();
content.put("id", "1");
content.put("name", "zhangsan");
content.put("age", String.valueOf(random.nextInt(1000)));
redisTemplate.opsForStream().add("stream:topic", content);

2.工具类

public String createGroup(String key, String group){
    return redisTemplate.opsForStream().createGroup(key, group);
}

public String addMap(String key, Map<String, String> value){
    return redisTemplate.opsForStream().add(key, value).getValue();
}

public String addRecord(Record<String, Object> record){
    return redisTemplate.opsForStream().add(record).getValue();
}

public Long ack(String key, String group, String... recordIds){
    return redisTemplate.opsForStream().acknowledge(key, group, recordIds);
}

public Long del(String key, String... recordIds){
    return redisTemplate.opsForStream().delete(key, recordIds);
}

3.创建消费者1

此处为了简化,直接使用匿名内部类定义了个bean,实际开发时可以单独定义一个类。

/**
 * 消费者A
 */
@Bean
public StreamListener consumer1(RedisStreamUtil redisStreamUtil){
    return new StreamListener<String, MapRecord<String, String, String>>() {
        @Override
        public void onMessage(MapRecord<String, String, String> message) {
            String stream = message.getStream();
            RecordId id = message.getId();
            Map<String, String> map = message.getValue();
            System.out.println("[自动ack] group:[group-a] consumerName:[{consumer1}] 接收到一个消息 stream:[{"+stream+"}],id:[{"+id+"}],value:[{"+map+"}]");
            redisStreamUtil.ack(stream, "group-a", id.getValue());
            redisStreamUtil.del(stream, id.getValue());
        }
    };
}

4.创建消费者2

/**
 * 消费者B
 */
@Bean
public StreamListener consumer2(RedisStreamUtil redisStreamUtil){
    return new StreamListener<String, MapRecord<String, String, String>>() {
        @Override
        public void onMessage(MapRecord<String, String, String> message) {
            String stream = message.getStream();
            RecordId id = message.getId();
            Map<String, String> map = message.getValue();
            System.out.println("[不自动ack] group:[group-b] consumerName:[{consumer2}] 接收到一个消息 stream:[{"+stream+"}],id:[{"+id+"}],value:[{"+map+"}]");
            redisStreamUtil.del(stream, id.getValue());
        }
    };
}

5.配置消费者组

此处也做了部分简化,实际开发可以适当调整

@Bean(initMethod = "start", destroyMethod = "stop")
public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer(StreamListener consumer1, StreamListener consumer2) {
    AtomicInteger index = new AtomicInteger(1);
    int processors = Runtime.getRuntime().availableProcessors();
    ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(), r -> {
        Thread thread = new Thread(r);
        thread.setName("async-stream-consumer-" + index.getAndIncrement());
        thread.setDaemon(true);
        return thread;
    });

    StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
            StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                    .builder()
                    // 一次最多获取多少条消息
                    .batchSize(3)
                    // 运行 Stream 的 poll task
                    .executor(executor)
                    // Stream 中没有消息时,阻塞多长时间,需要比 `spring.redis.timeout` 的时间小
                    .pollTimeout(Duration.ofSeconds(3))
                    // 获取消息的过程或获取到消息给具体的消息者处理的过程中,发生了异常的处理
                    //.errorHandler(new StreamErrorHandler())
                    .build();

    StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer =
            StreamMessageListenerContainer.create(redisTemplate.getConnectionFactory(), options);

    // 独立消费
    String streamKey = "stream:topic";

    // 消费者也可以直接使用new的方式创建出来,但是此处为了简化,直接注入了

    // 消费组A,自动ack
    // 从消费组中没有分配给消费者的消息开始消费
    streamMessageListenerContainer.receiveAutoAck(Consumer.from("group-a", "consumer-a"),
            StreamOffset.create(streamKey, ReadOffset.lastConsumed()), consumer1);

    // 消费组B,不自动ack
    streamMessageListenerContainer.receive(Consumer.from("group-b", "consumer-a"),
            StreamOffset.create(streamKey, ReadOffset.lastConsumed()), consumer2);

    return streamMessageListenerContainer;
}

5.测试一下

(1)先手动创建出队列、消费者组

127.0.0.1:6379> xadd stream:topic * id 0 name test
"1663116545477-0"
127.0.0.1:6379> xgroup create stream:topic group-a 0-0
OK
127.0.0.1:6379> xgroup create stream:topic group-b 0-0
OK

(2)生产者发送多个消息

(3)消费者进行消费

[自动ack] group:[group-a] consumerName:[{consumer1}] 接收到一个消息 stream:[{stream:topic}],id:[{1663117003190-0}],value:[{{name=zhangsan, id=1, age=937}}]
[不自动ack] group:[group-b] consumerName:[{consumer2}] 接收到一个消息 stream:[{stream:topic}],id:[{1663117003190-0}],value:[{{name=zhangsan, id=1, age=937}}]

[自动ack] group:[group-a] consumerName:[{consumer1}] 接收到一个消息 stream:[{stream:topic}],id:[{1663117006145-0}],value:[{{name=zhangsan, id=1, age=97}}]
[不自动ack] group:[group-b] consumerName:[{consumer2}] 接收到一个消息 stream:[{stream:topic}],id:[{1663117006145-0}],value:[{{name=zhangsan, id=1, age=97}}]

[自动ack] group:[group-a] consumerName:[{consumer1}] 接收到一个消息 stream:[{stream:topic}],id:[{1663117007311-0}],value:[{{name=zhangsan, id=1, age=539}}]
[不自动ack] group:[group-b] consumerName:[{consumer2}] 接收到一个消息 stream:[{stream:topic}],id:[{1663117007311-0}],value:[{{name=zhangsan, id=1, age=539}}]

[自动ack] group:[group-a] consumerName:[{consumer1}] 接收到一个消息 stream:[{stream:topic}],id:[{1663117007934-0}],value:[{{name=zhangsan, id=1, age=905}}]
[不自动ack] group:[group-b] consumerName:[{consumer2}] 接收到一个消息 stream:[{stream:topic}],id:[{1663117007934-0}],value:[{{name=zhangsan, id=1, age=905}}]

5.未自动ack的消息,会放在pending队列中

127.0.0.1:6379> xpending stream:topic group-a - + 5
(empty array)
127.0.0.1:6379> xpending stream:topic group-b - + 5
1) 1) "1663117003190-0"
   2) "consumer-a"
   3) (integer) 54462
   4) (integer) 1
2) 1) "1663117006145-0"
   2) "consumer-a"
   3) (integer) 51507
   4) (integer) 1
3) 1) "1663117007311-0"
   2) "consumer-a"
   3) (integer) 50341
   4) (integer) 1
4) 1) "1663117007934-0"
   2) "consumer-a"
   3) (integer) 49718
   4) (integer) 1

在SpringBoot中重新消费Redis Stream中未ACK的消息

消费组从stream中获取到消息后,会分配给自己组中其中的一个消费者进行消费,消费者消费完毕,需要给消费组返回ACK,表示这条消息已经消费完毕了。

当消费者从消费组获取到消息的时候,会先把消息添加到自己的pending消息列表,当消费者给消费组返回ACK的时候,就会把这条消息从pending队列删除。(每个消费者都有自己的pending消息队列)

消费者可能没有及时的返回ACK。例如消费者消费完毕后,宕机,没有及时返回ACK,此时就会导致这条消息占用2倍的内存(stream中保存一份, 消费者的的pending消息列表中保存一份)

1.遍历消费者的pending列表,读取到未ACK的消息,直接进行ACK

遍历消费组的pending消息情况,再遍历每个消费者的pending消息id列表,再根据id,直接去stream读取这条消息,进行消费Ack。

import java.time.Duration;
import java.util.List;
import java.util.Map;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.PendingMessages;
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.test.context.junit4.SpringRunner;

import io.springboot.jwt.SpringBootJwtApplication;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringBootJwtApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT)
public class RedisStreamTest {
	
	private static final Logger LOGGER = LoggerFactory.getLogger(RedisStreamTest.class);
	
	@Autowired
	private StringRedisTemplate stringRedisTemplate;
	
	@Test
	public void test() {
		StreamOperations<String, String, String> streamOperations = this.stringRedisTemplate.opsForStream();
		
		// 获取group-b中的pending消息信息,本质上就是执行XPENDING指令
		PendingMessagesSummary pendingMessagesSummary = streamOperations.pending("stream:topic", "group-b");
		
		// 所有pending消息的数量
		long totalPendingMessages = pendingMessagesSummary.getTotalPendingMessages();
		
		// 消费组名称
		String groupName= pendingMessagesSummary.getGroupName();
		
		// pending队列中的最小ID
		String minMessageId = pendingMessagesSummary.minMessageId();
		
		// pending队列中的最大ID
		String maxMessageId = pendingMessagesSummary.maxMessageId();
		
		LOGGER.info("消费组:{},一共有{}条pending消息,最大ID={},最小ID={}", groupName, totalPendingMessages, minMessageId, maxMessageId);
		

		// 每个消费者的pending消息数量
		Map<String, Long> pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer();
		
		pendingMessagesPerConsumer.entrySet().forEach(entry -> {
			
			// 消费者
			String consumer = entry.getKey();
			// 消费者的pending消息数量
			long consumerTotalPendingMessages = entry.getValue();
			
			LOGGER.info("消费者:{},一共有{}条pending消息", consumer, consumerTotalPendingMessages);
			
			if (consumerTotalPendingMessages > 0) {
				// 读取消费者pending队列的前10条记录,从ID=0的记录开始,一直到ID最大值
				PendingMessages pendingMessages = streamOperations.pending("stream:topic", Consumer.from("group-b", consumer), Range.closed("0", "+"), 10);
				
				// 遍历所有Opending消息的详情
				pendingMessages.forEach(message -> {
					// 消息的ID
					RecordId recordId =  message.getId();
					// 消息从消费组中获取,到此刻的时间
					Duration elapsedTimeSinceLastDelivery = message.getElapsedTimeSinceLastDelivery();
					// 消息被获取的次数
					long deliveryCount = message.getTotalDeliveryCount();
					
					LOGGER.info("openg消息,id={}, elapsedTimeSinceLastDelivery={}, deliveryCount={}", recordId, elapsedTimeSinceLastDelivery, deliveryCount);
					
					/**
					 * 演示手动消费的这个判断非常的针对,目的就是要读取消费者“consumer-a”pending消息中,ID=1605524665215-0的这条消息
					 */
					if (consumer.equals("consumer1") && recordId.toString().equals("1605524665215-0")) {
						// 通过streamOperations,直接读取这条pending消息,
						List<MapRecord<String, String, String>> result = streamOperations.range("stream:topic", Range.rightOpen("1605524665215-0", "1605524665215-0"));
						
						// 开始和结束都是同一个ID,所以结果只有一条
						MapRecord<String, String, String> record = result.get(0);
						
						// 这里执行日志输出,模拟的就是消费逻辑
						LOGGER.info("消费了pending消息:id={}, value={}", record.getId(), record.getValue());
						
						// 如果手动消费成功后,往消费组提交消息的ACK
						Long retVal = streamOperations.acknowledge("group-b", record);
						LOGGER.info("消息ack,一共ack了{}条", retVal);
					}
				});
			}
		});
	}
}

最终的结果就是,消费者1的唯一一条pending消息被Ack了,这里有几个点要注意

遍历消费者pending列表时候,最小/大消息id,可以根据XPENDING指令中的结果来,我写0 - +,只是为了偷懒

遍历到消费者pending消息的时候,可以根据elapsedTimeSinceLastDelivery(idle time)和deliveryCount(delivery counter)做一些逻辑判断,elapsedTimeSinceLastDelivery越长,表示这条消息被消费了很久,都没Ack,deliveryCount表示重新投递N次后,都没被消费成功,可能是消费逻辑有问题,或者是Ack有问题。

2.通过XCLAIM改变消息的消费者

如果一个消费者,一直不能消费掉某条消息,或者说这个消费者因为某些消息,永远也不能上过线了,那么可以把这个消费者的pending消息,转移到其他的消费者pending列表中,重新消费

其实我们这里要做的事情,就是把“消费者2”的唯一1条pending消息“ 1605524657157-0”(message2),交给“消费者1”,重新进行消费。

import java.time.Duration;
import java.util.List;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.stream.ByteRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.test.context.junit4.SpringRunner;

import io.springboot.jwt.SpringBootJwtApplication;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringBootJwtApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT)
public class RedisStreamTest {
	
	private static final Logger LOGGER = LoggerFactory.getLogger(RedisStreamTest.class);
	
	@Autowired
	private StringRedisTemplate stringRedisTemplate;
	
	@Test
	public void test() {
		List<ByteRecord> retVal = this.stringRedisTemplate.execute(new RedisCallback<List<ByteRecord>>() {
			@Override
			public List<ByteRecord> doInRedis(RedisConnection connection) throws DataAccessException {
				// XCLAIM 指令的实现方法
				return connection.streamCommands().xClaim("stream:topic".getBytes(), "group-b", "consumer-a", Duration.ofSeconds(10), RecordId.of("1605524657157-0"));
			}
		});
		
		for (ByteRecord byteRecord : retVal) {
			LOGGER.info("改了消息的消费者:id={}, value={}", byteRecord.getId(), byteRecord.getValue());
		}
	}
}

3.读取pending消息列表,进行消费

import java.util.List;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.test.context.junit4.SpringRunner;

import io.springboot.jwt.SpringBootJwtApplication;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringBootJwtApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT)
public class RedisStreamTest {
	
	private static final Logger LOGGER = LoggerFactory.getLogger(RedisStreamTest.class);
	
	@Autowired
	private StringRedisTemplate stringRedisTemplate;
	
	@SuppressWarnings("unchecked")
	@Test
	public void test() {
		
		StreamOperations<String, String, String> streamOperations = this.stringRedisTemplate.opsForStream();
		
		// 从消费者的pending队列中读取消息
		List<MapRecord<String, String, String>>  retVal = streamOperations.read(Consumer.from("group-b", "consumer1"), StreamOffset.create("stream:topic", ReadOffset.from("0")));
		
		// 遍历消息
		for (MapRecord<String, String, String> record : retVal ) {
			// 消费消息
			LOGGER.info("消息id={}, 消息value={}", record.getId(), record.getValue());
			// 手动ack消息
			streamOperations.acknowledge("group-b", record);
		}
	}
}

4.注意opsForStream().read和opsForStream().pending

read方法也是可以读到pending中的数据的。

pending方法貌似只能读取基本信息,读不到数据。。。

所以,读取pending中的数据,使用read方法即可。

基于发布/订阅的消息队列解决方案

pub-sub 是redis官方支持的一种发布订阅模式。

关于redis发布订阅模式的基本使用请看该文:redis发布订阅模式详解

由上可以看出,发布订阅模式虽然可以完美实现生产者发布消息-消费者消费消息,但是有一个致命缺点:消息无法持久化,也就是说,当消费者宕机或者掉线的时候,这个期间所有的消息都会丢失。

所以,发布订阅模式若果一定要用来做消息队列,一定要注意消息丢失这个风险。

springboot实现发布-订阅

1.创建redis监听类

该类可以单独抽出来,这里为了简化使用了内部类。

/**
 * redis订阅监听类
 */
@Bean
public MessageListener subscriber(){
    return new MessageListener() {
        @Override
        public void onMessage(Message message, byte[] pattern) {
            String messages = (String) redisTemplate.getValueSerializer().deserialize(message.getBody());
            System.out.println("收到订阅消息:" + messages);
        }
    };
}

2.注册redis订阅监听类

主题通道就是需要监听的channel,消费者需要与生产者统一。

/**
 * 注册一个RedisMessageListenerContainer用来监听redis消息
 */
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(MessageListener subscriber) {
    //创建一个消息监听对象
    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    //将监听对象放入到容器中(如果是单redis数据源,可以直接在注入RedisConnectionFactory redisConnectionFactory)
    container.setConnectionFactory(redisTemplate.getConnectionFactory());
    //一个订阅者对应一个主题通道信息,可以设置多个订阅者
    container.addMessageListener(subscriber, new PatternTopic("test:topic"));
    return container;
}

3.生产者发布消息

直接使用convertAndSend发送消息即可。

redisTemplate.convertAndSend("test:topic", "我是订阅消息");

总结

其实,关于 Redis 是否适合做消息队列,业界一直是有争论的。很多人认为,要使用消息队列,就应该采用 Kafka、RabbitMQ 这些专门面向消息队列场景的软件,而 Redis 更加适合做缓存。

Redis 是一个非常轻量级的键值数据库,部署一个 Redis 实例就是启动一个进程,部署 Redis 集群,也就是部署多个 Redis 实例。而 Kafka、RabbitMQ 部署时,涉及额外的组件,例如 Kafka 的运行就需要再部署 ZooKeeper。相比 Redis 来说,Kafka 和 RabbitMQ 一般被认为是重量级的消息队列。

所以,关于是否用 Redis 做消息队列的问题,不能一概而论,我们需要考虑业务层面的数据体量,以及对性能、可靠性、可扩展性的需求。如果分布式系统中的组件消息通信量不大,那么,Redis 只需要使用有限的内存空间就能满足消息存储的需求,而且,Redis 的高性能特性能支持快速的消息读写,不失为消息队列的一个好的解决方案。

参考资料

在SpringBoot中使用RedisTemplate重新消费Redis Stream中未ACK的消息
https://blog.csdn.net/u011320413/article/details/124710384

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐