启动console-producer:

[root@kafka01 bin]# ./kafka-console-producer.sh --bootstrap-server kafka01:9092 --topic first
>[2021-06-08 03:33:00,972] WARN [Producer clientId=console-producer] Bootstrap broker kafka01:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2021-06-08 03:33:01,365] WARN [Producer clientId=console-producer] Bootstrap broker kafka01:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2021-06-08 03:33:01,788] WARN [Producer clientId=console-producer] Bootstrap broker kafka01:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)

启动console-consumer:

[root@kafka01 bin]# ./kafka-console-consumer.sh --bootstrap-server kafka01:9092 --topic first
[2021-06-08 03:32:04,632] WARN [Consumer clientId=consumer-console-consumer-49370-1, groupId=console-consumer-49370] Bootstrap broker kafka01:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2021-06-08 03:32:05,033] WARN [Consumer clientId=consumer-console-consumer-49370-1, groupId=console-consumer-49370] Bootstrap broker kafka01:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)

开启了认证机制,导致生产者和消费失败,下面为writer用户配置认证信息,首先创建writer的JAAS文件,如下:

KafkaClient{
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="writer"
        password="writer";
};

保存为config/acl_writer_jaas.conf,拷贝一份新的bin/kafka-console-producer.sh脚本,并将该jaas文件做为JVM参数传给它:

cp bin/kafka-console-producer.sh bin/acl-kafka-console-producer.sh
vim bin/acl-kafka-console-producer.sh
# 把文件中的这一行
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"

# 修改为
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/opt/kafka/config/acl_writer_jaas.conf kafka.tools.ConsoleProducer "$@"

使用新的脚本启动生产者:

bin/acl-kafka-console-producer.sh --bootstrap-server kafka01:9092 --topic first --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN
>sdf
[2021-06-08 03:41:28,958] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {first=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2021-06-08 03:41:28,959] ERROR [Producer clientId=console-producer] Topic authorization failed for topics [first] (org.apache.kafka.clients.Metadata)
[2021-06-08 03:41:28,960] ERROR Error when sending message to topic first with key: null, value: 3 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [first]

依然有错误,不过错误变成了"topic授权失败",说明console producer通过认证,但没有授权,需要配置acl来让用户writer有权限写入topic。

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=kafka01:2181 --add --allow-principal User:writer --operation Write --topic first
Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=first, patternType=LITERAL)`: 
 	(principal=User:writer, host=*, operation=WRITE, permissionType=ALLOW) 
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=first, patternType=LITERAL)`: 
 	(principal=User:writer, host=*, operation=WRITE, permissionType=ALLOW) 

授权成功后重新操作即可。

使用go客户端发送消息:

func TestProducer(t *testing.T) {

	topic := "first"
	config := sarama.NewConfig()

	config.Version = sarama.MinVersion
	config.Producer.Return.Successes = true
	config.Producer.RequiredAcks = sarama.WaitForLocal
	config.Producer.Flush.Frequency = 10000
	config.Producer.Flush.Bytes = 1048576000

	config.Net.SASL.Enable = true
	config.Net.SASL.User = "writer"
	config.Net.SASL.Password = "writer"

	producer, err := sarama.NewSyncProducer(addrs, config)
	if err != nil {
		panic(err)
	}

	var messages []*sarama.ProducerMessage

	message := &sarama.ProducerMessage{Topic: topic, Value: sarama.StringEncoder("Hello world")}
	messages = append(messages, message)

	err = producer.SendMessages(messages)
	if err != nil {
		panic(err)
	}

	for _, v := range messages {
		fmt.Println(v.Offset, v.Partition, v.Value)
	}

}

生产者发送消息成功了,下面配置消费者,首选创建reader用户的JAAS文件:

KafkaClient{
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="reader"
        password="reader";
};

拷贝一份新的console consumer脚本来指定上面的acl_reader_jaas.conf文件:

cp bin/kafka-console-consumer.sh bin/acl-kafka-console-consumer.sh 
# 把文件中的这一行
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"

# 修改为
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/opt/kafka/config/acl_reader_jaas.conf kafka.tools.ConsoleConsumer "$@"

然后创建一个consumer.config文件,指定console producer的3个属性:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
group.id=reader-group

现在运行console consumer:

[root@kafka01 kafka]# bin/acl-kafka-console-consumer.sh --bootstrap-server kafka01:9092 --topic first --from-beginning --consumer.config bin/acl-kafka-consumer.config
[2021-06-08 04:16:35,668] WARN [Consumer clientId=consumer-reader-group-1, groupId=reader-group] Error while fetching metadata with correlation id 2 : {first=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2021-06-08 04:16:35,669] ERROR [Consumer clientId=consumer-reader-group-1, groupId=reader-group] Topic authorization failed for topics [first] (org.apache.kafka.clients.Metadata)
[2021-06-08 04:16:35,669] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [first]
Processed a total of 0 messages

同样的topic授权失败,由于要读取topic数据,需要赋予用户reader topic读取的acl规则:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=kafka01:2181 --add --allow-principal User:reader --operation Read --topic first

再次启动,仍然报错,不过这次变成了消费者组授权失败,表明用户reader无权访问reader-group消费者组,同样是授权的问题,需要acl规则来解决,即赋予用户reader消费者组的读权限:

[root@kafka01 kafka]# bin/acl-kafka-console-consumer.sh --bootstrap-server kafka01:9092 --topic first --from-beginning --consumer.config bin/acl-kafka-consumer.config
[2021-06-08 04:18:39,588] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: reader-group
Processed a total of 0 messages

对组授权

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=kafka01:2181 --add --allow-principal User:reader --operation Read --group reader-group

授权成功后重新操作即可。

使用go客户端消费消息:

func TestConsumer(t *testing.T) {

	topic := "first"

	config := sarama.NewConfig()
	config.Consumer.Offsets.AutoCommit.Enable = true
	config.Net.SASL.Enable = true
	config.Net.SASL.User = "reader"
	config.Net.SASL.Password = "reader"

	consumer, err := sarama.NewConsumer(addrs, config)
	if err != nil {
		panic(err)
	}

	partitions, err := consumer.Partitions(topic) //获得该topic所有的分区
	if err != nil {
		panic(err)
	}

	var wg sync.WaitGroup
	wg.Add(len(partitions))

	for partition := range partitions {
		pc, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetOldest)
		if err != nil {
			panic(err)
		}
		wg.Add(1)
		go func(sarama.PartitionConsumer) {  //为每个分区开一个go协程去取值
			for msg := range pc.Messages() { //阻塞直到有值发送过来,然后再继续等待
				fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
			}
			defer pc.AsyncClose()
			wg.Done()
		}(pc)
	}
	wg.Wait()
	consumer.Close()
}

至此,基于acl的生产消费都正常工作了。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐