1、启动kafka
nohup ./kafka-server-start.sh …/config/server.properties &&>/dev/null 2>&1 &

或者不指定日志输出位置和设置

./kafka-server-start.sh …/config/server.properties &

2、查看所有topic
./kafka-topics.sh --zookeeper 172.18.111.106:2181 --list

3、删除topic
./kafka-topics.sh --delete --zookeeper 172.18.111.106:2181 --topic audit_db_audit_data_topic

4、查看某一分组消费情况
./kafka-consumer-groups.sh --bootstrap-server 172.18.111.106:9092 --describe --group kafka_result_id

5、所有分组消费情况
./kafka-consumer-groups.sh --bootstrap-server 172.18.111.106:9092 --describe --all-groups

6、消费某一topic
./kafka-console-consumer.sh --bootstrap-server 172.18.111.106:9092 --topic task_result_topic --from-beginning

7、启动生产客户端

./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic kfcs

8、查看某一topic的消费情况

./kafka-topics.sh --bootstrap-server 172.18.111.106:9092 --describe --topic xxxx

9、创建topic

./kafka-topics.sh --zookeeper localhost:2181 --create --topic test --partitions 1 --replication-factor 1

10、设置某个topic的消息有效期
./kafka-configs.sh --zookeeper 192.168.0.101:2181 --alter --entity-name mytopic --entity-type topics --add-config retention.ms=86400000

11、测试kafka发送和接收消息(启动两个终端)

#发送消息(注意端口号为配置文件里面的端口号)
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
#消费消息(可能端口号与配置文件保持一致,或与发送端口保持一致)
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning #加了–from-beginning 重头消费所有的消息
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test #不加–from-beginning 从最新的一条消息开始消费

12、offset重置
./kafka-consumer-groups.sh --bootstrap-server=10.111.30.3:9092,10.111.30.10:9092,10.111.30.5:9092 --topic=locationUp --group=jt809-down-xg --execute --reset-offsets --to-offset 359905139

13、offset重置(所有topic)
./kafka-consumer-groups.sh --bootstrap-server 10.111.30.4:9092,10.111.30.8:9092,10.111.30.9:9092 --group testPlatform2 --reset-offsets --all-topics --to-latest --execute

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐