kafka日常操作(包含修改分区,副本数)
增加新服务到kafka集群是很容易的,只要为新服务分配一个独一无二的Broker ID并启动即可。但是,新的服务不会自动分配到任何数据,需要把分区数据迁移给它们,在此期间它们一直不工作,直到新的topic创建,所以,通常向集群添加机器时,你需要将一些现有的数据迁移到这些机器上。迁移数据的过程是手动启动的,但是执行过程是完全自动化的。在kafka后台内部中,kafka将添加新的服务器,并作为正在迁移
·
1、查看topic
/opt/kafka/bin/kafka-topics.sh --list --zookeeper zookeeper_ip:zookeeper_port
2、创建 topic
/opt/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper_ip:zookeeper_port --replication-factor 2 --partitions 1 --topic topic_name
3、删除Topic
/opt/kafka/bin/kafka-topics.sh --delete --zookeeper zookeeper_ip:zookeeper_port --topic topic_name
4、查看topic 详细信息
/opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper_ip:zookeeper_port --describe --topic topic_name
5、修改topic副本数
例如:
编写修改副本计划
[root@k19-bigdata-4 ~]# cat replication.json
{"version":1,
"partitions":[
{"topic":"test","partition":0,"replicas":[0,1]},
{"topic":"test","partition":1,"replicas":[0,1]},
{"topic":"test","partition":2,"replicas":[0,1]},
{"topic":"test","partition":3,"replicas":[0,1]},
{"topic":"test","partition":4,"replicas":[0,1]},
{"topic":"test","partition":5,"replicas":[0,1]},
{"topic":"test","partition":6,"replicas":[0,1]}
]
}
执行计划
/opt/kafka/bin/kafka-reassign-partitions.sh --zookeeper zookeeper_ip:zookeeper_port --reassignment-json-file replication.json --execute
6、修改分区数据(只能增加)
/opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper_ip:zookeeper_port --alter --topic test –partitions 6
/opt/kafka/bin/kafka-topics.sh --bootstrap-server ${KAFKA_BROKERS} --alter --topic test --partitions 12
例如:
7、创建生产者
/opt/kafka/bin/kafka-console-producer.sh --broker-list 10.0.32.210:6667 --topic K19-NTC-HTTP-LOG
8、创建消费者
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.5.104:6667 --topic K19-NTC-HTTP-LOG --from-beginning
9、创建消费者组
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 10.0.32.210:6667 --topic K19-NTC-HTTP-LOG --consumer.config /opt/kafka/config/consumer.properties
更多推荐
已为社区贡献16条内容
所有评论(0)