材料准备:

在centos7中安装了这些之后。

首先对于zookeeper的配置.

解压
zookeeper根目录为 /hadoop/zookeeper
cd /hadoop/zookeeper
mkdir zkData
cd conf
mv zoo_sample.cfg zoo.cfg
vi zoo.cfg
----
dataDir=/hadoop/zookeeper/zkData
----
/hadoop/zookeeper-3.4.10
启动
cd /hadoop/zookeeper
bin/zkServer.sh start

之后对于kafka的配置

kafka 0.5以上版本自带zookeeper,自带的不稳定,最好单独
下载好了,解压
kafka的路径 /hadoop/kafka

创建日志文件
cd /hadoop/kafka
mkdir logs

cd /config
vi server.properties

----
#broker的全局唯一编号,不能重复
broker.id=1
#删除topic功能使能
delete.topic.enable=true

#kafka运行日志存放的路径	
log.dirs=/hadoop/kafka/logs

#配置连接Zookeeper集群地址,没有集群采用单节点,没有hadoop102的映射写成ip地址
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181
----



启动kafka,一定要先启动zookeeper(装完zk启动)
启动zk中
...启动完成
cd /hadoop/kafka
bin/kafka-server-start.sh -daemon config/server.properties

配置外部访问

vi server.properties

创建topic

bin/kafka-topics.sh --zookeeper ip:2181 \

--create --replication-factor 1 --partitions 1 --topic first1

红色部分是自定义,ip是zookeeper的ip,first1是消费者主题的名称。

 

python方面代码..

pip install kafka-python

生产者:

from kafka import KafkaProducer;
import json;
producer = KafkaProducer(
                             value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                             bootstrap_servers=['192.168.31.22:9092']
                          )
msg_dict = {
    "operatorId":"test",#公交公司ID
    "terminalId":"123",#设备Id
    "terminalCode":"123",#设备编码(使用车辆ID)
    "terminalNo":"1",#同一车辆内terminal序号从1开始
}


producer.send("text1",msg_dict)
producer.close()

 

消费者:

注意:消费者要一直启动着才可以

# -*- coding: utf-8 -*-
from kafka import KafkaConsumer;
consumer=KafkaConsumer('text1',bootstrap_servers='192.168.31.22:9092')
for msg in consumer:
    print(msg.value.decode())


 

 

 

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐