1.准备文件kafka_server_jaas.conf

KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret"
user_alice="alice-secret";
};
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret";
};

2.准备文件zk_server_jaas.conf

Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret";
};

3. 放入secrets文件夹

4.准备文件docker-compose.yml

version: '2'
services:
    zookeeper:
        image: confluentinc/cp-zookeeper:5.1.2
        hostname: zookeeper
        container_name: zookeeper
        restart: always
        ports:
            - 2182:2182
        environment:
            ZOOKEEPER_CLIENT_PORT: 2182
            ZOOKEEPER_TICK_TIME: 2000
            ZOOKEEPER_MAXCLIENTCNXNS: 0
            ZOOKEEPER_AUTHPROVIDER.1: org.apache.zookeeper.server.auth.SASLAuthenticationProvider
            ZOOKEEPER_REQUIRECLIENTAUTHSCHEME: sasl
            ZOOKEEPER_JAASLOGINRENEW: 3600000
            KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/secrets/zk_server_jaas.conf
        volumes:
            - ./secrets:/etc/kafka/secrets
    kafka:
        image: confluentinc/cp-kafka:5.1.2
        hostname: broker
        container_name: kafka
        restart: always
        depends_on:
            - zookeeper
        ports:
            - 9092:9092
        environment:
            KAFKA_BROKER_ID: 1
            KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2182/kafka'
            KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
            KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
            KAFKA_LISTENERS: SASL_PLAINTEXT://0.0.0.0:9092
            KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://xxx.xxx.xxx.xxx:9092
            KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SASL_PLAINTEXT
            KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
            KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
            KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
            KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/secrets/kafka_server_jaas.conf
            KAFKA_SUPER_USERS: User:admin
        volumes:
            - ./secrets:/etc/kafka/secrets

并把 上述xxx.xxx.xxx.xxx改成自己的IP地址

5. 执行命令docker-compose up -d 创建kafka

 5 写python代码验证

生产者

import time
import json
from datetime import datetime
from kafka import KafkaProducer, KafkaConsumer


def producer_event(server_info):
    producer = KafkaProducer(bootstrap_servers=server_info,
                             security_protocol='SASL_PLAINTEXT',
                             sasl_mechanism='PLAIN',
                             sasl_plain_username='admin',
                             sasl_plain_password='admin-secret')
    topic = "first.kafka.test"
    print("kafka连接成功")
    for i in range(7200):
        data = {
            "name":"hello world"
        }
        data_json = json.dumps(data)
        producer.send(topic, data_json.encode()).get(timeout=30)
        print("数据推送成功,当前时间为:{},数据为:{}".format(datetime.now(), data_json))
        time.sleep(1)
    producer.close()


server="127.0.0.1:9092"
producer_event(server)

运行结果,生产者数据发送成功

 消费者

import time
import json
from datetime import datetime
from kafka import KafkaProducer, KafkaConsumer


def consum_event(server_info):
    topic = "first.kafka.test"
    consum = KafkaConsumer(topic, bootstrap_servers=server_info,
                             security_protocol='SASL_PLAINTEXT',
                             sasl_mechanism='PLAIN',
                             sasl_plain_username='admin',
                             sasl_plain_password='admin-secret',
                             api_version=(2, 1, 1)
                             )

    for msg in consum:
        msg_1 = json.loads(msg.value)
        print("订阅到消息为:{}".format(msg_1))



server="127.0.0.1:9092"
consum_event(server)

运行结果 消费成功订阅消息

 

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐