RocketMQ事务消息机制
rocketMQ
RocketMQ提供了事务消息,通过事务消息就能达到分布式事务的最终一致,从而实现了可靠消息服务。
一、事务消息的实现步骤
事务消息发送步骤:
- 发送方将半事务消息发送至RocketMQ服务端。
- RocketMQ服务端将消息持久化之后,向发送方返回Ack确认消息已经发送成功。由于消息为半事务消息,在未收到生产者对该消息的二次确认前,此消息被标记成“暂不能投递”状态。
- 发送方开始执行本地事务逻辑。
- 发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到Commit 状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半事务消息,订阅方将不会接受该消息。
事务消息回查步骤:
- 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。
- 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
- 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行操作。
二、程序实现
事务消息处理类需要继承RocketMQLocalTransactionListener类。该类的executeLocalTransaction方法负责在接到 RocketMQ 服务端的Ack确认消息后执行本地方法,也就是事务消息发送步骤中的步骤3。该类的checkLocalTransaction方法负责,在断网或者是应用重启的特殊情况下,执行RocketMQ服务端的消息回查,也就是事务消息回查步骤中的步骤2。
此外,要使该类生效,还需要加**@RocketMQTransactionListener注解**。这里有个要特别注意的地方。在2.1.0版本前,这个注解有一个属性txProducerGroup,可以用多个 @RocketMQTransactionListener来监听不同的txProducerGroup来发送不同类型的事务消息到topic。但是现在在一个项目中,如果你在一个project中写了多个 @RocketMQTransactionListener,项目将不能启动,启动会报错。产生这个问题的原因据说是,当使用RocketMQTemplate并发的执行事务时,非常容易出现**"illegal state"的异常**,原因是一个TransactionProducer在执行事务时不能被共享。所以,必须使用同一个 TransactionMQProducer来发送所有类型的事务消息。当然同理也就必须使用一个侦听器处理所有的消息了。
既然必须使用同一个TransactionMQProducer,对于比较大的应用,业务场景很多,就会造成混乱。这里我给出一个方案抛砖引玉。TransactionMQProducer在发送消息时,是可以传递参数对象和指定消息头的。可以把要执行的本地方法的bean名和方法名放进去。
//发送半事务消息
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
topicAndTag,
MessageBuilder.withPayload(msg)
.setHeader(Constants.TX_ID_HEADER_NAME, msg.getTxId())
.setHeader(Constants.CHECK_BEAN_ID_HEADER_NAME, def.getCheckBeanId())
.setHeader(Constants.BIZ_ID_HEADER_NAME, msg.getBizId())
.build(),
def
);
其中def就是参数对象,可以自定义对象,这里是我自定义的TransactionMsgDefinationDto类,可以把想传递的信息放进去,最重要的是要执行的本地方法的bean名和方法名和方法执行参数:executeBeanId(bean名)、executeBeanMethod(方法名)、executeBeanParams(方法执行参数)。该对象可以传给RocketMQLocalTransactionListener的executeLocalTransaction方法,然后通过反射执行。
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
//保存消息记录
String body = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
JSONObject jsonBody = JSONObject.parseObject(body);
BaseMsgDto dto = JSONObject.toJavaObject(jsonBody, BaseMsgDto.class);//(BaseMsgDto)msg.getPayload();
TransactionMsgDefinationDto def = (TransactionMsgDefinationDto)arg;
ProducerLog producerLog = BeanCopyUtils.copyProperties(def, ProducerLog::new);
String[] tags = def.getMsgTags();
if(tags !=null && tags.length > 0) {
StringBuilder tag = new StringBuilder();
for(int i = 0; i<tags.length; i++) {
tag.append(tags[0]);
if(i != tags.length-1) {
tag.append("||");
}
}
producerLog.setMsgTag(tag.toString());
}
producerLog.setBizId(dto.getBizId());
producerLog.setTxId(dto.getTxId());
producerLog.setBizType(dto.getBizType());
producerLog.setGroupName(dto.getProducerGroup());
producerLog.setMsgBody(body);
producerLogService.save(producerLog);
//执行事务方法
SpringUtil.invokeBeanMethod(def.getExecuteBeanId(), def.getExecuteBeanMethod(), def.getExecuteBeanParams());
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
logger.error("发生错误:", e);
return RocketMQLocalTransactionState.UNKNOWN;
}
}
放在消息头header中的数据可以传递给RocketMQLocalTransactionListener的checkLocalTransaction方法,然后同样通过反射执行。
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
try {
String txId = (String)msg.getHeaders().get(Constants.TX_ID_HEADER_NAME);
String checkBeanId = (String)msg.getHeaders().get(Constants.CHECK_BEAN_ID_HEADER_NAME);
Long bizId = Long.parseLong((String)msg.getHeaders().get(Constants.BIZ_ID_HEADER_NAME));
//执行检查方法
Boolean ret = (Boolean)SpringUtil.invokeBeanMethod(checkBeanId, "check", new Object[]{bizId, txId});
if(ret.booleanValue())
return RocketMQLocalTransactionState.COMMIT;
else
return RocketMQLocalTransactionState.ROLLBACK;
} catch (Exception e) {
logger.error("发生错误:", e);
return RocketMQLocalTransactionState.UNKNOWN;
}
}
更多推荐
所有评论(0)