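While processing data from Kafka in production today, I found a large number of OOM errors. They broke the Kafka consumer service and caused messages to pile up. Here are the exceptions first: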

java.lang.OutOfMemoryError: GC overhead limit exceeded
[ERROR] [2022-06-24 10:20:49.928] [c.q.d.k.v.VehicleCaptureDataListener:128][qsdi-pool-global-1-thread-2] 服务名称:ivdg-data-access-service--> 接收数据异常,异常原因exception=GC overhead limit exceeded
com.alibaba.fastjson.JSONException: GC overhead limit exceeded
	at com.alibaba.fastjson.parser.DefaultJSONParser.parseObject(DefaultJSONParser.java:710)
	at com.alibaba.fastjson.JSON.parseObject(JSON.java:394)
	at com.alibaba.fastjson.JSON.parseObject(JSON.java:298)
	at com.alibaba.fastjson.JSON.parseObject(JSON.java:588)
	at com.qsdi.dataAccess.kafka.vehicle.VehicleCaptureDataListener.lambda$batchHandlerRecord$3(VehicleCaptureDataListener.java:95)
	at java.util.Iterator.forEachRemaining(Iterator.java:116)
	at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
	at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
	at com.qsdi.dataAccess.kafka.vehicle.VehicleCaptureDataListener.batchHandlerRecord(VehicleCaptureDataListener.java:86)
	at com.qsdi.dataAccess.kafka.vehicle.VehicleCaptureDataListener.lambda$listener$0(VehicleCaptureDataListener.java:65)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
[ERROR] [2022-06-24 10:20:49.928] [o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer:149][org.springframework.kafka.KafkaListenerEndpointContainer#1-1-C-1] 服务名称:ivdg-data-access-service--> Stopping container due to an Error
java.lang.OutOfMemoryError: Java heap space
	at java.util.Arrays.copyOf(Arrays.java:3332)
	at java.lang.StringCoding.safeTrim(StringCoding.java:89)
	at java.lang.StringCoding.access$100(StringCoding.java:50)
	at java.lang.StringCoding$StringDecoder.decode(StringCoding.java:154)
	at java.lang.StringCoding.decode(StringCoding.java:193)
	at java.lang.String.<init>(String.java:426)
	at java.lang.String.<init>(String.java:491)
	at org.apache.kafka.common.serialization.StringDeserializer.deserialize(StringDeserializer.java:47)
	at org.apache.kafka.common.serialization.StringDeserializer.deserialize(StringDeserializer.java:28)
	at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1324)
	at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:129)
	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1555)
	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1289)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1244)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1144)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1057)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.lang.Thread.run(Thread.java:750)

Judging from the stack trace, the OOM is finally thrown in StringCoding.safeTrim(), at the point where it calls Arrays.copyOf(). Let's trace the source code from the top.

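The trace shows that the Kafka consumer was polling data through its Fetcher at the time. Here is the source of Fetcher.fetchedRecords():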

public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
        Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
        Queue<CompletedFetch> pausedCompletedFetches = new ArrayDeque<>();
        int recordsRemaining = maxPollRecords;

        try {
            while (recordsRemaining > 0) {
                if (nextInLineFetch == null || nextInLineFetch.isConsumed) {
                    CompletedFetch records = completedFetches.peek();
                    if (records == null) break;

                    if (records.notInitialized()) {
                        try {
                            nextInLineFetch = initializeCompletedFetch(records);
                        } catch (Exception e) {
                            // Remove a completedFetch upon a parse with exception if (1) it contains no records, and
                            // (2) there are no fetched records with actual content preceding this exception.
                            // The first condition ensures that the completedFetches is not stuck with the same completedFetch
                            // in cases such as the TopicAuthorizationException, and the second condition ensures that no
                            // potential data loss due to an exception in a following record.
                            FetchResponse.PartitionData partition = records.partitionData;
                            if (fetched.isEmpty() && (partition.records == null || partition.records.sizeInBytes() == 0)) {
                                completedFetches.poll();
                            }
                            throw e;
                        }
                    } else {
                        nextInLineFetch = records;
                    }
                    completedFetches.poll();
                } else if (subscriptions.isPaused(nextInLineFetch.partition)) {
                    // when the partition is paused we add the records back to the completedFetches queue instead of draining
                    // them so that they can be returned on a subsequent poll if the partition is resumed at that time
                    log.debug("Skipping fetching records for assigned partition {} because it is paused", nextInLineFetch.partition);
                    pausedCompletedFetches.add(nextInLineFetch);
                    nextInLineFetch = null;
                } else {
                    List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineFetch, recordsRemaining);

                    if (!records.isEmpty()) {
                        TopicPartition partition = nextInLineFetch.partition;
                        List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition);
                        if (currentRecords == null) {
                            fetched.put(partition, records);
                        } else {
                            // this case shouldn't usually happen because we only send one fetch at a time per partition,
                            // but it might conceivably happen in some rare cases (such as partition leader changes).
                            // we have to copy to a new list because the old one may be immutable
                            List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
                            newRecords.addAll(currentRecords);
                            newRecords.addAll(records);
                            fetched.put(partition, newRecords);
                        }
                        recordsRemaining -= records.size();
                    }
                }
            }
        } catch (KafkaException e) {
            if (fetched.isEmpty())
                throw e;
        } finally {
            // add any polled completed fetches for paused partitions back to the completed fetches queue to be
            // re-evaluated in the next poll
            completedFetches.addAll(pausedCompletedFetches);
        }

        return fetched;
    }
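The recordsRemaining counter in the loop above starts from maxPollRecords, which corresponds to the consumer setting max.poll.records. As a rough illustration of the related knobs, the sketch below builds consumer properties with plain java.util.Properties. The three keys are standard Kafka consumer settings. The class name, the helper name and the concrete values are assumptions made for this example, not the configuration of the service described here. Note that max.poll.records only limits how many records one poll() returns, while the two fetch settings influence how many bytes are requested from the broker.

```java
import java.util.Properties;

public class ConsumerLimits {
    // Hypothetical helper: the values passed in are illustrative, not tuned recommendations.
    static Properties boundedConsumerProps(int maxPollRecords, int maxPartitionFetchBytes) {
        Properties props = new Properties();
        // Upper bound on the number of records returned by a single poll().
        props.setProperty("max.poll.records", Integer.toString(maxPollRecords));
        // Amount of data requested per partition in one fetch.
        props.setProperty("max.partition.fetch.bytes", Integer.toString(maxPartitionFetchBytes));
        // Amount of data requested per fetch overall; here assumed to be 4x the per-partition value.
        props.setProperty("fetch.max.bytes", Integer.toString(4 * maxPartitionFetchBytes));
        return props;
    }

    public static void main(String[] args) {
        Properties props = boundedConsumerProps(100, 1024 * 1024);
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

These properties would be merged with the usual bootstrap.servers, group.id and deserializer settings before the consumer is created.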

As the code shows, the number of records returned per poll is capped by maxPollRecords. Next, look directly at

private List<ConsumerRecord<K, V>> fetchRecords(CompletedFetch completedFetch, int maxRecords){}

and follow it into completedFetch.fetchRecords(maxRecords), which in turn calls

private ConsumerRecord<K, V> parseRecord(TopicPartition partition,
                                             RecordBatch batch,
                                             Record record) {

This method deserializes the key and value of each fetched record. I configured StringDeserializer for both the key and the value, so execution goes straight to this constructor in String.java:

public String(byte bytes[], int offset, int length, String charsetName)
            throws UnsupportedEncodingException {
        if (charsetName == null)
            throw new NullPointerException("charsetName");
        checkBounds(bytes, offset, length);
        this.value = StringCoding.decode(charsetName, bytes, offset, length);
    }

The constructor calls StringCoding.decode to decode the fetched bytes, and decoding ends with a call to safeTrim(), which trims the char array to the decoded length. The code is:

// Trim the given char array to the given length
    //
    private static char[] safeTrim(char[] ca, int len,
                                   Charset cs, boolean isTrusted) {
        if (len == ca.length && (isTrusted || System.getSecurityManager() == null))
            return ca;
        else
            return Arrays.copyOf(ca, len);
    }
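To make the cost of that final copy concrete, here is a small sketch. trim is a simplified stand-in for the safeTrim shown above (the security-manager check is dropped), and trimCopies is a hypothetical helper that imitates the decode path by allocating a char buffer with one slot per input byte, which as far as I can tell is how JDK 8 sizes the buffer for UTF-8. When the decoded text is shorter than the buffer, as with multi-byte characters such as Chinese, a second array is allocated while the first is still reachable.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class TrimDemo {
    // Simplified stand-in for safeTrim: reuse the buffer if it is exactly full, otherwise copy.
    static char[] trim(char[] ca, int len) {
        return len == ca.length ? ca : Arrays.copyOf(ca, len);
    }

    // Hypothetical helper: returns true when trimming had to allocate a second array.
    static boolean trimCopies(byte[] utf8) {
        char[] buffer = new char[utf8.length];           // assumed worst case: one char per byte
        String decoded = new String(utf8, StandardCharsets.UTF_8);
        int len = decoded.length();                       // never larger than utf8.length
        decoded.getChars(0, len, buffer, 0);
        return trim(buffer, len) != buffer;
    }

    public static void main(String[] args) {
        byte[] ascii = "abc".getBytes(StandardCharsets.UTF_8);          // 3 bytes, 3 chars
        byte[] cjk = "\u6570\u636e".getBytes(StandardCharsets.UTF_8);   // 6 bytes, 2 chars
        System.out.println(trimCopies(ascii)); // false: buffer is exactly full, no copy
        System.out.println(trimCopies(cjk));   // true: buffer is oversized, copy allocated
    }
}
```

For a message of many megabytes, the raw bytes, the oversized buffer and the trimmed copy can all be live at once, which is why the allocation inside Arrays.copyOf is a likely place for the heap to run out.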

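In the end, Arrays.copyOf() is called to copy the decoded chars into a new array. If that array is very long, or the heap is already nearly full, allocating the new array is where the out-of-memory error is thrown. So the fix is either to increase the JVM heap size at startup, or to reduce the size of the message body in each send to Kafka.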
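For the second option, one possible approach on the producer side is to split a large payload into several smaller messages before sending. The sketch below is only an assumption about how that could look: PayloadSplitter and splitBySize are hypothetical names, the items are treated as already-serialized strings, and the actual send call is left out.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PayloadSplitter {
    // Hypothetical helper: groups items into batches whose combined UTF-8 size
    // stays at or below maxBytes. An item larger than maxBytes gets a batch of its own.
    static List<List<String>> splitBySize(List<String> items, int maxBytes) {
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int currentBytes = 0;
        for (String item : items) {
            int size = item.getBytes(StandardCharsets.UTF_8).length;
            // Close the current batch if adding this item would exceed the limit.
            if (!current.isEmpty() && currentBytes + size > maxBytes) {
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(item);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("aaaa", "bbbb", "cc", "dddddd");
        // With an 8-byte limit this prints [[aaaa, bbbb], [cc, dddddd]]
        System.out.println(splitBySize(items, 8));
    }
}
```

Each resulting batch would then be serialized and sent as its own Kafka message, so the consumer never has to decode one huge string in a single allocation.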
