使用python-kafka连接带有kerberos认证的kafka集群的时候,由于kerberos没有集成ldap,无法直接使用账号密码登录。记录下

参考了网上各种乱七八糟的文章,代码大致都如图所示:

producer = KafkaProducer(bootstrap_servers=['master.cloud.com:9092'],
                             security_protocol='SASL_PLAINTEXT',
                             sasl_mechanism='GSSAPI',
                             sasl_kerberos_service_name='kafka',
                             value_serializer=lambda m: json.dumps(m).encode('utf-8'))
for i in range(50):
    producer.send('python_mfa_topic', {"序列x": i})
    producer.flush()

然后就是安装python-kafka, python-gssapi两个库

我就是这么干的,然后出现了NoBrokersAvailable。

接着,最不靠谱的地方出现了,全网铺天盖地的出现了加一个api_version =(xxx版本)参数,然后就可以使用了。

我也尝试加了,完全没有效果,而且版本应该是没问题,因为我从2.0.2一致尝试到0.10。然后再看源码,吐了一口老血,其实在2.0.2的版本里面,是有自动获取版本的操作的,只有连接失败的情况之后才会使用手动设置版本,本来就已经连不上了,设置版本对我这里来说,压根不是根本原因啊。

因为原先对kerberos不太熟,疯狂看资料之后,发现自己漏了一个操作,就是kerberos里面有个kinit操作,需要我们在自己机器里面手动kinit,然后才能连接。

对应python的库是

krbticket

所以,我们需要在代码里面手动kinit一下。最终完整代码:

producer:

import os

from krbticket import KrbConfig, KrbCommand

from conf import config
from kafka import KafkaProducer
import json

jaas_conf = os.path.join(config.project_server_file, 'keytab/kafka/kafka_jaas.conf')
krb5_conf = os.path.join(config.project_server_file, 'keytab/kafka/krb5.conf')
user_name = 'hdfs'
keytab_conf = os.path.join(config.project_server_file, f'keytab/kafka/{user_name}.keytab')

try:
    # 建议加上,如果有多个使用默认缓存的脚本,则一个脚本 (kinit/kdestroy) 的操作可能会影响其他脚本。
    # 使用该KRB5CCNAME变量为每个脚本设置一个缓存文件
    os.environ['KRB5CCNAME'] = os.path.join(config.project_server_file, f'keytab/kafka/krb5cc_{user_name}')
    kconfig = KrbConfig(principal='hdfs/hdfs@HDFS.COM',
                        keytab=keytab_conf)
    KrbCommand.kinit(kconfig)

    os.environ['KAFKA_OPTS'] = f'-Djava.security.auth.login.config={jaas_conf}' \
                               f' -Djava.security.krb5.conf={krb5_conf}'

    producer = KafkaProducer(bootstrap_servers=['master.cloud.com:9092', 'slave1.cloud.com:9092'],
                             security_protocol='SASL_PLAINTEXT',
                             sasl_mechanism='GSSAPI',
                             sasl_kerberos_service_name='kafka',
                             value_serializer=lambda m: json.dumps(m).encode('utf-8'))

    for i in range(50):
        producer.send('python_mfa_topic', {"序列x": i})
    producer.flush()
finally:
    KrbCommand.kdestroy(kconfig)
    print("destory完成")

consumer:

import os

from krbticket import KrbConfig, KrbCommand

from conf import config
from kafka import KafkaProducer
import json

jaas_conf = os.path.join(config.project_server_file, 'keytab/kafka/kafka_jaas.conf')
krb5_conf = os.path.join(config.project_server_file, 'keytab/kafka/krb5.conf')
user_name = 'hdfs'
keytab_conf = os.path.join(config.project_server_file, f'keytab/kafka/{user_name}.keytab')

try:
    # 建议加上,如果有多个使用默认缓存的脚本,则一个脚本 (kinit/kdestroy) 的操作可能会影响其他脚本。
    # 使用该KRB5CCNAME变量为每个脚本设置一个缓存文件
    os.environ['KRB5CCNAME'] = os.path.join(config.project_server_file, f'keytab/kafka/krb5cc_{user_name}')
    kconfig = KrbConfig(principal='hdfs/hdfs@HDFS.COM',
                        keytab=keytab_conf)
    KrbCommand.kinit(kconfig)

    os.environ['KAFKA_OPTS'] = f'-Djava.security.auth.login.config={jaas_conf}' \
                               f' -Djava.security.krb5.conf={krb5_conf}'

    consumer = KafkaConsumer('python_mfa_topic',
                             bootstrap_servers=['master.cloud.com:9092', 'slave1.cloud.com:9092'],
                             security_protocol='SASL_PLAINTEXT',
                             sasl_mechanism='GSSAPI',
                             auto_offset_reset='earliest',
                             group_id='python_mfa_group',
                             sasl_kerberos_service_name='kafka',
                             value_deserializer=lambda m: json.loads(m.decode('utf-8')))
    for message in consumer:
        print(message.value)
finally:
    KrbCommand.kdestroy(kconfig)
    print("destory完成")

测试代码如果需要用在生产上,请自行修复bug。

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐