Kafka与MySQL的组合使用
这是做的数据采集实验,我们在做的时候,要先把zookeeper和kafka服务启动,可以参考我上一篇.https://blog.csdn.net/hhjdshz/article/details/123881242?spm=1001.2014.3001.5501学生表studentsnosnamessexsage95001JohnM.
·
这是做的数据采集实验,我们在做的时候,要先把zookeeper和kafka服务启动,可以参考我上一篇.https://blog.csdn.net/hhjdshz/article/details/123881242?spm=1001.2014.3001.5501
学生表student
sno | sname | ssex | sage |
95001 | John | M | 23 |
95002 | Tom | M | 23 |
- 读取student表的数据内容,将其转为JSON格式,发送给Kafka;
在导入kafka包之前,我们要安装Python 第三方库 kafka-Python包
python -m pip install requests --force-reinstall -i https://pypi.doubanio.com/simple/ pip
完整代码
#从导入kafka 生产者的包
from kafka import KafkaProducer
import json
import pymysql.cursors
# 连接kafka
producer=KafkaProducer(bootstrap_servers='localhost:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8'))
# 连接自己的mysql数据库
connect=pymysql.Connect(host='localhost', port=3306, user='root', passwd='123456', db='valentines', charset='utf8')
# 获取游标对象
cursor = connect.cursor()
# 查询数据
sql = "select * from student"
#执行SQL语句
result = cursor.execute(sql)
#fetchall查询所有结果
data = cursor.fetchall()
for msg in data:
val = {}
val ['sno']=msg[0]
val ['sname']=msg[1]
val ['ssex']=msg[2]
val ['sage']=msg[3]
producer.send('json_topic', val)
#提交事务
connect.commit()
print(val )
connect.close()
2.再从Kafka中获取到JSON格式数据,打印出来;
完整代码:
from kafka import KafkaConsumer
import json
import pymysql.cursors
consumer = KafkaConsumer('json_topic', bootstrap_servers=['localhost:9092'], group_id=None, auto_offset_reset='earliest')
for msg in consumer:
msgs = str(msg.value, encoding="utf-8")
python_data = json.loads(msgs)
print(python_data)
更多推荐
已为社区贡献2条内容
所有评论(0)