Spring Boot and RocketMQ Integration Made Simple
Integrating RocketMQ with Spring Cloud
Preface
Installation guide: RocketMQ Installation and Deployment Made Simple
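Quick Start
1. Add the dependency
The first dependency below is the native client and the second is the Spring Boot starter. Here we add only the second one.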
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
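rocketmq-spring-boot-starter already includes the rocketmq-client dependency, so it does not need to be declared separately.
2. Configuration file (properties)
# 1
rocketmq.name-server=127.0.0.1:9876
# 2
rocketmq.producer.group=provider
A producer needs both entries; a consumer only needs the first. If a producer omits the second entry, startup fails with an error whose cause is analyzed later in this article.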
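3. Implement the consumer
Write a class that consumes messages. In this example each message carries a key, and the consumer deletes that key from Redis.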
@Service
@RocketMQMessageListener(consumerGroup = RedisKeyListener.GROUP,
topic = RedisKeyListener.TOPIC,
consumeMode = ConsumeMode.ORDERLY)
public class RedisKeyListener implements RocketMQListener<String> {
public static final String GROUP = "redis_group";
public static final String TOPIC = "redis_topic";
private static final Logger logger = LoggerFactory.getLogger(RedisKeyListener.class);
@Autowired
StringRedisTemplate stringRedisTemplate;
@Override
public void onMessage(String key) {
logger.info("redis consumer work for key : {} ", key);
stringRedisTemplate.delete(key);
}
}
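The nameServer attribute of the annotation can be used to override the value from the configuration file.
4. Implement the producer
Write a simple producer.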
@RequestMapping("redis")
@RestController
public class RedisKeyController {
private static final Logger logger = LoggerFactory.getLogger(RedisKeyController.class);
@Autowired
RocketMQTemplate template;
@Autowired
StringRedisTemplate stringRedisTemplate;
@GetMapping("put")
public void putKey(String key, String value) {
stringRedisTemplate.opsForValue().set(key, value);
}
@GetMapping("delete")
public void delete(String key) {
logger.info("key : {} is send to MQ ", key);
try {
template.convertAndSend(RedisConstant.TOPIC, key);
} catch (MessagingException e) {
logger.error("failed to send key {} to MQ", key, e);
}
}
}
Console output
c.e.r.s.b.p.c.RedisKeyController : key : 1 is send to MQ
c.e.r.s.b.c.consumer.RedisKeyListener : redis consumer work for key : 1
5. Dashboard screenshots
Details
Click the re-consume button in the dashboard and the backend log shows a second record.
c.e.r.s.b.c.consumer.RedisKeyListener : redis consumer work for key : 1
c.e.r.s.b.c.consumer.RedisKeyListener : redis consumer work for key : 1
That completes a simple end-to-end send and receive.
Advanced
After and Before in RocketMQAutoConfiguration
In what order are these configuration classes processed?
@AutoConfigureAfter({MessageConverterConfiguration.class})
@AutoConfigureBefore({RocketMQTransactionConfiguration.class})
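The order is MessageConverterConfiguration -> RocketMQAutoConfiguration -> RocketMQTransactionConfiguration.
The after and before annotations were already analyzed in the article on custom starters, so they are not repeated here.
Why a missing rocketmq.producer.group causes an error
When we configured the producer earlier, removing the lines below comment 1 leads to an error.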
rocketmq:
  name-server: ip:port
  # 1
  producer:
    group: rocketMQ
Error message
Field template in com.example.rocketmqspringboot.controller.MessageSendController required a bean of type 'org.apache.rocketmq.spring.core.RocketMQTemplate' that could not be found.
The injection point has the following annotations:
- @org.springframework.beans.factory.annotation.Autowired(required=true)
The message tells us that the container has no RocketMQTemplate bean. Open the auto-configuration class RocketMQAutoConfiguration and search for RocketMQTemplate to find this code
@Bean(destroyMethod = "destroy")
@Conditional(ProducerOrConsumerPropertyCondition.class)
@ConditionalOnMissingBean(name = ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME)
public RocketMQTemplate rocketMQTemplate(RocketMQMessageConverter rocketMQMessageConverter) {
RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
if (applicationContext.containsBean(PRODUCER_BEAN_NAME)) {
rocketMQTemplate.setProducer((DefaultMQProducer) applicationContext.getBean(PRODUCER_BEAN_NAME));
}
if (applicationContext.containsBean(CONSUMER_BEAN_NAME)) {
rocketMQTemplate.setConsumer((DefaultLitePullConsumer) applicationContext.getBean(CONSUMER_BEAN_NAME));
}
rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
return rocketMQTemplate;
}
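The method never returns null, so whenever it runs a RocketMQTemplate is created. The bean must therefore be skipped by one of the two condition annotations.
@ConditionalOnMissingBean is true when the container has no bean named rocketMQTemplate, which is the case here, so the problem comes from @Conditional.
@Conditional(ProducerOrConsumerPropertyCondition.class)
The source of ProducerOrConsumerPropertyCondition is as follows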
static class ProducerOrConsumerPropertyCondition extends AnyNestedCondition {
public ProducerOrConsumerPropertyCondition() {
super(ConfigurationPhase.REGISTER_BEAN);
}
@ConditionalOnBean(DefaultMQProducer.class)
static class DefaultMQProducerExistsCondition {
}
@ConditionalOnBean(DefaultLitePullConsumer.class)
static class DefaultLitePullConsumerExistsCondition {
}
}
AnyNestedCondition
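Any: any one of; Nested: nested; Condition: condition.
Can be used to create composite conditions.
AnyNestedCondition matches as soon as any one of its nested conditions matches, so it behaves like || (the && counterpart is AllNestedConditions). ProducerOrConsumerPropertyCondition is therefore true when a DefaultMQProducer bean or a DefaultLitePullConsumer bean exists. Conceptually it is the OR of the following two annotations.
@ConditionalOnBean(DefaultMQProducer.class)
@ConditionalOnBean(DefaultLitePullConsumer.class)
The two cannot simply be stacked on the bean method: @ConditionalOnBean is not a repeatable annotation, and multiple conditions on one bean are combined with AND anyway. A nested condition class is used to express the OR instead.
So for the template to be missing, both nested conditions must be false. The DefaultMQProducer bean is guarded by
@ConditionalOnProperty(prefix = "rocketmq", value = {"name-server", "producer.group"})
The DefaultLitePullConsumer bean is guarded by
@ConditionalOnProperty(prefix = "rocketmq", value = {"name-server", "consumer.group", "consumer.topic"})
When producer.group is missing, DefaultMQProducer is not created. The configuration here does not set rocketmq.consumer.group and rocketmq.consumer.topic either, so DefaultLitePullConsumer is not created. Neither nested condition matches, ProducerOrConsumerPropertyCondition is false, RocketMQTemplate is never registered, and the injection fails.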
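The decision chain above can be traced with a small plain-Java model. This is only a sketch: the class and method names are made up for illustration, and hasAll is a simplified stand-in for @ConditionalOnProperty (property present and not equal to "false"), not Spring's actual implementation.

```java
import java.util.HashMap;
import java.util.Map;

public class TemplateConditionSketch {

    // Simplified stand-in for @ConditionalOnProperty(prefix = "rocketmq", value = {...}):
    // every listed property must be present and must not be "false".
    static boolean hasAll(Map<String, String> props, String... keys) {
        for (String key : keys) {
            String value = props.get("rocketmq." + key);
            if (value == null || "false".equalsIgnoreCase(value)) {
                return false;
            }
        }
        return true;
    }

    // Mirrors the guard on the DefaultMQProducer bean.
    static boolean producerCreated(Map<String, String> props) {
        return hasAll(props, "name-server", "producer.group");
    }

    // Mirrors the guard on the DefaultLitePullConsumer bean.
    static boolean litePullConsumerCreated(Map<String, String> props) {
        return hasAll(props, "name-server", "consumer.group", "consumer.topic");
    }

    // AnyNestedCondition semantics: one matching nested condition is enough.
    static boolean templateCreated(Map<String, String> props) {
        return producerCreated(props) || litePullConsumerCreated(props);
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("rocketmq.name-server", "127.0.0.1:9876");
        System.out.println(templateCreated(props)); // false: no producer, no pull consumer

        props.put("rocketmq.producer.group", "provider");
        System.out.println(templateCreated(props)); // true: the producer bean exists
    }
}
```

In this model, setting consumer.group and consumer.topic instead of producer.group would also make the template available, which is what the "Or" in the condition's name suggests.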
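rocketmq-spring-boot-starter
The jar in which Spring Boot wraps RocketMQ is shown in the figure below.
rocketmq-client
rocketmq/docs/cn/RocketMQ_Example.md
1.3 Consuming messages
The actual constructor is shown below; note the new AllocateMessageQueueAveragely().
So what is new AllocateMessageQueueAveragely()?
The AllocateMessageQueueStrategy interface is the message queue allocation strategy: it decides which queues each consumer in a group is assigned. Its main implementations are shown in the figure below.
A search shows that AllocateMessageQueueAveragely is the most widely used implementation. The name means "allocate message queues evenly", and the core allocation code is as follows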
@Override
public List<MessageQueue> allocate(String consumerGroup,String currentCID,
List<MessageQueue> mqAll,List<String> cidAll) {
if (currentCID == null || currentCID.length() < 1) {
throw new IllegalArgumentException("currentCID is empty");
}
if (mqAll == null || mqAll.isEmpty()) {
throw new IllegalArgumentException("mqAll is null or mqAll empty");
}
if (cidAll == null || cidAll.isEmpty()) {
throw new IllegalArgumentException("cidAll is null or cidAll empty");
}
List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!cidAll.contains(currentCID)) {
log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
consumerGroup,
currentCID,
cidAll);
return result;
}
int index = cidAll.indexOf(currentCID);
int mod = mqAll.size() % cidAll.size();
int averageSize =
mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
+ 1 : mqAll.size() / cidAll.size());
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
}
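To make the arithmetic concrete, the following sketch repeats the same index calculation on plain queue indexes instead of MessageQueue objects. The class name and the integer-based signature are illustrative only; the null checks and the "consumer not in cidAll" branch from the original are left out.

```java
import java.util.ArrayList;
import java.util.List;

public class AverageAllocationSketch {

    // Same arithmetic as AllocateMessageQueueAveragely.allocate, where
    // index is the position of the current consumer in the sorted consumer list.
    static List<Integer> allocate(int queueCount, int consumerCount, int index) {
        int mod = queueCount % consumerCount;
        int averageSize = queueCount <= consumerCount
                ? 1
                : (mod > 0 && index < mod ? queueCount / consumerCount + 1 : queueCount / consumerCount);
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, queueCount - startIndex);
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < range; i++) {
            result.add((startIndex + i) % queueCount);
        }
        return result;
    }

    public static void main(String[] args) {
        // 8 queues shared by 3 consumers
        for (int index = 0; index < 3; index++) {
            System.out.println("consumer " + index + " -> " + allocate(8, 3, index));
        }
        // consumer 0 -> [0, 1, 2]
        // consumer 1 -> [3, 4, 5]
        // consumer 2 -> [6, 7]
    }
}
```

The first mod consumers each take one extra queue. With 2 queues and 3 consumers, range becomes negative for the third consumer, the loop never runs, and that consumer gets an empty list, so adding more consumers than queues does not add throughput under this strategy.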
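consumer.registerMessageListener looks important. The main listener types are shown in the figure below.
The two main sub-interfaces are MessageListenerConcurrently and MessageListenerOrderly. Orderly means in order; Concurrently means in parallel.
The example uses MessageListenerConcurrently, so we look at that one first.
The message handling logic is implemented in the consumeMessage method.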
consumer.start()
RocketMQMessageListener
Source
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RocketMQMessageListener {
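// read from the configuration file
String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";
// consumer group
String consumerGroup();
// topic to subscribe to
String topic();
// how messages are selected
SelectorType selectorType() default SelectorType.TAG;
// expression used to select messages
String selectorExpression() default "*";
// consume mode: concurrently or orderly
ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;
// message model: clustering or broadcasting
MessageModel messageModel() default MessageModel.CLUSTERING;
@Deprecated
int consumeThreadMax() default 64;
// number of consumer threads
int consumeThreadNumber() default 20;
// maximum number of times a message is redelivered after a failed consume
int maxReconsumeTimes() default -1;
// maximum time, in minutes, a message may block the consuming thread
long consumeTimeout() default 15L;
// timeout for sending reply messages
int replyTimeout() default 3000;
// overrides the access key from the configuration file (ACCESS_KEY_PLACEHOLDER)
String accessKey() default ACCESS_KEY_PLACEHOLDER;
// overrides the secret key from the configuration file (SECRET_KEY_PLACEHOLDER)
String secretKey() default SECRET_KEY_PLACEHOLDER;
// whether message tracing is enabled
boolean enableMsgTrace() default false;
// overrides the trace topic from the configuration file (TRACE_TOPIC_PLACEHOLDER)
String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;
// overrides the name server from the configuration file (NAME_SERVER_PLACEHOLDER)
String nameServer() default NAME_SERVER_PLACEHOLDER;
// overrides the access channel from the configuration file (ACCESS_CHANNEL_PLACEHOLDER)
String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;
// whether TLS is enabled
String tlsEnable() default "false";
// namespace
String namespace() default "";
// retry strategy in concurrently mode: -1 sends the message straight to the DLQ, 0 lets the broker control the retry frequency, >0 lets the client control it
int delayLevelWhenNextConsume() default 0;
// how long the current queue is suspended after a failure in orderly mode, in milliseconds
int suspendCurrentQueueTimeMillis() default 1000;
// how long to wait for the consumer to terminate on shutdown, in milliseconds
int awaitTerminationMillisWhenShutdown() default 1000;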
}
Listening with the annotation
When you listen with the RocketMQMessageListener annotation, the following output appears when the service starts.
@RocketMQMessageListener(consumerGroup = MqConstant.CONSUMER_GROUP_FIRST,
topic = MqConstant.ORDER_TOPIC)
Output
a.r.s.s.DefaultRocketMQListenerContainer : running container: DefaultRocketMQListenerContainer
{consumerGroup='first', namespace='', nameServer='ip+port', topic='order',
consumeMode=CONCURRENTLY, selectorType=TAG, selectorExpression='*', messageModel=CLUSTERING',
tlsEnable=false}
This line is printed by the start method of DefaultRocketMQListenerContainer.
@Override
public void start() {
if (this.isRunning()) {
throw new IllegalStateException("container already running. " + this.toString());
}
try {
consumer.start();
} catch (MQClientException e) {
throw new IllegalStateException("Failed to start RocketMQ push consumer", e);
}
this.setRunning(true);
log.info("running container: {}", this.toString());
}
ConsumeMode
ORDERLY
For ordered consumption, the consumer side matters just as much as the producer side.
switch (consumeMode) {
case ORDERLY:
consumer.setMessageListener(new DefaultMessageListenerOrderly());
break;
case CONCURRENTLY:
consumer.setMessageListener(new DefaultMessageListenerConcurrently());
break;
default:
throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
}
When ORDERLY is selected, the starter creates a DefaultMessageListenerOrderly for us. Its source is as follows
public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
@SuppressWarnings("unchecked")
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt messageExt : msgs) {
log.debug("received msg: {}", messageExt);
try {
long now = System.currentTimeMillis();
handleMessage(messageExt);
long costTime = System.currentTimeMillis() - now;
log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
} catch (Exception e) {
log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e);
context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
return ConsumeOrderlyStatus.SUCCESS;
}
}
CONCURRENTLY
When CONCURRENTLY is selected, the starter creates a DefaultMessageListenerConcurrently for us. Its source is as follows
public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {
@SuppressWarnings("unchecked")
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt messageExt : msgs) {
log.debug("received msg: {}", messageExt);
try {
long now = System.currentTimeMillis();
handleMessage(messageExt);
long costTime = System.currentTimeMillis() - now;
log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
} catch (Exception e) {
log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e);
context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
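Both listeners share the same control flow: handle each message in turn and report a failure status as soon as one handler call throws. The sketch below reproduces only that flow for the concurrent case, with a hypothetical Status enum and plain strings standing in for ConsumeConcurrentlyStatus and MessageExt.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class ListenerStatusSketch {

    // Hypothetical stand-in for ConsumeConcurrentlyStatus.
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    // Same shape as DefaultMessageListenerConcurrently.consumeMessage:
    // the first exception ends the loop and the call reports RECONSUME_LATER.
    static Status consume(List<String> msgs, Consumer<String> handler) {
        for (String msg : msgs) {
            try {
                handler.accept(msg);
            } catch (Exception e) {
                return Status.RECONSUME_LATER;
            }
        }
        return Status.CONSUME_SUCCESS;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("k1", "k2");
        System.out.println(consume(batch, key -> { })); // CONSUME_SUCCESS
        System.out.println(consume(batch, key -> {
            if (key.equals("k2")) {
                throw new IllegalStateException("handler failed");
            }
        })); // RECONSUME_LATER
    }
}
```

In the second call "k1" has already been handled when the failure is reported, so a redelivery may process it again. This is one reason to keep onMessage idempotent, as deleting a Redis key is.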
RocketMQTemplate
RocketMQAutoConfiguration provides a RocketMQTemplate bean for us.
@Bean(destroyMethod = "destroy")
@Conditional(ProducerOrConsumerPropertyCondition.class)
@ConditionalOnMissingBean(name = ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME)
public RocketMQTemplate rocketMQTemplate(RocketMQMessageConverter rocketMQMessageConverter) {
RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
if (applicationContext.containsBean(PRODUCER_BEAN_NAME)) {
rocketMQTemplate.setProducer((DefaultMQProducer) applicationContext.getBean(PRODUCER_BEAN_NAME));
}
if (applicationContext.containsBean(CONSUMER_BEAN_NAME)) {
rocketMQTemplate.setConsumer((DefaultLitePullConsumer) applicationContext.getBean(CONSUMER_BEAN_NAME));
}
rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
return rocketMQTemplate;
}
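By default it uses DefaultMQProducer as the producer and DefaultLitePullConsumer as the consumer. With the latter, the client actively pulls messages from the broker to consume them.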
DefaultLitePullConsumer
public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer
The core pieces are the poll method and fetchMessageQueues, which fetches the message queues of a topic.
fetchMessageQueues
Interface definition
Collection<MessageQueue> fetchMessageQueues(String topic) throws MQClientException;
Implementation
@Override
public Collection<MessageQueue> fetchMessageQueues(String topic) throws MQClientException {
return this.defaultLitePullConsumerImpl.fetchMessageQueues(withNamespace(topic));
}
Step into the implementation
public Set<MessageQueue> fetchMessageQueues(String topic) throws MQClientException {
checkServiceState();
Set<MessageQueue> result = this.mQClientFactory.getMQAdminImpl().fetchSubscribeMessageQueues(topic);
return parseMessageQueues(result);
}
checkServiceState throws an IllegalStateException if the service state is not RUNNING.
private void checkServiceState() {
if (this.serviceState != ServiceState.RUNNING) {
throw new IllegalStateException(NOT_RUNNING_EXCEPTION_MESSAGE);
}
}
fetchSubscribeMessageQueues
public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
try {
TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis);
if (topicRouteData != null) {
Set<MessageQueue> mqList = MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
if (!mqList.isEmpty()) {
return mqList;
} else {
throw new MQClientException("Can not find Message Queue for this topic, " + topic + " Namesrv return empty", null);
}
}
} catch (Exception e) {
throw new MQClientException(
"Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST),
e);
}
throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null);
}
poll
private long pollTimeoutMillis = 1000 * 5;
@Override
public List<MessageExt> poll() {
return defaultLitePullConsumerImpl.poll(this.getPollTimeoutMillis());
}
@Override
public List<MessageExt> poll(long timeout) {
return defaultLitePullConsumerImpl.poll(timeout);
}
Core implementation of poll
public synchronized List<MessageExt> poll(long timeout) {
try {
// check the service state, as above
checkServiceState();
// validate the argument
if (timeout < 0) {
throw new IllegalArgumentException("Timeout must not be negative");
}
// autoCommit defaults to true (private boolean autoCommit = true;)
if (defaultLitePullConsumer.isAutoCommit()) {
// see the maybeAutoCommit source below
maybeAutoCommit();
}
// compute the deadline
long endTime = System.currentTimeMillis() + timeout;
// consumeRequestCache is a LinkedBlockingQueue<ConsumeRequest>;
// poll takes the ConsumeRequest at the head of the queue, waiting up to the timeout
ConsumeRequest consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
// if the deadline has not passed yet
if (endTime - System.currentTimeMillis() > 0) {
// while consumeRequest is not null and its ProcessQueue has been dropped (the dropped field, initially false, is now true)
while (consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) {
// keep taking the next element until the deadline
consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
// leave the loop once the deadline has passed
if (endTime - System.currentTimeMillis() <= 0) {
break;
}
}
}
// consumeRequest is not null and its ProcessQueue has not been dropped (dropped is false)
if (consumeRequest != null && !consumeRequest.getProcessQueue().isDropped()) {
// get the messages
List<MessageExt> messages = consumeRequest.getMessageExts();
// remove the messages from the ProcessQueue and get the resulting offset
long offset = consumeRequest.getProcessQueue().removeMessage(messages);
// assignedMessageQueue = new AssignedMessageQueue();
// update the consume offset
assignedMessageQueue.updateConsumeOffset(consumeRequest.getMessageQueue(), offset);
//If namespace not null , reset Topic without namespace.
this.resetTopic(messages);
return messages;
}
} catch (InterruptedException ignore) {
}
return Collections.emptyList();
}
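The deadline handling in poll can be isolated from RocketMQ in a short sketch. Batch is a hypothetical stand-in for ConsumeRequest together with ProcessQueue.isDropped(), and the loop is slightly restructured, so treat this as an illustration of the pattern rather than the library's code.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class DeadlinePollSketch {

    // Hypothetical stand-in for ConsumeRequest plus the dropped flag of its ProcessQueue.
    static final class Batch {
        final List<String> messages;
        final boolean dropped;

        Batch(List<String> messages, boolean dropped) {
            this.messages = messages;
            this.dropped = dropped;
        }
    }

    final BlockingQueue<Batch> cache = new LinkedBlockingQueue<>();

    List<String> poll(long timeoutMillis) {
        long endTime = System.currentTimeMillis() + timeoutMillis;
        try {
            Batch batch = cache.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            // Skip batches whose queue was dropped, but never wait past the deadline.
            while (batch != null && batch.dropped) {
                long remaining = endTime - System.currentTimeMillis();
                if (remaining <= 0) {
                    break;
                }
                batch = cache.poll(remaining, TimeUnit.MILLISECONDS);
            }
            if (batch != null && !batch.dropped) {
                return batch.messages;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return Collections.<String>emptyList();
    }

    public static void main(String[] args) {
        DeadlinePollSketch consumer = new DeadlinePollSketch();
        consumer.cache.add(new Batch(Arrays.asList("stale"), true));
        consumer.cache.add(new Batch(Arrays.asList("m1", "m2"), false));
        System.out.println(consumer.poll(1000)); // [m1, m2]
        System.out.println(consumer.poll(50));   // [] after waiting about 50 ms
    }
}
```

The caller therefore never blocks longer than the timeout it passed in, and an empty list simply means "nothing usable arrived in time".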
Source of maybeAutoCommit
private void maybeAutoCommit() {
// get the current time
long now = System.currentTimeMillis();
// private long nextAutoCommitDeadline = -1L;
// if the current time has reached the next auto-commit deadline
if (now >= nextAutoCommitDeadline) {
// commit
commitAll();
// compute the next auto-commit deadline; private long autoCommitIntervalMillis = 5 * 1000;
nextAutoCommitDeadline = now + defaultLitePullConsumer.getAutoCommitIntervalMillis();
}
}
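Because nextAutoCommitDeadline starts at -1, the very first poll always commits, and later polls commit at most once per interval. A minimal sketch of that behaviour, with the clock passed in as a parameter and a counter standing in for commitAll() (both are assumptions made for the example):

```java
public class AutoCommitSketch {

    private final long intervalMillis;
    private long nextAutoCommitDeadline = -1L;
    private int commitCount = 0;

    AutoCommitSketch(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    // Same check as maybeAutoCommit, but "now" is supplied by the caller
    // so the sequence of commits can be traced without sleeping.
    void maybeAutoCommit(long now) {
        if (now >= nextAutoCommitDeadline) {
            commitCount++; // stands in for commitAll()
            nextAutoCommitDeadline = now + intervalMillis;
        }
    }

    int getCommitCount() {
        return commitCount;
    }

    public static void main(String[] args) {
        AutoCommitSketch sketch = new AutoCommitSketch(5_000);
        long[] pollTimes = {0, 1_000, 4_999, 5_000, 9_000, 10_000};
        for (long now : pollTimes) {
            sketch.maybeAutoCommit(now);
        }
        // commits happen at 0, 5000 and 10000
        System.out.println(sketch.getCommitCount()); // 3
    }
}
```

Note that the commit is driven by calls to poll: if the application stops polling, no further automatic commits are triggered by this path.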
Source of commitAll
public synchronized void commitAll() {
try {
for (MessageQueue messageQueue : assignedMessageQueue.messageQueues()) {
long consumerOffset = assignedMessageQueue.getConsumerOffset(messageQueue);
if (consumerOffset != -1) {
ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
if (processQueue != null && !processQueue.isDropped()) {
updateConsumeOffset(messageQueue, consumerOffset);
}
}
}
if (defaultLitePullConsumer.getMessageModel() == MessageModel.BROADCASTING) {
offsetStore.persistAll(assignedMessageQueue.messageQueues());
}
} catch (Exception e) {
log.error("An error occurred when update consume offset Automatically.");
}
}
Source of the ConsumeRequest type
public class ConsumeRequest {
private final List<MessageExt> messageExts;
private final MessageQueue messageQueue;
private final ProcessQueue processQueue;
public ConsumeRequest(final List<MessageExt> messageExts, final MessageQueue messageQueue,
final ProcessQueue processQueue) {
this.messageExts = messageExts;
this.messageQueue = messageQueue;
this.processQueue = processQueue;
}
public List<MessageExt> getMessageExts() {
return messageExts;
}
public MessageQueue getMessageQueue() {
return messageQueue;
}
public ProcessQueue getProcessQueue() {
return processQueue;
}
}
Source of resetTopic
private void resetTopic(List<MessageExt> msgList) {
// return early if the list is null or empty
if (null == msgList || msgList.size() == 0) {
return;
}
//If namespace not null , reset Topic without namespace.
for (MessageExt messageExt : msgList) {
if (null != this.defaultLitePullConsumer.getNamespace()) {
messageExt.setTopic(NamespaceUtil.withoutNamespace(messageExt.getTopic(), this.defaultLitePullConsumer.getNamespace()));
}
}
}
Sending ordered messages with RocketMQTemplate.syncSendOrderly
Overview: how to send ordered messages with RocketMQTemplate (producer side)
String uuid = UUID.randomUUID().toString();
SendResult s1 = template.syncSendOrderly(ORDER_TOPIC, uuid + ":创建", uuid);
SendResult s2 = template.syncSendOrderly(ORDER_TOPIC, uuid + ":支付", uuid);
SendResult s3 = template.syncSendOrderly(ORDER_TOPIC, uuid + ":完成", uuid);
System.out.println(uuid + "发送成功");
private MessageQueueSelector messageQueueSelector = new SelectMessageQueueByHash();
Source of SelectMessageQueueByHash
public class SelectMessageQueueByHash implements MessageQueueSelector {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// hashCode of the argument modulo the number of queues
int value = arg.hashCode() % mqs.size();
// if the result is negative, take the absolute value
if (value < 0) {
value = Math.abs(value);
}
return mqs.get(value);
}
}
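What is arg? It is the third parameter, hashKey, that you pass to syncSendOrderly. The select method above returns one specific queue, so the same hashKey always maps to the same queue as long as the number of queues stays the same. In real business code, pass the unique ID shared by a group of messages here.
Summary: syncSendOrderly ensures that a group of messages is sent to the same queue.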
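The mapping from hashKey to queue can be checked without a broker. The sketch below repeats the arithmetic of select and returns the queue index; the class name, method name, and sample key are illustrative.

```java
public class HashQueueSelectionSketch {

    // Same arithmetic as SelectMessageQueueByHash.select, returning the
    // index into the queue list instead of the MessageQueue itself.
    static int selectIndex(Object hashKey, int queueCount) {
        int value = hashKey.hashCode() % queueCount;
        if (value < 0) {
            value = Math.abs(value);
        }
        return value;
    }

    public static void main(String[] args) {
        String orderId = "order-1001"; // hypothetical business ID used as hashKey
        int first = selectIndex(orderId, 4);
        int second = selectIndex(orderId, 4);
        System.out.println(first == second); // true: same key, same queue

        // A negative hashCode still yields a valid index: -7 % 4 is -3, and abs gives 3.
        System.out.println(selectIndex(-7, 4)); // 3
    }
}
```

The index depends on the queue count, so the same key can land on a different queue if the topic's number of queues changes while messages of that group are still being sent.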
Question 1: Would syncSend work instead?
Answer: No.
@GetMapping("/syncSend")
public void syncSend() {
String uuid = UUID.randomUUID().toString();
SendResult s1 = template.syncSend(ORDER_TOPIC, uuid + ":创建");
SendResult s2 = template.syncSend(ORDER_TOPIC, uuid + ":支付");
SendResult s3 = template.syncSend(ORDER_TOPIC, uuid + ":完成");
System.out.println(uuid + "发送成功");
}
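Output after sending with syncSend: messages with the same ID are not consumed in a guaranteed order. In the log below (each line reads "consumer, ordered consumption, received message: ..."), ID f52dce62-ce3c-47ad-9b51-f3434640e915 receives 支付 (paid) before 创建 (created), and 完成 (completed) arrives last.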
consumer 顺序消费,收到消息:f52dce62-ce3c-47ad-9b51-f3434640e915:支付
consumer 顺序消费,收到消息:07f2965a-882d-4926-9d4e-2ee1f233fa95:支付
consumer 顺序消费,收到消息:f52dce62-ce3c-47ad-9b51-f3434640e915:创建
consumer 顺序消费,收到消息:aedd6dfc-fe2e-403a-9408-bd436355f699:支付
consumer 顺序消费,收到消息:06a61fb5-f2a6-44a9-adc9-451635fdec48:支付
consumer 顺序消费,收到消息:bd0af62a-4af1-4ee3-bf26-79aa1317fb76:支付
consumer 顺序消费,收到消息:f52dce62-ce3c-47ad-9b51-f3434640e915:完成
Question 2: Can the consumer do without ConsumeMode.ORDERLY?
ConsumeMessageService
Different Service implementations are used depending on whether message order is required.
ConsumeMessageConcurrentlyService
this.consumeExecutor = new ThreadPoolExecutor(
//20
this.defaultMQPushConsumer.getConsumeThreadMin(),
//20
this.defaultMQPushConsumer.getConsumeThreadMax(),
1000 * 60,
TimeUnit.MILLISECONDS,
//private final BlockingQueue<Runnable> consumeRequestQueue;
this.consumeRequestQueue,
new ThreadFactoryImpl(consumeThreadPrefix));
The thread pool's job is to process ConsumeRequest tasks.
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
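try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
this.submitConsumeRequestLater(consumeRequest);
}
ConsumeRequest wraps the batch of messages together with its ProcessQueue and MessageQueue.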
ConsumeMessageOrderlyService
submitConsumeRequest
@Override
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispathToConsume) {
if (dispathToConsume) {
ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
this.consumeExecutor.submit(consumeRequest);
}
}
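Notice that the orderly ConsumeRequest is built from the ProcessQueue and MessageQueue only, without msgs: the task fetches messages from the queue itself when it runs. One way to keep order on a multi-threaded pool with that design is to serialize the work per queue. The sketch below models this with a single lock object per queue; it is a simplified illustration under that assumption, not the actual ConsumeMessageOrderlyService, which does more than this.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class OrderlyConsumeSketch {

    // Consumes all messages of one queue on a 4-thread pool while holding
    // a per-queue lock for every "take head + handle" step.
    static List<String> consumeInOrder(List<String> messages) {
        Queue<String> queue = new ConcurrentLinkedQueue<>(messages);
        List<String> consumed = new ArrayList<>();
        Object queueLock = new Object(); // one lock object per message queue
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < messages.size(); i++) {
            // The task carries no message; it takes the current head when it runs.
            pool.submit(() -> {
                synchronized (queueLock) {
                    String message = queue.poll();
                    if (message != null) {
                        consumed.add(message); // stands in for the business handler
                    }
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        synchronized (queueLock) {
            return new ArrayList<>(consumed);
        }
    }

    public static void main(String[] args) {
        System.out.println(consumeInOrder(Arrays.asList("created", "paid", "completed")));
        // [created, paid, completed]
    }
}
```

Without the lock, each pooled thread would handle its own batch independently, which is the situation in the concurrent service and the reason it cannot promise order within a queue.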