kafka Acl权限管理
文章目录一、背景二、操作指令三、ACL权限整合springboot四、Acl整合java代码一、背景实现对主题的订阅控制,动态分配客户端的读写权限。二、操作指令创建writer用户,密码是writer-pwd:./kafka-configs.sh --zookeeper 192.168.248.100:2181 --alter --add-config 'SCRAM-SHA-256=[iterat
一、背景
实现对主题的订阅控制,动态分配客户端的读写权限。
二、操作指令
-
创建writer用户,密码是writer-pwd:
./kafka-configs.sh --zookeeper 192.168.248.100:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=writer-pwd],SCRAM-SHA-512=[password=writer-pwd]' --entity-type users --entity-name writer
-
创建reader用户,密码是reader-pwd:
./kafka-configs.sh --zookeeper 192.168.248.100:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=reader-pwd],SCRAM-SHA-512=[password=reader-pwd]' --entity-type users --entity-name reader
-
创建admin用户,密码是admin:
./kafka-configs.sh --zookeeper 192.168.248.100:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=admin],SCRAM-SHA-512=[password=admin]' --entity-type users --entity-name admin
-
查看一下writer用户的信息
./kafka-configs.sh --zookeeper 192.168.248.100:2181 --describe --entity-type users --entity-name writer
-
config底下添加配置文件kafka-broker-jaas.conf(这里配置admin用户用于实现broker间的通讯):
KafkaServer { org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin"; };
-
配置broker端的server.properties
# 启用ACL authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer # 设置本例中admin为超级用户 super.users=User:admin # 启用SCRAM机制,采用SCRAM-SHA-512算法 sasl.enabled.mechanisms=SCRAM-SHA-512 # 为broker间通讯开启SCRAM机制,采用SCRAM-SHA-512算法 sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512 # broker间通讯使用PLAINTEXT,本例中不演示SSL配置 security.inter.broker.protocol=SASL_PLAINTEXT # 配置listeners使用SASL_PLAINTEXT listeners=SASL_PLAINTEXT://192.168.248.100:9092 # 配置advertised.listeners advertised.listeners=SASL_PLAINTEXT://192.168.248.100:9092
-
kafka-server-start.sh脚本最后一行配置环境变量如下:
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/opt/kafka/kafka1/config/kafka-broker-jaas.conf kafka.Kafka "$@"
-
启动kafka
-
创建一个主题:
./kafka-topics.sh --create --zookeeper 192.168.248.100:2181 --topic test --partitions 1 --replication-factor 1
-
config底下创建writer用户生产配置文件producer.conf:
security.protocol=SASL_PLAINTEXT sasl.mechanism=SCRAM-SHA-512 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="writer" password="writer-pwd";
-
给writer写的权限:
./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.248.100:2181 --add --allow-principal User:writer --operation Write --topic bigData
-
发送消息:
./kafka-console-producer.sh --broker-list 192.168.248.100:9092 --topic test --producer.config /opt/kafka/kafka/config/producer.conf
测试发送消息成功 -
config底下创建writer用户消费配置文件consumer.conf:
security.protocol=SASL_PLAINTEXT sasl.mechanism=SCRAM-SHA-512 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="reader" password="reader-pwd";
-
设置reader用户的读权限:
./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.248.100:2181 --add --allow-principal User:admin --operation Read --topic bigData
-
设置reader用户的读test-group消费组权限:
./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.248.100:2181 --add --allow-principal User:admin --operation Read --group test-group
-
消费消息:
./kafka-console-consumer.sh --bootstrap-server 192.168.248.100:9092 --topic test --from-beginning --consumer.config /opt/kafka/kafka1/config/consumer.conf --group test-group
三、ACL权限整合springboot
在allpication.yml加上配置,即可正常消费(前提是reader用户已赋予read权限),发送者也一样
spring:
kafka:
properties:
sasl.mechanism: SCRAM-SHA-512
security.protocol: SASL_PLAINTEXT
sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="reader" password="reader-pwd";
package cn.BigData;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
* 测试发送对象
* @author 天真热
* @create 2022-03-31 10:03
* @desc
**/
@RestController
public class Send {
@Resource
private KafkaTemplate<String, Object> kafkaTemplate;
@KafkaListener(topics = "test", groupId = "test-group")
public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
System.out.println("test:" + record.value());
//手动提交offset
ack.acknowledge();
}
@RequestMapping("/sendTest")
public void sendTest() {
kafkaTemplate.send("test", "testtesttesttest");
}
}
四、Acl整合java代码
由于我们给kafka配置的Acl权限,所有对于kafka的操作都需要登录才可以进行
-
创建kafka.conf配置文件:因为admin配置了管理员权限,所以admin拥有最高权限。
KafkaClient{ org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin"; };
ps补充:也可以直接在prop申明账户,这是后面发现的比较方便的,替代前一种方案
props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"admin\";");
-
由于我们所有的操作都需要与kafka创建连接,所以我这里写了一个单例模式,创建一个与kafka的持久连接,方便提高效率
private static AdminClient client; /** * 获取AdminClient连接 * * @param bootstrapServers * @return */ public static AdminClient getAdminClient(String bootstrapServers) throws IOException { String resource = new ClassPathResource("/kafka.conf").getURL().getPath(); if (client == null) { synchronized (AdminClient.class) { if (client == null) { client = initAdminClient(bootstrapServers, resource); } } } return client; } /** * 初始化AdminClient * * @param bootstrapServers * @param resource * @return */ public static AdminClient initAdminClient(String bootstrapServers, String resource) { System.setProperty("java.security.auth.login.config", resource); //创建链接 Properties props = new Properties(); props.put("bootstrap.servers", bootstrapServers); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("max.request.size", 8000000); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("security.protocol", "SASL_PLAINTEXT"); props.put("sasl.mechanism", "SCRAM-SHA-512"); AdminClient client = AdminClient.create(props); return client;
-
kafka创建用户
/** * kafka创建用户 * * @param bootstrapServers * @param account * @param password */ public static void addUser(String bootstrapServers, String account, String password) throws ExecutionException, InterruptedException, IOException { //获取AdminClient连接 AdminClient adminClient = getAdminClient(bootstrapServers); //创建User列表 List<UserScramCredentialAlteration> alterations = new ArrayList<>(); //构造Scram认证机制信息 ScramCredentialInfo info = new ScramCredentialInfo(org.apache.kafka.clients.admin.ScramMechanism.SCRAM_SHA_512, 8192); //用户信息 UserScramCredentialAlteration userScramCredentialAdd = new UserScramCredentialUpsertion(account, info, password); //添加用户信息到集合 alterations.add(userScramCredentialAdd); //创建用户,并拿到返回结果 AlterUserScramCredentialsResult result = adminClient.alterUserScramCredentials(alterations); result.all().get(); }
-
kafka删除用户
/** * kafka删除用户 * * @param bootstrapServers * @param account */ public static void deleteUser(String bootstrapServers, String account) throws ExecutionException, InterruptedException, IOException { AdminClient adminClient = getAdminClient(bootstrapServers); //创建删除列表 List<UserScramCredentialAlteration> alterations = new ArrayList<>(); //构建删除用的UserScramCredentialAlteration UserScramCredentialAlteration userScramCredentialDel = new UserScramCredentialDeletion(account, org.apache.kafka.clients.admin.ScramMechanism.SCRAM_SHA_512); //添加认证信息到列表 alterations.add(userScramCredentialDel); //执行方法,并拿到返回结果 AlterUserScramCredentialsResult result = adminClient.alterUserScramCredentials(alterations); //阻塞等待结果完成 result.all().get(); }
-
kafka获取所有用户信息
/** * 获取所有用户信息 * * @throws ExecutionException * @throws InterruptedException */ public static void describeAccount(String bootstrapServers) throws ExecutionException, InterruptedException { AdminClient adminClient = getAdminClient(bootstrapServers); //查询所有的账户,这也是默认方法 DescribeUserScramCredentialsResult result = adminClient.describeUserScramCredentials(); //执行方法,并拿到返回结果 java.util.Map<String, UserScramCredentialsDescription> future = result.all().get(); //输出 future.forEach((name, info) -> System.out.println("[ScramUserName:" + name + "]:[ScramUserInfo:" + info.toString() + "]")); }
-
赋予用户消费组读权限
/** * 给用户赋予消费组读权限 * * @param bootstrapServers * @param account * @param consumerGroup */ public static void addGroupReadAcl(String bootstrapServers, String account, String consumerGroup) throws IOException, ExecutionException, InterruptedException { //获取AdminClient连接 AdminClient adminClient = getAdminClient(bootstrapServers); //绑定消费组 ResourcePattern resourcePatternGroup = new ResourcePattern(ResourceType.GROUP, consumerGroup, PatternType.LITERAL); //绑定用户、权限 AccessControlEntry accessControlEntryRead = new AccessControlEntry("User:" + account, "*", AclOperation.READ, AclPermissionType.ALLOW); //绑定用户、权限、消费组 AclBinding aclBindingGroup = new AclBinding(resourcePatternGroup, accessControlEntryRead); Collection<AclBinding> aclBindingCollection = new ArrayList<>(); aclBindingCollection.add(aclBindingGroup); //添加到集合 CreateAclsResult aclResult = adminClient.createAcls(aclBindingCollection); KafkaFuture<Void> result = aclResult.all(); result.get(); //执行 }
-
移除用户消费组读权限
/** * 移除消费组读权限 * * @param bootstrapServers * @param account * @param consumerGroup * @throws IOException * @throws ExecutionException * @throws InterruptedException */ public static void deleteGroupReadAcl(String bootstrapServers, String account, String consumerGroup) throws IOException, ExecutionException, InterruptedException { //获取AdminClient连接 AdminClient adminClient = getAdminClient(bootstrapServers); //绑定消费组 ResourcePatternFilter resourcePatternFilterGroup = new ResourcePatternFilter(ResourceType.GROUP, consumerGroup, PatternType.LITERAL); //绑定用户、权限 AccessControlEntryFilter accessControlEntryFilter = new AccessControlEntryFilter("User:" + account, "*", AclOperation.READ, AclPermissionType.ALLOW); //绑定用户、权限、消费组 AclBindingFilter aclBindingGroup = new AclBindingFilter(resourcePatternFilterGroup, accessControlEntryFilter); Collection<AclBindingFilter> aclBindingCollection = new ArrayList<>(); aclBindingCollection.add(aclBindingGroup);//添加到集合 DeleteAclsResult aclResult = adminClient.deleteAcls(aclBindingCollection); KafkaFuture<Collection<AclBinding>> result = aclResult.all(); result.get(); }
-
赋予用户主题读/写权限
/** * 给用户赋予主题读/写权限 * @param bootstrapServers * @param topic * @param account * @param acl * @throws IOException * @throws ExecutionException * @throws InterruptedException */ public static void addTopicReadOrWriterAcl(String bootstrapServers, String topic, String account, AclOperation acl) throws IOException, ExecutionException, InterruptedException { //获取AdminClient连接 AdminClient adminClient = getAdminClient(bootstrapServers); //绑定主题 ResourcePattern resourcePattern = new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL); //绑定用户和权限 AccessControlEntry accessControlEntryRead = new AccessControlEntry("User:" + account, "*", acl, AclPermissionType.ALLOW); //绑定用户、主题、权限 AclBinding aclBindingRead = new AclBinding(resourcePattern, accessControlEntryRead); //赋予权限 Collection<AclBinding> aclBindingCollection = new ArrayList<>(); aclBindingCollection.add(aclBindingRead); //添加到集合 CreateAclsResult aclResult = adminClient.createAcls(aclBindingCollection); KafkaFuture<Void> result = aclResult.all(); result.get(); //执行 }
-
移除用户主题读/写权限
/** * 移除用户主题读/写权限 * * @param bootstrapServers * @param topic * @param account */ public static void deleteTopicReadOrWriterAcl(String bootstrapServers, String topic, String account) throws IOException, ExecutionException, InterruptedException { //获取AdminClient连接 AdminClient adminClient = getAdminClient(bootstrapServers); //绑定主题 ResourcePatternFilter resourcePatternFilter = new ResourcePatternFilter(ResourceType.TOPIC, topic, PatternType.LITERAL); //绑定用户、权限 AccessControlEntryFilter accessControlEntryFilter = new AccessControlEntryFilter("User:" + account, "*", AclOperation.WRITE/READ, AclPermissionType.ALLOW); //绑定用户、主题、权限 AclBindingFilter aclBinding = new AclBindingFilter(resourcePatternFilter, accessControlEntryFilter); //移除权限 Collection<AclBindingFilter> aclBindingCollection = new ArrayList<>(); aclBindingCollection.add(aclBinding); DeleteAclsResult aclResult = adminClient.deleteAcls(aclBindingCollection); KafkaFuture<Collection<AclBinding>> result = aclResult.all(); result.get(); }
-
批量移除主题读写权限
/** * 批量移除主题读写权限 * * @param bootstrapServers * @param kafkaAuthoritys */ public static void batchDeleteTopicReadOrWriterAcl(String bootstrapServers, List<KafkaAuthority> kafkaAuthoritys) throws IOException, ExecutionException, InterruptedException { //获取AdminClient连接 AdminClient adminClient = getAdminClient(bootstrapServers); adminClient.close(); //移除权限集合 Collection<AclBindingFilter> aclBindingCollection = new ArrayList<>(); for (KafkaAuthority kafkaAuthority : kafkaAuthoritys) { //绑定主题 ResourcePatternFilter resourcePatternFilter = new ResourcePatternFilter(ResourceType.TOPIC, kafkaAuthority.getTopic(), PatternType.LITERAL); //绑定用户、权限 AccessControlEntryFilter accessControlEntryFilter = new AccessControlEntryFilter("User:" + kafkaAuthority.getAccount(), "*", AclOperation.WRITE/READ(这里可存表,然后由kafkaAuthority动态获取), AclPermissionType.ALLOW); //绑定用户、主题、权限 AclBindingFilter aclBinding = new AclBindingFilter(resourcePatternFilter, accessControlEntryFilter); //加入集合 aclBindingCollection.add(aclBinding); }
-
查看所有用户的所有权限
/** * 账户权限 * * @throws ExecutionException * @throws InterruptedException */ public static void describeAllAcl() throws ExecutionException, InterruptedException { AdminClient adminClient = getAdminClient(); DescribeAclsResult result = adminClient.describeAcls(AclBindingFilter.ANY); try { Collection<AclBinding> gets = result.values().get(); for (AclBinding get : gets) { System.out.println(get.pattern().name()); System.out.println(get.pattern().patternType()); System.out.println(get.pattern().resourceType()); System.out.println(get.entry().principal()); System.out.println(get.entry().permissionType()); System.out.println(get.entry().operation()); System.out.println("-------------------------"); } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }
更多推荐
所有评论(0)