版本查看

在页面查看:点击具有Kafka服务的主机,然后点击组件

在服务器查看

如图2.11是Scala版本,2.2.1是Kafka版本
如果不知道CDH装哪,就把/opt/cloudera换成/

find /opt/cloudera -name \*kafka_\* | grep '\.jar'

命令

创建主题

kafka-topics --create \
--bootstrap-server hadoop105:9092 \
--replication-factor 副本数 \
--partitions 分区数 \
--topic 主题名称

注:
若使用--bootstrap-server hadoop105:9092创建,则消费者偏移量保存在Kafka中
若使用--zookeeper hadoop105:2181/kafka创建,则消费者偏移量保存在ZooKeeper中

查看有哪些主题

kafka-topics --list --bootstrap-server hadoop105:9092

kafka-topics --list --zookeeper hadoop105:2181/kafka

查看某个主题的描述

kafka-topics --describe \
--zookeeper hadoop105:2181/kafka \
--topic 主题名称

生产和消费

生产

kafka-console-producer \
--broker-list Kafka地址:9092 \
--topic 主题名称

消费(--from-beginning会把主题以往所有数据都读取出来)

kafka-console-consumer \
--bootstrap-server Kafka地址:9092 \
--topic 主题名称 \
--from-beginning \
--group 消费者组名称

压测

1、创建单分区单副本主题

kafka-topics --create \
--replication-factor 1 \
--partitions 1 \
--bootstrap-server hadoop105:9092 \
--topic StressTesting

2、写测试

生产者参数描述备注
--throughput最大消息吞吐量限制,-1不限制messages/sec
--print-metrics打印指标默认不打印
--num-records生产多少条消息
--record-size每条消息的大小bytes
--producer-props生产者相关配置,如服务器地址优先级高于--producer.config
--producer.config生产者配置文件路径
kafka-producer-perf-test \
--throughput -1 \
--print-metrics \
--num-records 1000000 \
--record-size 1024 \
--producer-props bootstrap.servers=hadoop105:9092 \
--print-metrics \
--topic StressTesting

某写测试结果

records sentrecords/secMB/secavg latencymax latency
100000024396.78938323.821232.27 ms2096.00 ms

3、读测试

消费者参数描述
--messages消费的消息数量
kafka-consumer-perf-test \
--print-metrics \
--messages 1000000 \
--broker-list hadoop105:9092 \
--topic StressTesting

某读测试结果

data.consumed.in.MBMB.secdata.consumed.in.nMsgnMsg.secrebalance.time.msfetch.time.msfetch.MB.secfetch.nMsg.sec
976.562556.8695100000058234.335030871408569.333570997.5151

配置

最大Java堆大小

broker_max_heap_size

建议给4~6G每个Broker

日志数据存储路径

log.dirs

默认副本数

default.replication.factor

建议>1

日志保留策略

log.retention

log.retention.ms:日志保留时长,设-1时无限制,建议3天
log.retention.bytes:每个主题分区保留在日志中的数据量,设-1时无限制
log.retention.check.interval.ms:如图示,每5分钟,日志清理程序 会根据日志保留策略 来清理符合删除条件的日志

接受消息的大小

可接收的单个消息的最大值(message.max.bytes

replica.fetch.max.bytes应大于message.max.bytes

按需调大

其它

配置说明备注
delete.topic.enable是否允许删除主题CDH的Kafka默认启用,不用改
broker.idbroker 编号,唯一的,每台机都不同通常不用改
num.network.threads处理网络请求的线程数
num.io.threads处理网络请求的线程数
num.partitions每个主题的默认分区数默认1不用改
log.segment.bytes日志片段最大字节数必须大于message.max.bytes

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐