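This is a data collection experiment. Before starting, the ZooKeeper and Kafka services must already be running; see my previous post for how to start them: https://blog.csdn.net/hhjdshz/article/details/123881242?spm=1001.2014.3001.5501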
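Before running the scripts below, it can help to confirm that both services are actually listening. The sketch below is a minimal check, assuming the default ports (9092 for the Kafka broker, 2181 for ZooKeeper) on localhost. The helper name `port_open` is hypothetical. It only proves that something accepts TCP connections on the port, not that it is a healthy broker.

```python
import socket


def port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds.

    This only shows that some process is listening on the port; it does not
    verify that the process is a working Kafka broker or ZooKeeper server.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # Assumed default ports: Kafka listener 9092, ZooKeeper client port 2181.
    for name, port in (("Kafka", 9092), ("ZooKeeper", 2181)):
        state = "reachable" if port_open("localhost", port) else "NOT reachable"
        print(f"{name} on localhost:{port} is {state}")
```

If either port reports "NOT reachable", start that service first; the producer and consumer will otherwise fail to connect.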

Student table: student

| sno   | sname | ssex | sage |
|-------|-------|------|------|
| 95001 | John  | M    | 23   |
| 95002 | Tom   | M    | 23   |

1. Read the contents of the student table, convert each row to JSON, and send it to Kafka.
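To make "convert to JSON" concrete, the sketch below runs the same transformation the producer performs, without needing MySQL or Kafka. It assumes the rows come back as tuples in the order (sno, sname, ssex, sage) and that `sno` and `sage` are integer columns; if `sno` is stored as a CHAR/VARCHAR column it would appear as a JSON string instead. The helper names `row_to_message` and `serialize` are hypothetical.

```python
import json

# The two rows of the student table, in column order (sno, sname, ssex, sage).
# Assumption: sno and sage are integer columns.
ROWS = [
    (95001, "John", "M", 23),
    (95002, "Tom", "M", 23),
]
COLUMNS = ("sno", "sname", "ssex", "sage")


def row_to_message(row):
    """Map one row tuple to a dict keyed by column name."""
    return dict(zip(COLUMNS, row))


def serialize(value):
    """Same steps as the producer's value_serializer: dict -> JSON text -> UTF-8 bytes."""
    return json.dumps(value).encode("utf-8")


if __name__ == "__main__":
    for row in ROWS:
        print(serialize(row_to_message(row)))
```

Each printed `bytes` object is exactly what ends up stored as the value of one Kafka message.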

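Before importing the kafka package, install the third-party Python libraries: kafka-python (the Kafka client) and PyMySQL (used by the producer to read the MySQL table).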

python -m pip install kafka-python pymysql -i https://pypi.doubanio.com/simple/

Complete code:

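# Import the Kafka producer class (from the kafka-python package)
from kafka import KafkaProducer
import json
import pymysql
# Connect to Kafka; value_serializer turns each dict into UTF-8 encoded JSON bytes
producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))
# Connect to your own MySQL database
connect = pymysql.connect(host='localhost', port=3306, user='root', password='123456',
                          database='valentines', charset='utf8')
# Get a cursor object
cursor = connect.cursor()
# Query the data; listing the columns explicitly keeps msg[0]..msg[3] in a known order
sql = "select sno, sname, ssex, sage from student"
# Execute the SQL statement
cursor.execute(sql)
# fetchall returns every result row as a tuple
data = cursor.fetchall()
for msg in data:
    val = {}
    val['sno'] = msg[0]
    val['sname'] = msg[1]
    val['ssex'] = msg[2]
    val['sage'] = msg[3]
    # send() is asynchronous: it queues the message and returns immediately
    producer.send('json_topic', val)
    print(val)
# A SELECT needs no commit; instead, block until all queued messages are delivered
producer.flush()
producer.close()
cursor.close()
connect.close()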
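The loop above hard-codes `msg[0]` to `msg[3]`, which breaks silently if the column order changes. A sketch of an alternative, assuming only the Python DB-API (PEP 249) convention that each entry of `cursor.description` starts with the column name, builds the dicts from the column names instead. The function `rows_to_dicts` is hypothetical; in the producer it would be called as `rows_to_dicts(cursor.description, cursor.fetchall())`. PyMySQL can also return dicts directly if you pass `cursorclass=pymysql.cursors.DictCursor` to `pymysql.connect`.

```python
from typing import Any, Dict, List, Sequence


def rows_to_dicts(description: Sequence[Sequence[Any]],
                  rows: Sequence[Sequence[Any]]) -> List[Dict[str, Any]]:
    """Turn DB-API result rows into dicts keyed by column name.

    Per PEP 249, cursor.description holds one 7-item sequence per column,
    and the first item of each is the column name.
    """
    names = [col[0] for col in description]
    return [dict(zip(names, row)) for row in rows]


if __name__ == "__main__":
    # Hypothetical stand-in for cursor.description after querying the student table;
    # only the first field (the name) is used, the other six are placeholders.
    fake_description = [(name, None, None, None, None, None, None)
                        for name in ("sno", "sname", "ssex", "sage")]
    fake_rows = [(95001, "John", "M", 23), (95002, "Tom", "M", 23)]
    for val in rows_to_dicts(fake_description, fake_rows):
        print(val)
```

With this approach, adding or reordering columns in the query no longer requires touching the dict-building code.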

2. Then read the JSON data back from Kafka and print it.

Complete code:

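from kafka import KafkaConsumer
import json

# Subscribe to json_topic. With no consumer group (group_id=None) nothing is
# committed, so auto_offset_reset='earliest' makes every run start from the
# oldest message still in the topic
consumer = KafkaConsumer('json_topic', bootstrap_servers=['localhost:9092'], group_id=None, auto_offset_reset='earliest')

# Iterating blocks and yields records as they arrive (stop with Ctrl+C)
for msg in consumer:
    # msg.value is raw bytes: decode it to a str, then parse the JSON into a dict
    msgs = str(msg.value, encoding="utf-8")
    python_data = json.loads(msgs)
    print(python_data)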
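The decode-and-parse step in the loop is the exact inverse of the producer's `value_serializer`. The sketch below isolates it as a function (the name `deserialize` is hypothetical) so it can be checked without a running broker. kafka-python's `KafkaConsumer` also accepts a `value_deserializer` argument, so passing `value_deserializer=deserialize` would make `msg.value` arrive already as a dict.

```python
import json


def deserialize(raw: bytes):
    """Inverse of the producer's value_serializer: UTF-8 bytes -> JSON text -> Python object.

    Equivalent to str(msg.value, encoding="utf-8") followed by json.loads in the consumer loop.
    """
    return json.loads(raw.decode("utf-8"))


if __name__ == "__main__":
    # A payload shaped like the ones the producer sends (values from the student table).
    raw = json.dumps({"sno": 95001, "sname": "John", "ssex": "M", "sage": 23}).encode("utf-8")
    print(deserialize(raw))
```

Keeping serializer and deserializer as a matched pair means both scripts agree on the wire format (UTF-8 JSON) for `json_topic`.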
