Kafka consumer OOM when deserializing an oversized message body
Today, while handling data in our production Kafka consumers, I found a large number of OOM errors: the consumer service kept failing and the unconsumed data backlog grew. Here are the exceptions first:
java.lang.OutOfMemoryError: GC overhead limit exceeded
[ERROR] [2022-06-24 10:20:49.928] [c.q.d.k.v.VehicleCaptureDataListener:128][qsdi-pool-global-1-thread-2] 服务名称:ivdg-data-access-service--> 接收数据异常,异常原因exception=GC overhead limit exceeded
com.alibaba.fastjson.JSONException: GC overhead limit exceeded
at com.alibaba.fastjson.parser.DefaultJSONParser.parseObject(DefaultJSONParser.java:710)
at com.alibaba.fastjson.JSON.parseObject(JSON.java:394)
at com.alibaba.fastjson.JSON.parseObject(JSON.java:298)
at com.alibaba.fastjson.JSON.parseObject(JSON.java:588)
at com.qsdi.dataAccess.kafka.vehicle.VehicleCaptureDataListener.lambda$batchHandlerRecord$3(VehicleCaptureDataListener.java:95)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
at com.qsdi.dataAccess.kafka.vehicle.VehicleCaptureDataListener.batchHandlerRecord(VehicleCaptureDataListener.java:86)
at com.qsdi.dataAccess.kafka.vehicle.VehicleCaptureDataListener.lambda$listener$0(VehicleCaptureDataListener.java:65)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
[ERROR] [2022-06-24 10:20:49.928] [o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer:149][org.springframework.kafka.KafkaListenerEndpointContainer#1-1-C-1] 服务名称:ivdg-data-access-service--> Stopping container due to an Error
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3332)
at java.lang.StringCoding.safeTrim(StringCoding.java:89)
at java.lang.StringCoding.access$100(StringCoding.java:50)
at java.lang.StringCoding$StringDecoder.decode(StringCoding.java:154)
at java.lang.StringCoding.decode(StringCoding.java:193)
at java.lang.String.<init>(String.java:426)
at java.lang.String.<init>(String.java:491)
at org.apache.kafka.common.serialization.StringDeserializer.deserialize(StringDeserializer.java:47)
at org.apache.kafka.common.serialization.StringDeserializer.deserialize(StringDeserializer.java:28)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1324)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:129)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1555)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1289)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1244)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1144)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1057)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:750)
From the stack traces, the failure ends up in StringCoding.safeTrim(), and the OOM is finally thrown from Arrays.copyOf(). Let's walk through the source from the top.
The second trace shows the OOM happens while the KafkaConsumer is polling records through the Fetcher, so let's look at Fetcher.fetchedRecords():
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
    Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
    Queue<CompletedFetch> pausedCompletedFetches = new ArrayDeque<>();
    int recordsRemaining = maxPollRecords;

    try {
        while (recordsRemaining > 0) {
            if (nextInLineFetch == null || nextInLineFetch.isConsumed) {
                CompletedFetch records = completedFetches.peek();
                if (records == null) break;

                if (records.notInitialized()) {
                    try {
                        nextInLineFetch = initializeCompletedFetch(records);
                    } catch (Exception e) {
                        // Remove a completedFetch upon a parse with exception if (1) it contains no records, and
                        // (2) there are no fetched records with actual content preceding this exception.
                        // The first condition ensures that the completedFetches is not stuck with the same completedFetch
                        // in cases such as the TopicAuthorizationException, and the second condition ensures that no
                        // potential data loss due to an exception in a following record.
                        FetchResponse.PartitionData partition = records.partitionData;
                        if (fetched.isEmpty() && (partition.records == null || partition.records.sizeInBytes() == 0)) {
                            completedFetches.poll();
                        }
                        throw e;
                    }
                } else {
                    nextInLineFetch = records;
                }
                completedFetches.poll();
            } else if (subscriptions.isPaused(nextInLineFetch.partition)) {
                // when the partition is paused we add the records back to the completedFetches queue instead of draining
                // them so that they can be returned on a subsequent poll if the partition is resumed at that time
                log.debug("Skipping fetching records for assigned partition {} because it is paused", nextInLineFetch.partition);
                pausedCompletedFetches.add(nextInLineFetch);
                nextInLineFetch = null;
            } else {
                List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineFetch, recordsRemaining);

                if (!records.isEmpty()) {
                    TopicPartition partition = nextInLineFetch.partition;
                    List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition);
                    if (currentRecords == null) {
                        fetched.put(partition, records);
                    } else {
                        // this case shouldn't usually happen because we only send one fetch at a time per partition,
                        // but it might conceivably happen in some rare cases (such as partition leader changes).
                        // we have to copy to a new list because the old one may be immutable
                        List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
                        newRecords.addAll(currentRecords);
                        newRecords.addAll(records);
                        fetched.put(partition, newRecords);
                    }
                    recordsRemaining -= records.size();
                }
            }
        }
    } catch (KafkaException e) {
        if (fetched.isEmpty())
            throw e;
    } finally {
        // add any polled completed fetches for paused partitions back to the completed fetches queue to be
        // re-evaluated in the next poll
        completedFetches.addAll(pausedCompletedFetches);
    }
    return fetched;
}
As you can see, how many records are drained per poll is bounded by maxPollRecords. From there, go straight to
private List<ConsumerRecord<K, V>> fetchRecords(CompletedFetch completedFetch, int maxRecords) {}
which steps into completedFetch.fetchRecords(maxRecords), and that code eventually calls
private ConsumerRecord<K, V> parseRecord(TopicPartition partition,
                                         RecordBatch batch,
                                         Record record) {
This method deserializes the key and value of every fetched record. In my setup both the key and the value go through Kafka's String serializer/deserializer pair, so the consumer-side StringDeserializer drops us straight into this constructor in String.java (a rough memory sketch follows right after it):
public String(byte bytes[], int offset, int length, String charsetName)
        throws UnsupportedEncodingException {
    if (charsetName == null)
        throw new NullPointerException("charsetName");
    checkBounds(bytes, offset, length);
    this.value = StringCoding.decode(charsetName, bytes, offset, length);
}
The constructor above hands off to StringCoding.decode(), which decodes the raw bytes and then calls safeTrim() to trim the decoded char array down to its actual length:
// Trim the given char array to the given length
//
private static char[] safeTrim(char[] ca, int len,
                               Charset cs, boolean isTrusted) {
    if (len == ca.length && (isTrusted || System.getSecurityManager() == null))
        return ca;
    else
        return Arrays.copyOf(ca, len);
}
Whenever the decoded length is shorter than the working buffer, safeTrim() falls through to Arrays.copyOf(), which allocates yet another char[] and copies the decoded characters into it. With a message body this large, that extra allocation, on top of the original byte[], the working char[] and later the fastjson parse tree, is enough to exhaust the heap and throw the OOM. The options are either to enlarge the consumer's startup heap, or, preferably, to shrink the message body sent to Kafka in each record.
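As a rough illustration of the knobs involved, here is a minimal consumer-configuration sketch; the bootstrap address, group id and all the numbers are placeholders rather than our production values. max.poll.records bounds how many records a single poll() hands to the listener, while max.partition.fetch.bytes and fetch.max.bytes are soft upper bounds on the raw bytes one fetch returns (a batch larger than the limit is still returned so the consumer can make progress), which together cap how much memory one poll can reasonably need.

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class BoundedMemoryConsumerSketch {
    public static KafkaConsumer<String, String> buildConsumer() {
        Properties props = new Properties();
        // Placeholder connection settings, not the production values
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Fewer records per poll() means fewer decoded Strings alive at the same time
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        // Soft byte limits for a single fetch, per partition and in total
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 8 * 1024 * 1024);   // 8 MB
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32 * 1024 * 1024);            // 32 MB

        return new KafkaConsumer<>(props);
    }
}

On the sending side, the producer's max.request.size and the broker's message.max.bytes (max.message.bytes at the topic level) are the settings that reject oversized message bodies up front; if large bodies genuinely have to pass through, raising -Xmx on the consumer JVM is the blunt fallback mentioned above.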