报错如下:

  File "/usr/lib/python2.7/site-packages/kafka_python-1.3.5-py2.7.egg/kafka/producer/kafka.py", line 347, in __init__
    **self.config)
  File "/usr/lib/python2.7/site-packages/kafka_python-1.3.5-py2.7.egg/kafka/client_async.py", line 221, in __init__
    self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "/usr/lib/python2.7/site-packages/kafka_python-1.3.5-py2.7.egg/kafka/client_async.py", line 846, in check_version
    raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable

细看错误信息,说是配置参数 api_version 的问题,我的程序调用里配置如下:

p = Producer('kafka_ip:port',
             acks=1,
             batch_size=10485760,
             linger_ms=100,
             max_block_ms=10000,
             max_request_size=10485760,
             send_buffer_bytes=10485760)

需要加一个api_version,如下:

p = Producer('kafka_ip:port',
             acks=1,
             batch_size=10485760,
             linger_ms=100,
             max_block_ms=10000,
             max_request_size=10485760,
             send_buffer_bytes=10485760,
             api_version=(0, 10))  # 注意这里加了新内容

注意:
若kafka版本为0.10,则 api_version=(0, 10)
​若kafka版本为0.10.2,则 api_version=(0, 10, 2)
这样设置就可以连接到kafka了。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐