生产环境短时间接收大量推送过来的消息,异步发送到Rocketmq时发生超时异常,错误如下:

org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: DEFAULT ASYNC send call timeout at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl$4.run(DefaultMQProducerImpl.java:515) 

......

        翻看Rocketmq源码 DefaultMQProducerImpl#send

        public void send(Message msg,
        SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
        send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
    }
        
    @Deprecated
    public void send(final Message msg, final SendCallback sendCallback, final long timeout)
        throws MQClientException, RemotingException, InterruptedException {
        final long beginStartTime = System.currentTimeMillis();
        ExecutorService executor = this.getAsyncSenderExecutor();
        try {
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    long costTime = System.currentTimeMillis() - beginStartTime;
                    if (timeout > costTime) {
                        try {
                            sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);
                        } catch (Exception e) {
                            sendCallback.onException(e);
                        }
                    } else {
                        sendCallback.onException(
                            new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
                    }
                }

            });
        } catch (RejectedExecutionException e) {
            throw new MQClientException("executor rejected ", e);
        }
    }

        由此可知异步发送消息时,消息准备发送时间与消息提交线程池时间如果超过配置的sendMsgTimeout时间,则会抛出超时异常。

从以下三个方面入手解决:

1、适当调大sendMsgTimeout时间

2、消息本身是否过大

3、排查应用发送消息到Rocketmq耗时

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐