<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.11.0.0</version>
</dependency>
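The KafkaListenerService interface and the KafkaConsumerDTO result object are not shown in the original post. A minimal sketch of both, inferred from the implementation below (each type in its own file; the field names and Lombok usage are assumptions), might look like this:

import java.util.List;
import java.util.Set;

// Hypothetical sketch of the interface implemented below, inferred from its @Override methods.
public interface KafkaListenerService {
    Set<String> getAllGroupId(String servers);

    List<KafkaConsumerDTO> getConsumerDetails(String groupId);
}

// Hypothetical sketch of the result object; fields follow the setters used in getConsumerDetails.
@lombok.Data
public class KafkaConsumerDTO {
    private String groupId;      // consumer group id
    private String topic;        // topic name
    private int partition;       // partition number
    private Long currentOffset;  // offset committed by the group
    private Long logEndOffset;   // latest offset in the partition log
    private Long lag;            // logEndOffset - currentOffset
    private String host;         // broker host the figures were read from
}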
// Imports target the legacy Scala client APIs shipped with kafka_2.11:0.11.0.0;
// package locations (e.g. GroupOverview) may differ in other broker versions.
import com.google.common.collect.Lists;
import kafka.admin.AdminClient;
import kafka.api.OffsetRequest;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.coordinator.group.GroupOverview;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import scala.collection.JavaConversions;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

@Service
@Slf4j
public class KafkaListenerServiceImpl implements KafkaListenerService {
    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String servers;

    /**
     * List every consumer group id known to the given brokers.
     *
     * @param servers comma-separated broker list, e.g. host1:9092,host2:9092
     * @return the set of group ids found across all brokers
     */
    @Override
    public Set<String> getAllGroupId(String servers) {
        String[] serversArgs = servers.split(",");
        Set<String> groupIds = new HashSet<>();

        for (String server : serversArgs) {
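            // One legacy Scala AdminClient is created per configured bootstrap address (PLAINTEXT only)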
            AdminClient client = AdminClient.createSimplePlaintext(server);
            try {
                // 1. Query all consumer group overviews
                List<GroupOverview> allGroups = JavaConversions.seqAsJavaList(client.listAllGroupsFlattened().toSeq());
                allGroups.forEach(group -> {
                    groupIds.add(group.groupId());
                });
            } finally {
                client.close();
            }
        }
        return groupIds;
    }

    /**
     * Collect per-partition consumption details (current offset, log-end offset, lag) for a consumer group.
     *
     * @param groupId consumer group id
     * @return one entry per topic partition consumed by the group
     */
    @Override
    public List<KafkaConsumerDTO> getConsumerDetails(String groupId) {
        List<KafkaConsumerDTO> kafkaConsumerDtoS = Lists.newArrayList();

        String[] serversArgs = servers.split(",");
        for (String server : serversArgs) {
            // Kafka admin client for this broker
            AdminClient client = AdminClient.createSimplePlaintext(server);
            // broker host
            String broker = server.substring(0, server.indexOf(":"));
            // broker port
            int port = Integer.parseInt(server.substring(server.indexOf(":") + 1));
            try {
                // Committed offsets of this group, keyed by topic partition
                Map<TopicPartition, Object> offsets = JavaConversions.mapAsJavaMap(client.listGroupOffsets(groupId));

                for (TopicPartition topicPartition : offsets.keySet()) {
                    String topic = topicPartition.topic();
                    int partition = topicPartition.partition();
                    Object currentOffset = offsets.get(topicPartition);

                    // Assemble the result DTO
                    KafkaConsumerDTO kafkaConsumerDTO = new KafkaConsumerDTO();
                    kafkaConsumerDTO.setGroupId(groupId);
                    kafkaConsumerDTO.setTopic(topic);
                    kafkaConsumerDTO.setPartition(partition);
                    kafkaConsumerDTO.setCurrentOffset((Long) currentOffset);

                    // Fetch the partition's log-end offset (total messages written) via the legacy SimpleConsumer API
                    String clientName = "Client_" + topic + "_" + partition;
                    SimpleConsumer consumer = new SimpleConsumer(broker, port, 100000, 64 * 1024, clientName);
                    long logEndOffset = getLastOffset(consumer, topic, partition, OffsetRequest.LatestTime(), clientName);
                    consumer.close();

                    // If this broker is not the leader of the partition, the log-end offset cannot be fetched
                    // (getLastOffset returns 0), so skip the entry
                    if (logEndOffset != 0) {
                        kafkaConsumerDTO.setLogEndOffset(logEndOffset);
                        kafkaConsumerDTO.setLag(logEndOffset - kafkaConsumerDTO.getCurrentOffset());
                        kafkaConsumerDTO.setHost(broker);
                        kafkaConsumerDtoS.add(kafkaConsumerDTO);
                    }
                }
            } finally {
                client.close();
            }
        }
        return kafkaConsumerDtoS;
    }


    /**
     * Look up the latest (log-end) offset of a topic partition on the broker the consumer is connected to.
     *
     * @param consumer   SimpleConsumer connected to the broker
     * @param topic      topic name
     * @param partition  partition id
     * @param whichTime  timestamp to query, typically OffsetRequest.LatestTime()
     * @param clientName client id used for the request
     * @return the log-end offset, or 0 if the request failed (e.g. the broker is not the partition leader)
     */
    private static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
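        // Request at most one offset at or before whichTime; with LatestTime() that is the partition's log-end offset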
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError()) {
            log.warn("Failed to fetch offset for {}-{}, error code: {}", topic, partition, response.errorCode(topic, partition));
            return 0;
        }
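        // Only one offset was requested above, so the first array element is the log-end offset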
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }
}
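Assuming a Spring Boot application, a small runner like the following (hypothetical, not part of the original post) could wire the two methods together and print how far each group lags on every partition:

import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class ConsumerLagRunner implements CommandLineRunner {

    private final KafkaListenerService kafkaListenerService;

    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String servers;

    @Override
    public void run(String... args) {
        // For every group on the cluster, print its per-partition offsets and lag
        for (String groupId : kafkaListenerService.getAllGroupId(servers)) {
            for (KafkaConsumerDTO detail : kafkaListenerService.getConsumerDetails(groupId)) {
                System.out.printf("group=%s topic=%s partition=%d currentOffset=%d logEndOffset=%d lag=%d%n",
                        detail.getGroupId(), detail.getTopic(), detail.getPartition(),
                        detail.getCurrentOffset(), detail.getLogEndOffset(), detail.getLag());
            }
        }
    }
}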