一、背景

实现对主题的订阅控制,动态分配客户端的读写权限。

二、操作指令

  1. 创建writer用户,密码是writer-pwd:
    ./kafka-configs.sh --zookeeper 192.168.248.100:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=writer-pwd],SCRAM-SHA-512=[password=writer-pwd]' --entity-type users --entity-name writer

  2. 创建reader用户,密码是reader-pwd:
    ./kafka-configs.sh --zookeeper 192.168.248.100:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=reader-pwd],SCRAM-SHA-512=[password=reader-pwd]' --entity-type users --entity-name reader

  3. 创建admin用户,密码是admin:
    ./kafka-configs.sh --zookeeper 192.168.248.100:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=admin],SCRAM-SHA-512=[password=admin]' --entity-type users --entity-name admin

  4. 查看一下writer用户的信息
    ./kafka-configs.sh --zookeeper 192.168.248.100:2181 --describe --entity-type users --entity-name writer

  5. config底下添加配置文件kafka-broker-jaas.conf(这里配置admin用户用于实现broker间的通讯):

    KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin";
    };
    
  6. 配置broker端的server.properties

    # 启用ACL
    authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
    # 设置本例中admin为超级用户
    super.users=User:admin
    # 启用SCRAM机制,采用SCRAM-SHA-512算法
    sasl.enabled.mechanisms=SCRAM-SHA-512
    # 为broker间通讯开启SCRAM机制,采用SCRAM-SHA-512算法
    sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
    # broker间通讯使用PLAINTEXT,本例中不演示SSL配置
    security.inter.broker.protocol=SASL_PLAINTEXT
    # 配置listeners使用SASL_PLAINTEXT
    listeners=SASL_PLAINTEXT://192.168.248.100:9092
    # 配置advertised.listeners
    advertised.listeners=SASL_PLAINTEXT://192.168.248.100:9092
    
  7. kafka-server-start.sh脚本最后一行配置环境变量如下:
    exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/opt/kafka/kafka1/config/kafka-broker-jaas.conf kafka.Kafka "$@"

  8. 启动kafka

  9. 创建一个主题:
    ./kafka-topics.sh --create --zookeeper 192.168.248.100:2181 --topic test --partitions 1 --replication-factor 1

  10. config底下创建writer用户生产配置文件producer.conf:

    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=SCRAM-SHA-512
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="writer" password="writer-pwd";
    
  11. 给writer写的权限:
    ./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.248.100:2181 --add --allow-principal User:writer --operation Write --topic bigData

  12. 发送消息:
    ./kafka-console-producer.sh --broker-list 192.168.248.100:9092 --topic test --producer.config /opt/kafka/kafka/config/producer.conf
    测试发送消息成功

  13. config底下创建writer用户消费配置文件consumer.conf:

    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=SCRAM-SHA-512
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="reader" password="reader-pwd";
    
  14. 设置reader用户的读权限:
    ./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.248.100:2181 --add --allow-principal User:admin --operation Read --topic bigData

  15. 设置reader用户的读test-group消费组权限:
    ./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.248.100:2181 --add --allow-principal User:admin --operation Read --group test-group

  16. 消费消息:
    ./kafka-console-consumer.sh --bootstrap-server 192.168.248.100:9092 --topic test --from-beginning --consumer.config /opt/kafka/kafka1/config/consumer.conf --group test-group

三、ACL权限整合springboot

在allpication.yml加上配置,即可正常消费(前提是reader用户已赋予read权限),发送者也一样

spring:
  kafka:
    properties:
      sasl.mechanism: SCRAM-SHA-512
      security.protocol: SASL_PLAINTEXT
      sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="reader" password="reader-pwd";
package cn.BigData;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * 测试发送对象
 * @author 天真热
 * @create 2022-03-31 10:03
 * @desc
 **/
@RestController
public class Send {
    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    @KafkaListener(topics = "test", groupId = "test-group")
    public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
        System.out.println("test:" + record.value());
        //手动提交offset
        ack.acknowledge();
    }

    @RequestMapping("/sendTest")
    public void sendTest() {
        kafkaTemplate.send("test", "testtesttesttest");
    }
}

四、Acl整合java代码

由于我们给kafka配置的Acl权限,所有对于kafka的操作都需要登录才可以进行

  1. 创建kafka.conf配置文件:因为admin配置了管理员权限,所以admin拥有最高权限。

    KafkaClient{
    org.apache.kafka.common.security.scram.ScramLoginModule required 
    username="admin"
    password="admin";
    };
    

    ps补充:也可以直接在prop申明账户,这是后面发现的比较方便的,替代前一种方案

         props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"admin\";");
    
  2. 由于我们所有的操作都需要与kafka创建连接,所以我这里写了一个单例模式,创建一个与kafka的持久连接,方便提高效率

    private static AdminClient client;
     /**
     * 获取AdminClient连接
     *
     * @param bootstrapServers
     * @return
     */
    public static AdminClient getAdminClient(String bootstrapServers) throws IOException {
        String resource = new ClassPathResource("/kafka.conf").getURL().getPath();
        if (client == null) {
            synchronized (AdminClient.class) {
                if (client == null) {
                    client = initAdminClient(bootstrapServers, resource);
                }
            }
        }
    
        return client;
    }
    
    /**
     * 初始化AdminClient
     *
     * @param bootstrapServers
     * @param resource
     * @return
     */
    public static AdminClient initAdminClient(String bootstrapServers, String resource) {
        System.setProperty("java.security.auth.login.config", resource);
        //创建链接
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("max.request.size", 8000000);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        AdminClient client = AdminClient.create(props);
        return client;
    
  3. kafka创建用户

     /**
     * kafka创建用户
     *
     * @param bootstrapServers
     * @param account
     * @param password
     */
    public static void addUser(String bootstrapServers, String account, String password) throws ExecutionException, InterruptedException, IOException {
        //获取AdminClient连接
        AdminClient adminClient = getAdminClient(bootstrapServers);
        //创建User列表
        List<UserScramCredentialAlteration> alterations = new ArrayList<>();
        //构造Scram认证机制信息
        ScramCredentialInfo info = new ScramCredentialInfo(org.apache.kafka.clients.admin.ScramMechanism.SCRAM_SHA_512, 8192);
        //用户信息
        UserScramCredentialAlteration userScramCredentialAdd = new UserScramCredentialUpsertion(account, info, password);
        //添加用户信息到集合
        alterations.add(userScramCredentialAdd);
        //创建用户,并拿到返回结果
        AlterUserScramCredentialsResult result = adminClient.alterUserScramCredentials(alterations);
        result.all().get();
    }
    
  4. kafka删除用户

    /**
     * kafka删除用户
     *
     * @param bootstrapServers
     * @param account
     */
    public static void deleteUser(String bootstrapServers, String account) throws ExecutionException, InterruptedException, IOException {
        AdminClient adminClient = getAdminClient(bootstrapServers);
        //创建删除列表
        List<UserScramCredentialAlteration> alterations = new ArrayList<>();
        //构建删除用的UserScramCredentialAlteration
        UserScramCredentialAlteration userScramCredentialDel = new UserScramCredentialDeletion(account, org.apache.kafka.clients.admin.ScramMechanism.SCRAM_SHA_512);
        //添加认证信息到列表
        alterations.add(userScramCredentialDel);
        //执行方法,并拿到返回结果
        AlterUserScramCredentialsResult result = adminClient.alterUserScramCredentials(alterations);
        //阻塞等待结果完成
        result.all().get();
    }
    
  5. kafka获取所有用户信息

    /**
     * 获取所有用户信息
     *
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public static void describeAccount(String bootstrapServers) throws ExecutionException, InterruptedException {
        AdminClient adminClient = getAdminClient(bootstrapServers);
        //查询所有的账户,这也是默认方法
        DescribeUserScramCredentialsResult result = adminClient.describeUserScramCredentials();
        //执行方法,并拿到返回结果
        java.util.Map<String, UserScramCredentialsDescription> future = result.all().get();
        //输出
        future.forEach((name, info) -> System.out.println("[ScramUserName:" + name + "]:[ScramUserInfo:" + info.toString() + "]"));
    }
    
  6. 赋予用户消费组读权限

    /**
     * 给用户赋予消费组读权限
     *
     * @param bootstrapServers
     * @param account
     * @param consumerGroup
     */
    public static void addGroupReadAcl(String bootstrapServers, String account, String consumerGroup) throws IOException, ExecutionException, InterruptedException {
        //获取AdminClient连接
        AdminClient adminClient = getAdminClient(bootstrapServers);
        //绑定消费组
        ResourcePattern resourcePatternGroup = new ResourcePattern(ResourceType.GROUP, consumerGroup, PatternType.LITERAL);
        //绑定用户、权限
        AccessControlEntry accessControlEntryRead = new AccessControlEntry("User:" + account, "*", AclOperation.READ, AclPermissionType.ALLOW);
        //绑定用户、权限、消费组
        AclBinding aclBindingGroup = new AclBinding(resourcePatternGroup, accessControlEntryRead);
    
        Collection<AclBinding> aclBindingCollection = new ArrayList<>();
        aclBindingCollection.add(aclBindingGroup); //添加到集合
    
        CreateAclsResult aclResult = adminClient.createAcls(aclBindingCollection);
        KafkaFuture<Void> result = aclResult.all();
        result.get(); //执行
    }
    
  7. 移除用户消费组读权限

    /**
     * 移除消费组读权限
     *
     * @param bootstrapServers
     * @param account
     * @param consumerGroup
     * @throws IOException
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public static void deleteGroupReadAcl(String bootstrapServers, String account, String consumerGroup) throws IOException, ExecutionException, InterruptedException {
        //获取AdminClient连接
        AdminClient adminClient = getAdminClient(bootstrapServers);
        //绑定消费组
        ResourcePatternFilter resourcePatternFilterGroup = new ResourcePatternFilter(ResourceType.GROUP, consumerGroup, PatternType.LITERAL);
        //绑定用户、权限
        AccessControlEntryFilter accessControlEntryFilter = new AccessControlEntryFilter("User:" + account, "*", AclOperation.READ, AclPermissionType.ALLOW);
        //绑定用户、权限、消费组
        AclBindingFilter aclBindingGroup = new AclBindingFilter(resourcePatternFilterGroup, accessControlEntryFilter);
    
        Collection<AclBindingFilter> aclBindingCollection = new ArrayList<>();
        aclBindingCollection.add(aclBindingGroup);//添加到集合
    
        DeleteAclsResult aclResult = adminClient.deleteAcls(aclBindingCollection);
        KafkaFuture<Collection<AclBinding>> result = aclResult.all();
        result.get();
    }
    
  8. 赋予用户主题读/写权限

    /**
     * 给用户赋予主题读/写权限
     * @param bootstrapServers
     * @param topic
     * @param account
     * @param acl
     * @throws IOException
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public static void addTopicReadOrWriterAcl(String bootstrapServers, String topic, String account, AclOperation acl) throws IOException, ExecutionException, InterruptedException {
        //获取AdminClient连接
        AdminClient adminClient = getAdminClient(bootstrapServers);
        //绑定主题
        ResourcePattern resourcePattern = new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL);
        //绑定用户和权限
        AccessControlEntry accessControlEntryRead = new AccessControlEntry("User:" + account, "*", acl, AclPermissionType.ALLOW);
        //绑定用户、主题、权限
        AclBinding aclBindingRead = new AclBinding(resourcePattern, accessControlEntryRead);
    
        //赋予权限
        Collection<AclBinding> aclBindingCollection = new ArrayList<>();
        aclBindingCollection.add(aclBindingRead); //添加到集合
        CreateAclsResult aclResult = adminClient.createAcls(aclBindingCollection);
        KafkaFuture<Void> result = aclResult.all();
        result.get(); //执行
    }
    
  9. 移除用户主题读/写权限

    /**
     * 移除用户主题读/写权限
     *
     * @param bootstrapServers
     * @param topic
     * @param account
     */
    public static void deleteTopicReadOrWriterAcl(String bootstrapServers, String topic, String account) throws IOException, ExecutionException, InterruptedException {
        //获取AdminClient连接
        AdminClient adminClient = getAdminClient(bootstrapServers);
        //绑定主题
        ResourcePatternFilter resourcePatternFilter = new ResourcePatternFilter(ResourceType.TOPIC, topic, PatternType.LITERAL);
        //绑定用户、权限
        AccessControlEntryFilter accessControlEntryFilter = new AccessControlEntryFilter("User:" + account, "*", AclOperation.WRITE/READ, AclPermissionType.ALLOW);
        //绑定用户、主题、权限
        AclBindingFilter aclBinding = new AclBindingFilter(resourcePatternFilter, accessControlEntryFilter);
        //移除权限
        Collection<AclBindingFilter> aclBindingCollection = new ArrayList<>();
        aclBindingCollection.add(aclBinding);
        DeleteAclsResult aclResult = adminClient.deleteAcls(aclBindingCollection);
        KafkaFuture<Collection<AclBinding>> result = aclResult.all();
        result.get();
    }
    
  10. 批量移除主题读写权限

    /**
     * 批量移除主题读写权限
     *
     * @param bootstrapServers
     * @param kafkaAuthoritys
     */
    public static void batchDeleteTopicReadOrWriterAcl(String bootstrapServers, List<KafkaAuthority> kafkaAuthoritys) throws IOException, ExecutionException, InterruptedException {
        //获取AdminClient连接
        AdminClient adminClient = getAdminClient(bootstrapServers);
        adminClient.close();
        //移除权限集合
        Collection<AclBindingFilter> aclBindingCollection = new ArrayList<>();
    
        for (KafkaAuthority kafkaAuthority : kafkaAuthoritys) {
            //绑定主题
            ResourcePatternFilter resourcePatternFilter = new ResourcePatternFilter(ResourceType.TOPIC, kafkaAuthority.getTopic(), PatternType.LITERAL);
            //绑定用户、权限
            AccessControlEntryFilter accessControlEntryFilter = new AccessControlEntryFilter("User:" + kafkaAuthority.getAccount(), "*", AclOperation.WRITE/READ(这里可存表,然后由kafkaAuthority动态获取), AclPermissionType.ALLOW);
            //绑定用户、主题、权限
            AclBindingFilter aclBinding = new AclBindingFilter(resourcePatternFilter, accessControlEntryFilter);
            //加入集合
            aclBindingCollection.add(aclBinding);
        }
    
  11. 查看所有用户的所有权限

    /**
     * 账户权限
     *
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public static void describeAllAcl() throws ExecutionException, InterruptedException {
        AdminClient adminClient = getAdminClient();
        DescribeAclsResult result = adminClient.describeAcls(AclBindingFilter.ANY);
        try {
            Collection<AclBinding> gets = result.values().get();
            for (AclBinding get : gets) {
                System.out.println(get.pattern().name());
                System.out.println(get.pattern().patternType());
                System.out.println(get.pattern().resourceType());
                System.out.println(get.entry().principal());
                System.out.println(get.entry().permissionType());
                System.out.println(get.entry().operation());
                System.out.println("-------------------------");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
    
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐