python连接kerberos认证的kafka,踩坑(NoBrokersAvailable)
然后再看源码,吐了一口老血,其实在2.0.2的版本里面,是有自动获取版本的操作的,只有连接失败的情况之后才会使用手动设置版本,本来就已经连不上了,设置版本对我这里来说,压根不是根本原因啊。因为原先对kerberos不太熟,疯狂看资料之后,发现自己漏了一个操作,就是kerberos里面有个kinit操作,需要我们在自己机器里面手动kinit,然后才能连接。接着,最不靠谱的地方出现了,全网铺天盖地的出
使用python-kafka连接带有kerberos认证的kafka集群的时候,由于kerberos没有集成ldap,无法直接使用账号密码登录。记录下
参考了网上各种乱七八糟的文章,代码大致都如图所示:
producer = KafkaProducer(bootstrap_servers=['master.cloud.com:9092'],
security_protocol='SASL_PLAINTEXT',
sasl_mechanism='GSSAPI',
sasl_kerberos_service_name='kafka',
value_serializer=lambda m: json.dumps(m).encode('utf-8'))
for i in range(50):
producer.send('python_mfa_topic', {"序列x": i})
producer.flush()
然后就是安装python-kafka, python-gssapi两个库
我就是这么干的,然后出现了NoBrokersAvailable。
接着,最不靠谱的地方出现了,全网铺天盖地的出现了加一个api_version =(xxx版本)参数,然后就可以使用了。
我也尝试加了,完全没有效果,而且版本应该是没问题,因为我从2.0.2一致尝试到0.10。然后再看源码,吐了一口老血,其实在2.0.2的版本里面,是有自动获取版本的操作的,只有连接失败的情况之后才会使用手动设置版本,本来就已经连不上了,设置版本对我这里来说,压根不是根本原因啊。
因为原先对kerberos不太熟,疯狂看资料之后,发现自己漏了一个操作,就是kerberos里面有个kinit操作,需要我们在自己机器里面手动kinit,然后才能连接。
对应python的库是
krbticket
所以,我们需要在代码里面手动kinit一下。最终完整代码:
producer:
import os
from krbticket import KrbConfig, KrbCommand
from conf import config
from kafka import KafkaProducer
import json
jaas_conf = os.path.join(config.project_server_file, 'keytab/kafka/kafka_jaas.conf')
krb5_conf = os.path.join(config.project_server_file, 'keytab/kafka/krb5.conf')
user_name = 'hdfs'
keytab_conf = os.path.join(config.project_server_file, f'keytab/kafka/{user_name}.keytab')
try:
# 建议加上,如果有多个使用默认缓存的脚本,则一个脚本 (kinit/kdestroy) 的操作可能会影响其他脚本。
# 使用该KRB5CCNAME变量为每个脚本设置一个缓存文件
os.environ['KRB5CCNAME'] = os.path.join(config.project_server_file, f'keytab/kafka/krb5cc_{user_name}')
kconfig = KrbConfig(principal='hdfs/hdfs@HDFS.COM',
keytab=keytab_conf)
KrbCommand.kinit(kconfig)
os.environ['KAFKA_OPTS'] = f'-Djava.security.auth.login.config={jaas_conf}' \
f' -Djava.security.krb5.conf={krb5_conf}'
producer = KafkaProducer(bootstrap_servers=['master.cloud.com:9092', 'slave1.cloud.com:9092'],
security_protocol='SASL_PLAINTEXT',
sasl_mechanism='GSSAPI',
sasl_kerberos_service_name='kafka',
value_serializer=lambda m: json.dumps(m).encode('utf-8'))
for i in range(50):
producer.send('python_mfa_topic', {"序列x": i})
producer.flush()
finally:
KrbCommand.kdestroy(kconfig)
print("destory完成")
consumer:
import os
from krbticket import KrbConfig, KrbCommand
from conf import config
from kafka import KafkaProducer
import json
jaas_conf = os.path.join(config.project_server_file, 'keytab/kafka/kafka_jaas.conf')
krb5_conf = os.path.join(config.project_server_file, 'keytab/kafka/krb5.conf')
user_name = 'hdfs'
keytab_conf = os.path.join(config.project_server_file, f'keytab/kafka/{user_name}.keytab')
try:
# 建议加上,如果有多个使用默认缓存的脚本,则一个脚本 (kinit/kdestroy) 的操作可能会影响其他脚本。
# 使用该KRB5CCNAME变量为每个脚本设置一个缓存文件
os.environ['KRB5CCNAME'] = os.path.join(config.project_server_file, f'keytab/kafka/krb5cc_{user_name}')
kconfig = KrbConfig(principal='hdfs/hdfs@HDFS.COM',
keytab=keytab_conf)
KrbCommand.kinit(kconfig)
os.environ['KAFKA_OPTS'] = f'-Djava.security.auth.login.config={jaas_conf}' \
f' -Djava.security.krb5.conf={krb5_conf}'
consumer = KafkaConsumer('python_mfa_topic',
bootstrap_servers=['master.cloud.com:9092', 'slave1.cloud.com:9092'],
security_protocol='SASL_PLAINTEXT',
sasl_mechanism='GSSAPI',
auto_offset_reset='earliest',
group_id='python_mfa_group',
sasl_kerberos_service_name='kafka',
value_deserializer=lambda m: json.loads(m.decode('utf-8')))
for message in consumer:
print(message.value)
finally:
KrbCommand.kdestroy(kconfig)
print("destory完成")
测试代码如果需要用在生产上,请自行修复bug。
更多推荐
所有评论(0)