如何获取Kafka中的全部主题
想要获取Kafka的全部主题,首先要连接Kafka,连接Kafka有多种方式,目前最常用的就是采用AdminClient的客户端对象连接Kafka集群。再根据AdminClient对象获取listTopics,得到主题列表,通过get到具体的name列表,打印出来的就是Kafka集群中的全部主题名。在通过AdminClient连接到Kafka集群后,得到了AdminClient对象。......
·
如何获取Kafka中的全部主题
想要获取Kafka的全部主题,首先要连接Kafka,连接Kafka有多种方式,目前最常用的就是采用AdminClient的客户端对象连接Kafka集群。
在通过AdminClient连接到Kafka集群后,得到了AdminClient对象。
再根据AdminClient对象获取listTopics,得到主题列表,通过get到具体的name列表,打印出来的就是Kafka集群中的全部主题名。
配置文件:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.lmz</groupId>
<artifactId>springBootKafkaTest</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<artifactId>spring-boot-starter-parent</artifactId>
<groupId>org.springframework.boot</groupId>
<version>2.2.5.RELEASE</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
</dependency>
</dependencies>
</project>
注意springboot和Kafka的版本
代码展示:
import java.util.*;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.common.KafkaFuture;
/**
* @author zeyue
*/
public class KafkaTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
toAllTopic(adminClient());
}
/**
* 获取集群中的全部主题
*
* @param adminClient
*/
public static void toAllTopic(AdminClient adminClient) {
//根据adminClient获取主题列表
ListTopicsResult listTopicsResult = adminClient.listTopics();
KafkaFuture<Set<String>> names1 = listTopicsResult.names();
Set<String> names = null;
try {
//listTopicsResult.names()是KafkaFuture<Set<String>>类型的,调用get方法后得到set类型的值
names = listTopicsResult.names().get();
//打印得到的topic列表
names.forEach(c -> System.out.println("name:" + c));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
/**
* 配置连接Kafka,创建AdminClient客户端对象
* @return 返回配置连接好Kafka的客户端对象
*/
public static AdminClient adminClient(){
//配置文件操作类,我们除了编写.properties后缀的文件外,可以采用Properties类配置,省去了.properties文件
Properties properties = new Properties();
//put采用key,value的形式("bootstrap.servers"为Kafka配置集群地址的参数;"集群地址"为具体的集群地址,可以为自己配置的虚拟机中的Kafka集群地址)
properties.put("bootstrap.servers","集群地址");
return AdminClient.create(properties);
}
}
注:AdminClient 类是在 org.apache.kafka.clients.admin.AdminClient 包中的
更多推荐
已为社区贡献1条内容
所有评论(0)