kafka集群只要解压后修改各节点的broker.id,配置zookeeper连接后分别启动各节点kafka即可。

前提:jdk和zookeeper安装

参考jdk安装zookeeper集群安装

下载并解压

wget https://archive.apache.org/dist/kafka/2.1.1/kafka_2.11-2.1.1.tgz
tar -zxvf kafka_2.11-2.1.1.tgz

修改配置文件各节点的broker.id

cd  kafka_2.11-2.1.1
vi config/server.properties

#broker的全局唯一编号,各节点不能重复
broker.id=1
#删除topic功能开启
delete.topic.enable=true
#kafka运行日志存放的路径
log.dirs=/opt/MPP/kafka_2.11-2.1.1/logs
#配置连接Zookeeper集群地址
zookeeper.connect=10.37.62.95:2181,10.37.62.96:2181,10.37.62.97:2181/kafka
#ip地址为发送消息端连接kafka的ip地址,各节点各自的ip
advertised.listeners=PLAINTEXT://10.37.62.98:9092
advertised.host.name=10.37.62.99
#默认超时时间太短需要修改大一些
zookeeper.connection.timeout.ms=60000

配置环境变量

vi ~/.bash_profile

## KAFKA ##
export KAFKA_HOME=/opt/MPP/kafka_2.11-2.1.1
export PATH=$PATH:$KAFKA_HOME/bin
## KAFKA ##
#立即生效环境变量
source ~/.bash_profile

启动集群:分别配启动个节点

kafka_2.11-2.1.1/bin/kafka-server-start.sh /opt/MPP/kafka_2.11-2.1.1/config/server.properties &

停止kafka

kafka_2.11-2.1.1/bin/kafka-server-stop.sh

查看日志

tail -f kafka_2.11-2.1.1/logs/server.log

测试

1)查看当前服务器中的所有topic
 kafka-topics.sh --zookeeper  10.37.62.95:2181,10.37.62.96:2181,10.37.62.97:2181/kafka --list

2)创建topic

kafka-topics.sh --zookeeper 10.37.62.95:2181,10.37.62.96:2181,10.37.62.97:2181/kafka --create --replication-factor 1 --partitions 1 --topic first
选项说明:
--topic 定义topic名
--replication-factor  定义副本数
--partitions  定义分区数

3)删除topic
	kafka-topics.sh --zookeeper 10.37.62.95:2181,10.37.62.96:2181,10.37.62.97:2181/kafka --delete --topic first
需要server.properties中设置delete.topic.enable=true否则只是标记删除。
 
4)发送消息
 kafka-console-producer.sh --broker-list 10.37.62.99:9092 --topic first

5)消费消息
kafka-console-consumer.sh --bootstrap-server 10.37.62.99:9092 --topic first
# from-beginning:会把主题中以往所有的数据都读取出来。
kafka-console-consumer.sh --bootstrap-server 10.37.62.99:9092 --from-beginning --topic first

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐