前言

本人才疏学浅,所设计测试方法及程序可能并不合理,欢迎批评指正。

Linux环境

centos7,3g运行内存,30g存储内存

Kafka测试原思路

条件:单主题、单副本、单节点
采用方法:使用自带测试脚本,消息总数不变,增加每秒发送消息数,延迟最低时接近最大吞吐量。
执行命令如下(消息总数每次执行时进行修改):
问题:后续模拟多主题环境困难

bin/kafka-producer-perf-test.sh --num-records 10000000 --record-size 1024 --topic test --throughput 200000 --producer-props bootstrap.servers=127.0.0.1:9092 > logs/test.log 

查看日志内容(每次执行后结果汇总):

10000000 records sent, 63754.773639 records/sec (62.26 MB/sec), 11.66 ms avg latency, 608.00 ms max latency, 10 ms 50th, 19 ms 95th, 29 ms 99th, 493 ms 99.9th.
10000000 records sent, 64253.312258 records/sec (62.75 MB/sec), 12.53 ms avg latency, 635.00 ms max latency, 10 ms 50th, 20 ms 95th, 34 ms 99th, 535 ms 99.9th.
10000000 records sent, 63926.765497 records/sec (62.43 MB/sec), 15.22 ms avg latency, 870.00 ms max latency, 10 ms 50th, 20 ms 95th, 46 ms 99th, 774 ms 99.9th.
10000000 records sent, 62859.872772 records/sec (61.39 MB/sec), 12.73 ms avg latency, 792.00 ms max latency, 10 ms 50th, 19 ms 95th, 34 ms 99th, 670 ms 99.9th.
10000000 records sent, 63918.593280 records/sec (62.42 MB/sec), 12.27 ms avg latency, 697.00 ms max latency, 9 ms 50th, 19 ms 95th, 32 ms 99th, 603 ms 99.9th.
10000000 records sent, 63061.245081 records/sec (61.58 MB/sec), 13.69 ms avg latency, 916.00 ms max latency, 10 ms 50th, 19 ms 95th, 37 ms 99th, 838 ms 99.9th.

含义:
发送总消息数 单条消息大小(字节) 每秒发送条数 结果编号 最大延迟(毫秒) 平均延迟 95%(毫秒) 99% 99.9% 平均每秒写入信息(MB/sec)

结论:Kafka 在该条件下最大吞吐量位于 约 60000条消息每秒

Rocketmq测试:

条件:单主题、单副本、单节点
方法:编写java程序测试

public void test(@RequestParam long messageNum, @RequestParam int threadCount){
    //number 消息总数   threadcount 线程数
    //用于存放各线程执行消息数
    final long[] msgNums = new long[threadCount];
    //将消息平均分配给各线程
    if (messageNum > 0) {
        Arrays.fill(msgNums, messageNum / threadCount);
        long mod = messageNum % threadCount;
        if (mod > 0) {
            msgNums[0] += mod;
        }
    }

    class NameTreadFactory implements ThreadFactory {

        private final AtomicInteger mThreadNum = new AtomicInteger(1);

        @Override
        public Thread newThread(@NonNull Runnable r) {
            Thread t = new Thread(r, "my-thread-" + mThreadNum.getAndIncrement());
            System.out.println(t.getName() + " has been created");
            return t;
        }
    }

    ThreadFactory threadFactory = new NameTreadFactory();

    final ExecutorService sendThreadPool = new ThreadPoolExecutor(threadCount, threadCount, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),threadFactory);

    List<Message> messages = new ArrayList<>();
    for (int j = 0; j < 3000; j++) {
        Message message = new Message("batchTopic", "tag",new byte[1024]);
        messages.add(message);
    }

    for (int i = 0; i < threadCount; i++) {

        final long msgNumLimit = msgNums[i];
        if (messageNum > 0 && msgNumLimit == 0) {
            break;
        }

        int finalI = i;
        sendThreadPool.execute(() -> {

            System.out.println("线程"+ finalI +"启动");
            int num = 0;
            do {
                try {
                    producer.send(messages);
                } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
                    e.printStackTrace();
                }
                num = num +20000;
            } while (num <= msgNumLimit);
        });
    }
}

发送总消息数 单条消息大小(字节) 线程数 每秒写入消息条数
10000000 1024 1 约16491
10000000 1024 3 约36380
10000000 1024 5 约50000
10000000 1024 7 约43000
10000000 1024 9 约34000

执行结果截图:
线程数:1
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
线程数:3
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
线程数: 5
在这里插入图片描述
在这里插入图片描述

线程数:7
在这里插入图片描述
在这里插入图片描述
线程数:9
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

结论:
Rocketmq在该条件下达到最大吞吐量为 50000条消息每秒

Kafka测试新思路

利用CMAK监测kafka吞吐量
for循环不断执行:

public void send(String topic,String message){
    kafkaTemplate.send(topic,message);
}

执行结果截图:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
结论:
在该条件下最大吞吐量达到约130000条每秒
设置50个分区
最大吞吐量达到约160000条每秒

多主题:
利用以下程序模拟64个主题(实际真正执行时,一次只同时操作了6个主题)

@GetMapping("sendToConsumer")
public void sendMessage() {

    class NameTreadFactory implements ThreadFactory {

        private final AtomicInteger mThreadNum = new AtomicInteger(1);

        @Override
        public Thread newThread(@NonNull Runnable r) {
            Thread t = new Thread(r, "my-thread-" + mThreadNum.getAndIncrement());
            System.out.println(t.getName() + " has been created");
            return t;
        }
    }

    ThreadFactory threadFactory = new NameTreadFactory();

    final ExecutorService sendThreadPool = new ThreadPoolExecutor(6, 6, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), threadFactory);

    for (int i = 0; i < 6; i++) {
        int first = i;
        sendThreadPool.execute(() -> {

            for (int j = 0; j < 6; j++) {
                String topicName = first + "_" + j;
                for (int k = 0; k < 10000000; k++) {
                    producer.send(topicName, "JK在做测试");
                }
            }
        

});
}
}

程序启动初期:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
程序启动末期:
在这里插入图片描述
在这里插入图片描述
结论:
在多主题情况下kafka吞吐量呈下降趋势
该条件下最大吞吐量位于80000-90000之间 下降约30000-40000条消息每秒

Rocketmq 多主题:

@GetMapping("/produce3")
public void sendMessage() {

    class NameTreadFactory implements ThreadFactory {

        private final AtomicInteger mThreadNum = new AtomicInteger(1);

        @Override
        public Thread newThread(@NonNull Runnable r) {
            Thread t = new Thread(r, "my-thread-" + mThreadNum.getAndIncrement());
            System.out.println(t.getName() + " has been created");
            return t;
        }
    }

    ThreadFactory threadFactory = new NameTreadFactory();

    final ExecutorService sendThreadPool = new ThreadPoolExecutor(6, 6, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), threadFactory);

    for (int i = 0; i < 6; i++) {
        int first = i;
        sendThreadPool.execute(() -> {

            for (int j = 0; j < 6; j++) {
                String topicName = first + "_" + j;
                for (int k = 0; k < 10000000; k++) {
                    Message message = new Message(topicName,"tag","JK在做测试".getBytes(StandardCharsets.UTF_8));
                    try {
                        producer.send(message);
                    } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
                        return;
                    }
                }
                System.out.println(topicName+"执行结束");
            }
        });
    }
}

在这里插入图片描述

@GetMapping("/produce3")
public void sendMessage() {

    class NameTreadFactory implements ThreadFactory {

        private final AtomicInteger mThreadNum = new AtomicInteger(1);

        @Override
        public Thread newThread(@NonNull Runnable r) {
            Thread t = new Thread(r, "my-thread-" + mThreadNum.getAndIncrement());
            System.out.println(t.getName() + " has been created");
            return t;
        }
    }

    ThreadFactory threadFactory = new NameTreadFactory();

    final ExecutorService sendThreadPool = new ThreadPoolExecutor(6, 6, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), threadFactory);

    List<Message> messages = new ArrayList<>();
    for (int j = 0; j < 5000; j++) {
        Message message = new Message("batchTopic", "tag","JK在做测试".getBytes(StandardCharsets.UTF_8));
        messages.add(message);
    }

    for (int i = 0; i < 6; i++) {
        int first = i;
        sendThreadPool.execute(() -> {

            for (int j = 0; j < 6; j++) {
                int num = 0;
                String topicName = first + "_" + j;
                do {
                    try {
                        producer.send(messages);
                    } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
                        return;
                    }
                    num = num +5000;
                }while(num <10000000);
                System.out.println(topicName+"执行结束");
            }
        });
    }
}

在这里插入图片描述

@GetMapping("/produce3")
public void sendMessage() {

    class NameTreadFactory implements ThreadFactory {

        private final AtomicInteger mThreadNum = new AtomicInteger(1);

        @Override
        public Thread newThread(@NonNull Runnable r) {
            Thread t = new Thread(r, "my-thread-" + mThreadNum.getAndIncrement());
            System.out.println(t.getName() + " has been created");
            return t;
        }
    }

    ThreadFactory threadFactory = new NameTreadFactory();

    final ExecutorService sendThreadPool = new ThreadPoolExecutor(6, 6, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), threadFactory);

    List<Message> messages = new ArrayList<>();
    for (int j = 0; j < 10000; j++) {
        Message message = new Message("batchTopic", "tag","JK在做测试".getBytes(StandardCharsets.UTF_8));
        messages.add(message);
    }

    for (int i = 0; i < 6; i++) {
        int first = i;
        sendThreadPool.execute(() -> {

            for (int j = 0; j < 5; j++) {
                int num = 0;
                String topicName = first + "_" + j;
                do {
                    try {
                        producer.send(messages);
                    } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
                        return;
                    }
                    num = num +5000;
                }while(num <10000000);
                System.out.println(topicName+"执行结束");
            }
        });
    }
}

在这里插入图片描述
结论:若采用相同发送方式,rocketmq最大吞吐量仅约4000条消息每秒
改变测试方式,同单主题时测试方式,最大吞吐量达到约50000条消息每秒
通单主题测试时 几乎没有衰减

最终结论

Kafka在单主题条件下,使用官方自测脚本,吞吐量约为60000条消息每秒,
使用java程序测试时,单分区条件下吞吐量约为130000条消息每秒,增加分区数到50,
Kafka达到约160000条消息每秒。
在多主题情况下(模拟64个主题,实际只达到了6个主题同时写入),使用java程序测试,吞吐量约为80000-90000条消息每秒。
RocketMQ单主题情况下,使用Java程序测试,吞吐量约为50000条消息每秒,
多主题情况下(模拟64个主题,实际只达到了6个主题同时写入)使用同kafka完全一致的测试方式,吞吐量仅为4000条消息每秒,
修改方法,同单主题测试时类似,此时吞吐量仍约为50000条消息每秒。
结合自测结果及网上其他结果推论:
Kafka在单主题时吞吐量呈明显优势优于rocketmq
但随着主题数的增加,kafka的吞吐量必然要小于rocketmq。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐