如果说,Kafka处理的消息大小,与它所用的DirectMemory大小,会存在一定程度的关系。你一定会觉得非常奇怪。毕竟,阅读kafka处理消息的源码时,只看到了它申请了堆内存,并未发现有申请DirectMemory。

那么,我们写一个简化版的kafka broker 9092消息处理程序来进行一下实验。

package com.dnsanalyze.main;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
public class KafkaOOMTest {
    //简化版的Processor处理过程;真实的kafka源码里,还会在selector里启动KafkaChannel和NetworkReceive来处理数据
    public void process(Selector selector){
        while(true) {
            try {
                selector.select();
                Iterator iterator = selector.selectedKeys().iterator();
                //接受到了可以读取的数据
                ByteBuffer size = ByteBuffer.allocate(4);
                ByteBuffer data = null;
                while(iterator.hasNext()) {
                    SelectionKey key = (SelectionKey) iterator.next();
                    if(key.isValid() && key.isReadable()) {
                        SocketChannel channel = (SocketChannel) key.channel();
                        while(size.hasRemaining()){
                            channel.read(size);
                        }
                        //利用头4个字节,得到数据大小
                        size.rewind();
                        int dataSize = size.getInt();
                        System.out.println("data size:" + dataSize);
                        //开辟堆内存空间
                        data = ByteBuffer.allocate(dataSize);
                        while(data.hasRemaining()) {
                            channel.read(data);
                        }
                    }
                }

            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public void start() {
        //9092监听网络请求
        try {
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress("127.0.0.1",9092));
            System.out.println("listen 9092");

            //监听来自客户端的一个连接请求;在真实的kafka代码里,会交给Processor线程处理,Processor线程的个数与num.network.threads相等
            SocketChannel socketChannel = serverSocketChannel.accept();
            socketChannel.configureBlocking(false);
            Selector selector = Selector.open();
            socketChannel.register(selector, SelectionKey.OP_READ);
            process(selector);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        KafkaOOMTest kafkaOOMTest = new KafkaOOMTest();
        kafkaOOMTest.start();
    }
}

而随着客户端第一次socket连接9092,并且发送数据之后,可以清晰地看到堆外内存有一次增长,并且增长的大小与消息的大小完全一致:

如果在jvm参数里,将maxDirectMemory的大小限制为100M。那么当发送超过此限制的消息大小时,会触发Direct Memory的OOM错误:

真是奇怪, 代码里并未直接申请堆外内存,怎么会在实验中发现堆外内存的增长呢?因为在对于HeapByteBuffer进行读写操作时,需要开辟堆外内存空间作为中间数据交换的空间。而这部分堆外内存并非由Kafka直接申请,而是由JVM申请。

用以下代码对于新启动的kafka broker 9092进行消息发送:

#coding:utf-8
import socket
import struct
import time

def send_data():
    tcp_client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    tcp_client.connect(("kafka broker ip",9092))

    fmt = '!is'
    data = struct.pack(fmt, 50*1024*1024, ('a'*(50*1024*1024)).encode())
    tcp_client.send(data)

    tcp_client.close()

if __name__ == '__main__':
   send_data()

这段脚本每次都启动新的socket客户端发送一个长达50M的消息。可以观察到kafka broker进程非常明显的堆外内存增长。每次增长50M,4次消息后,堆外内存即涨到了200M。

 这样就解释了我们最近遇到的Kafka Direct Memory OOM的问题。

因为某些消息触发的Invalid receive错误,导致了Kafka主动关闭socket。这样发送数据的脚本,会重连socket,kafka accept之后会申请与消息大小相等的堆内存空间,而JVM会根据第一次申请的堆内存大小开辟堆外内存空间。这样,多出现几次错误(而恰好每次新发过来的数据量又特别大),堆外内存大小超过maxDirectMemory限制,自然就会触发 Kafka Direct Memory OOM的问题。

由于SocketChannel在读取数据到HeapBuffer时,用到了一个threadlocal的buffercache对象。如果没有从buffercache取到需要的堆外内存时(buffercache里没有缓存,或需要的堆外内存比buffercache里的大),就会进行堆外内存新的申请。因此,如果发送给kafka的消息,被新的线程处理,或者消息的大小大于该线程之前申请过的堆外内存大小,那么都会造成kafka 堆外内存增长的情况。而这个情况,在消息的size格外大的时候,会特别明显。

既然这样,第二个疑问产生了。这部分堆外内存空间怎么不会被主动回收呢?因为上述buffercache机制,这部分堆外内是一直被引用的,因此在线程存活的时间内,这部分堆外内存是不会被回收的。

那么该如何解决这个问题呢,在kafka的启动参数里加上如下参数即可:

-Djdk.nio.maxCachedBufferSize=1048576

参数解释:

具体为:这个参数不会阻止申请比较大的DirectByteBuffer,仅仅只是阻止buffer对象被缓存起来并被重用。如果不配置这个参数,那么是不会对于缓存的内存对象进行限制的,在这个线程存在的时间范围内这个directbuffer会一直存在。

加上参数之后,就表示最大缓存的堆外内存对象为1M。这样,如果需要新的堆外内存,JVM就会重新申请DirectMemory,而不是重用之前的。而allocate新的DirectByteBuffer时,JVM会执行System.gc()(具体在Bits.java的reserveMemory函数里。)这样,就可以保证没有使用的堆外内存可以被即时回收掉。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐