org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.

意思是生产者发送的消息过大,大于broker的配置

查看相关资料后,发现  Broker 端对 Producer 发送过来的消息也有一定的大小限制,这个参数叫 message.max.bytes,这个参数决定了  Broker 能够接收到的最大消息的大小,它的默认值为 977 KB,而 max.request.size 的值已经设置成 2M  大小了,很显然已经比 message.max.bytes 大了很多,因此消息大于 997KB 时,就会抛出如上异常。

值得一提的是,主题配置也有一个参数,叫 max.message.bytes,它只针对某个主题生效,可动态配置,可覆盖全局的 message.max.bytes,好处就是可以针对不同主题去设置 Broker 接收消息的大小,而且不用重启 Broker。

这还没完,消费端拉取消息数据的大小也需要更改,这个参数叫  fetch.max.bytes,这个参数决定消费者单次从 Broker 获取消息的最大字节数,那么问题来了,如果该参数值比  max.request.size 小,那么会导致消费者很可能消费不了比 fetch.max.bytes 大的消息。
 

producer端:
max.request.size=5242880(5M)
broker:
message.max.bytes=6291456(6M)
consumer:
fetch.max.bytes=7340032(7M)

max.request.size < message.max.bytes < fetch.max.bytes

    message.max.bytes (默认:1000000) – broker能接收消息的最大字节数,这个值应该比消费端的fetch.message.max.bytes更小才对,否则broker就会因为消费端无法使用这个消息而挂起。

    log.segment.bytes (默认: 1GB) – kafka数据文件的大小,确保这个数值大于一个消息的长度。一般说来使用默认值即可(一般一个消息很难大于1G,因为这是一个消息系统,而不是文件系统)。

    replica.fetch.max.bytes (默认: 1MB) – broker可复制的消息的最大字节数。这个值应该比message.max.bytes大,否则broker会接收此消息,但无法将此消息复制出去,从而造成数据丢失。

fetch.message.max.bytes (默认 1MB) – 消费者能读取的最大消息。这个值应该大于或等于message.max.bytes。

所以,如果你一定要选择kafka来传送大的消息,还有些事项需要考虑。要传送大的消息,不是当出现问题之后再来考虑如何解决,而是在一开始设计的时候,就要考虑到大消息对集群和主题的影响。

解决方案如下

kafka服务器端调整: 

方案一:

修改指定的topic中max.message.bytes,不需要重启,对此topic生效
/bin/kafka-topics.sh --zookeeper 192.168.6.102:2181 --alter --topic testTopic --config max.message.bytes=52428800

方案二:

server.properties中添加
message.max.bytes=5242880(5M)
replica.fetch.max.bytes=6291456(6M)每个分区试图获取的消息字节数。要大于等于message.max.bytes

重启kafka才生效

java微服务调整

@Bean
    public ConcurrentKafkaListenerContainerFactory<String, LogReceive> basicKafkaListenerContainerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);
        configProps.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG,ConsumerConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES*5);

        ConcurrentKafkaListenerContainerFactory<String, LogReceive> factory = new ConcurrentKafkaListenerContainerFactory<>();
        ConsumerFactory<String,LogReceive> basicConsumerFactory =
                new DefaultKafkaConsumerFactory<>(configProps,
                        new StringDeserializer(),
                        new JsonDeserializer<>(LogReceive.class));
        factory.setConsumerFactory(basicConsumerFactory);

        return factory;
    }

或者:

application.yml 中:
 producer
   下增加: max-request-size:5242880
 consumer
   下增加: fetch-max-bytes:6291456

命令行支持大报文

1> producer.properties中添加
max.request.size = 5242880 (5M)请求的最大大小为字节。要小于 message.max.bytes

2> consumer.properties中添加
fetch.message.max.bytes=6291456(6M)每个提取请求中为每个主题分区提取的消息字节数。要大于等于message.max.bytes

3> 重新执行生产端的命令
./bin/kafka-console-producer.sh  --broker-list 192.168.6.102:8997 --topic testTopic < /usr/local/test.txt --producer.config /usr/local/kafka10/config/producer.properties


注:在Linux控制台发送消息时,控制台有输入字数限制,不利于测试,所以将大的消息放在文本文件里test.txt,通过< /usr/local/test.txt追加到控制台

4> 重新执行消费断的命令
./bin/kafka-console-consumer.sh --zookeeper 192.168.6.102:2181 --topic testTopic --consumer.config config/consumer.properties

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐