如何获取Kafka中的全部主题

想要获取Kafka的全部主题,首先要连接Kafka,连接Kafka有多种方式,目前最常用的就是采用AdminClient的客户端对象连接Kafka集群。

在通过AdminClient连接到Kafka集群后,得到了AdminClient对象。

再根据AdminClient对象获取listTopics,得到主题列表,通过get到具体的name列表,打印出来的就是Kafka集群中的全部主题名。

配置文件:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.lmz</groupId>
    <artifactId>springBootKafkaTest</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <artifactId>spring-boot-starter-parent</artifactId>
        <groupId>org.springframework.boot</groupId>
        <version>2.2.5.RELEASE</version>
        <relativePath/>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
        </dependency>
    </dependencies>

</project>

注意springboot和Kafka的版本

代码展示:

import java.util.*;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.common.KafkaFuture;

/**
 * @author zeyue
 */
public class KafkaTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        toAllTopic(adminClient());
    }

    /**
     * 获取集群中的全部主题
     *
     * @param adminClient
     */
    public static void toAllTopic(AdminClient adminClient) {
        //根据adminClient获取主题列表
        ListTopicsResult listTopicsResult = adminClient.listTopics();
        KafkaFuture<Set<String>> names1 = listTopicsResult.names();
        Set<String> names = null;
        try {
            //listTopicsResult.names()是KafkaFuture<Set<String>>类型的,调用get方法后得到set类型的值
            names = listTopicsResult.names().get();
            //打印得到的topic列表
            names.forEach(c -> System.out.println("name:" + c));
            
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

    }

    /**
     * 配置连接Kafka,创建AdminClient客户端对象
     * @return 返回配置连接好Kafka的客户端对象
     */
    public static AdminClient adminClient(){
        //配置文件操作类,我们除了编写.properties后缀的文件外,可以采用Properties类配置,省去了.properties文件
        Properties properties = new Properties();
        //put采用key,value的形式("bootstrap.servers"为Kafka配置集群地址的参数;"集群地址"为具体的集群地址,可以为自己配置的虚拟机中的Kafka集群地址)
        properties.put("bootstrap.servers","集群地址");

        return AdminClient.create(properties);
    }
}

注:AdminClient 类是在 org.apache.kafka.clients.admin.AdminClient 包中的

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐