1. How to record the Kafka lag value

Three ways to obtain the lag:
  • Use Kafka's built-in command-line tool, the kafka-consumer-groups.sh script.

    (The dividing line between the old and new approaches is Kafka 0.9.0: from 0.9.0 on, the new consumer stores offsets in Kafka itself rather than in ZooKeeper.)
    [New-version approaches (two equivalent commands)]:
    bin/kafka-consumer-groups.sh --bootstrap-server <broker connection string> --describe --group <group name>
    ./kafka-consumer-groups.sh --bootstrap-server 10.0.90.74:9000,10.0.90.75:9000,10.0.90.76:9000 --describe --group zyj-in
    
    ./kafka-run-class.sh kafka.admin.ConsumerGroupCommand --group zyj-in --bootstrap-server 10.0.90.74:9000,10.0.90.75:9000,10.0.90.76:9000 --describe --new-consumer
    [Old-version approach]:
    ./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group zyj-in --topic zyj-in --zookeeper 10.0.90.74:2181,10.0.90.75:2181,10.0.90.76:2181
    
    List all consumer groups:
    ./kafka-consumer-groups.sh --bootstrap-server 10.0.90.74:9000,10.0.90.75:9000,10.0.90.76:9000 --list
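    
    The describe commands print one row per partition; the LAG column is simply LOG-END-OFFSET minus CURRENT-OFFSET. Illustrative output (all values made up):
    
    TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID      HOST          CLIENT-ID
    zyj-in  0          668             680             12   consumer-1-...   /10.0.90.74   consumer-1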
    
  • Use the Kafka Java Consumer API (the code below requires Kafka 2.0.0 or later, where AdminClient.listConsumerGroupOffsets is available).

    Step 1 calls AdminClient.listConsumerGroupOffsets to fetch the consumer group's latest committed offsets;
    
    Step 2 fetches the log-end offsets of the partitions the group subscribes to;
    
    Step 3 performs the subtraction and packs the per-partition lag values into a Map.
    
    
    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    import java.util.stream.Collectors;
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException {
        Properties props = new Properties();
        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        try (AdminClient client = AdminClient.create(props)) {
            // Step 1: the group's latest committed offsets
            ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);
            try {
                Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
                props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // disable offset auto-commit
                props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
                props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                    // Step 2: the log-end offsets of the partitions the group has consumed
                    Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet());
                    // Step 3: lag = log-end offset - committed offset, per partition
                    return endOffsets.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
                            entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset()));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                // handle the interruption
                return Collections.emptyMap();
            } catch (ExecutionException e) {
                // handle the failed AdminClient call
                return Collections.emptyMap();
            } catch (TimeoutException e) {
                throw new TimeoutException("Timed out when getting lag for consumer group " + groupID);
            }
        }
    }
    
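    A minimal sketch of calling it, reusing the group and one broker from the CLI examples above:
    
        Map<TopicPartition, Long> lags = lagOf("zyj-in", "10.0.90.74:9000");
        lags.forEach((tp, lag) -> System.out.println(tp + " lag = " + lag));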
    
  • Use Kafka's built-in JMX monitoring metrics.
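
    The consumer client registers lag metrics over JMX under the kafka.consumer domain, e.g. the records-lag-max attribute of the consumer-fetch-manager-metrics MBean. A minimal sketch that reads it from the local platform MBean server, assuming a client-id of consumer-1 (substitute your own):
    
    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;
    
    public class JmxLagProbe {
        public static void main(String[] args) throws Exception {
            // works inside the same JVM as the running KafkaConsumer
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            // "consumer-1" is an assumed client-id; browse the MBeans in JConsole to find yours
            ObjectName name = new ObjectName(
                    "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-1");
            // records-lag-max: the maximum record lag across all partitions of this consumer
            Object lagMax = server.getAttribute(name, "records-lag-max");
            System.out.println("records-lag-max = " + lagMax);
        }
    }
    
    For a consumer running in another process, connect through a JMXConnector to its JMX port instead of the platform MBean server.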

What do positive, negative, and zero lag values mean?

  • Positive: data is backing up in Kafka; messages flow in faster than they are consumed. With a = log-end offset and b = committed offset, a - b is positive: supply exceeds demand.
  • Negative: a timing artifact of the measurement. If a is read first and the consumer commits past it before the subtraction runs (say a = 1000 is fetched, then the group commits b = 1003), a - b comes out negative. It means the consumer is draining data almost as fast as it arrives: supply is below demand.
  • 0: producer and consumer rates are roughly equal, i.e. both sides are working normally.

Can the metrics exposed by Flink itself meet this need?

  • register.consumer.metrics specifies whether to register metrics of the KafkaConsumer in the Flink metric group.

    Enabling this option exposes the underlying KafkaConsumer's own metrics through Flink (see the sketch after this list).

  • KafkaSourceReader.committedOffsets: the last successfully committed offsets to Kafka, for each partition. A particular partition's metric can be specified by topic name and partition id.

  • KafkaSourceReader.currentOffsets: the consumer's current read offset, for each partition. A particular partition's metric can be specified by topic name and partition id.
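
With the newer KafkaSource connector, the register.consumer.metrics option is passed as a consumer property when building the source. A minimal sketch, reusing the topic, group, and broker address from the earlier CLI examples:

    KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers("10.0.90.74:9000")
            .setTopics("zyj-in")
            .setGroupId("zyj-in")
            .setValueOnlyDeserializer(new SimpleStringSchema())
            // register the underlying KafkaConsumer's metrics in the Flink metric group
            .setProperty("register.consumer.metrics", "true")
            .build();

Note that the two gauges above track the reader's own positions; computing lag against the broker's log-end offset is what motivates the custom code below.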
    

Putting the above together, here is code that lets a Flink job connect to Kafka and report the Kafka lag:

  // Custom source: a FlinkKafkaConsumer subclass that exposes its internal fetcher
    public static class CustomerKafkaConsumer<T> extends FlinkKafkaConsumer<Tuple2<Map<String, byte[]>, byte[]>> {
        private final MetaJsonKeyValueDeserializationSchema metaJsonKeyValueDeserializationSchema;

        public CustomerKafkaConsumer(List<String> topic, KafkaDeserializationSchema<Tuple2<Map<String, byte[]>, byte[]>> valueDeserializer, Properties props) {
            super(topic, valueDeserializer, props);
            this.metaJsonKeyValueDeserializationSchema = (MetaJsonKeyValueDeserializationSchema) valueDeserializer;
        }

        // Capture the fetcher so the deserialization schema can later reach the consumer internals
        @Override
        protected AbstractFetcher<Tuple2<Map<String, byte[]>, byte[]>, ?> createFetcher(SourceContext<Tuple2<Map<String, byte[]>, byte[]>> sourceContext,
                                                                                        Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
                                                                                        SerializedValue<WatermarkStrategy<Tuple2<Map<String, byte[]>, byte[]>>> watermarkStrategy,
                                                                                        StreamingRuntimeContext runtimeContext,
                                                                                        OffsetCommitMode offsetCommitMode, MetricGroup consumerMetricGroup,
                                                                                        boolean useMetrics) throws Exception {
            AbstractFetcher<Tuple2<Map<String, byte[]>, byte[]>, ?> fetcher =
                    super.createFetcher(sourceContext, assignedPartitionsWithInitialOffsets, watermarkStrategy, runtimeContext, offsetCommitMode, consumerMetricGroup, useMetrics);
            metaJsonKeyValueDeserializationSchema.setFetcher(fetcher);
            return fetcher;
        }


        // Hand the runtime context to the schema before consumption starts
        @Override
        public void run(SourceContext<Tuple2<Map<String, byte[]>, byte[]>> sourceContext) throws Exception {
            metaJsonKeyValueDeserializationSchema.setRuntimeContext(getRuntimeContext());
            super.run(sourceContext);
        }
    }
    
    // Custom Gauge: computes the lag of one partition each time the metric is polled
    public static class CustomerKafkaLag implements Gauge<Long> {
        private Map<TopicPartition, OffsetAndMetadata> consumedOffsets;
        private final Properties properties;
        private final Set<TopicPartition> assignedPartitions;
        private final Fetcher<Tuple2<Map<String, byte[]>, byte[]>, byte[]> fetcher;
        private final Time time;
        private final TopicPartition topicPartition;
        private final AdminClient adminClient;

        public CustomerKafkaLag(Map<TopicPartition, OffsetAndMetadata> consumedOffsets, Properties properties, Set<TopicPartition> assignedPartitions,
                                Fetcher<Tuple2<Map<String, byte[]>, byte[]>, byte[]> fetcher, Time time, TopicPartition topicPartition, AdminClient adminClient) {
            this.consumedOffsets = consumedOffsets;
            this.properties = properties;
            this.assignedPartitions = assignedPartitions;
            this.fetcher = fetcher;
            this.time = time;
            this.topicPartition = topicPartition;
            this.adminClient = adminClient;
        }

        @Override
        public Long getValue() {
            try {
                // refresh the group's committed offsets through the AdminClient (blocks up to 60s)
                consumedOffsets = adminClient.listConsumerGroupOffsets(properties.getProperty(ConsumerConfig.GROUP_ID_CONFIG)).partitionsToOffsetAndMetadata().get(60, TimeUnit.SECONDS);
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                log.error("kafka get currentOffset failed = {}", e.getMessage());
            }
            // ask the consumer's internal fetcher for the current log-end offsets
            Map<TopicPartition, Long> endOffsets = fetcher.endOffsets(assignedPartitions, time.timer(Duration.ofMillis(60000)));
            Map<TopicPartition, OffsetAndMetadata> finalConsumedOffsets = consumedOffsets;
            // lag = log-end offset - committed offset; return this gauge's partition
            Map<TopicPartition, Long> collect = endOffsets.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue() - finalConsumedOffsets.get(entry.getKey()).offset()));
            return collect.get(topicPartition);
        }
    }
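
  Note that getValue() makes a blocking AdminClient call with a timeout of up to 60 seconds on every poll, on the thread of whichever metric reporter reads the gauge; in a production job you would likely cache the result or shorten the timeout.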
  
  
  // Custom deserialization schema: registers the lag metric on the first record
   public static class MetaJsonKeyValueDeserializationSchema implements KafkaDeserializationSchema<Tuple2<Map<String, byte[]>, byte[]>> {
        // metric group / gauge names; the values here are assumptions -- pick your own scheme
        private static final String DT_TOPIC_GROUP = "topic";
        private static final String DT_PARTITION_GROUP = "partition";
        private static final String DT_TOPIC_PARTITION_LAG_GAUGE = "topicPartitionLag";
        // ensures the gauge registration runs exactly once, on the first record
        private final AtomicBoolean first = new AtomicBoolean(true);
        private AbstractFetcher<Tuple2<Map<String, byte[]>, byte[]>, ?> fetcher;
        private RuntimeContext runtimeContext;
        private Properties properties;

        public MetaJsonKeyValueDeserializationSchema(Properties properties) {
            this.properties = properties;
        }

        public void setFetcher(AbstractFetcher<Tuple2<Map<String, byte[]>, byte[]>, ?> fetcher) {
            this.fetcher = fetcher;
        }

        public void setRuntimeContext(RuntimeContext runtimeContext) {
            this.runtimeContext = runtimeContext;
        }

        @Override
        public Tuple2<Map<String, byte[]>, byte[]> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
            // register the lag gauges once, when the first record arrives
            if (first.get()) {
                first.set(false);
                registerPtMetric(properties);
            }
            Map<String, byte[]> map = new HashMap<>();
            for (Header header : record.headers().toArray()) {
                String key = header.key();
                byte[] value = header.value();
                map.put(key, value);
            }
            return new Tuple2<>(map, record.value());
        }

        @Override
        public boolean isEndOfStream(Tuple2<Map<String, byte[]>, byte[]> nextElement) {
            return false;
        }

        @Override
        public TypeInformation<Tuple2<Map<String, byte[]>, byte[]>> getProducedType() {
            return TypeInformation.of(new TypeHint<Tuple2<Map<String, byte[]>, byte[]>>() {
            });
        }
        // Register a lag gauge for every assigned partition (reflection on Flink/Kafka internals)
        protected void registerPtMetric(Properties properties) throws NoSuchFieldException, IllegalAccessException, ExecutionException, InterruptedException, TimeoutException {
            // pull the KafkaConsumerThread out of Flink's KafkaFetcher
            Field consumerThreadField = ((KafkaFetcher) fetcher).getClass().getDeclaredField("consumerThread");
            consumerThreadField.setAccessible(true);
            KafkaConsumerThread<Tuple2<Map<String, byte[]>, byte[]>> consumerThread = (KafkaConsumerThread) consumerThreadField.get(fetcher);
            // bail out if the thread has not been assigned partitions yet
            Field hasAssignedPartitionsField = consumerThread.getClass().getDeclaredField("hasAssignedPartitions");
            hasAssignedPartitionsField.setAccessible(true);
            boolean hasAssignedPartitions = (boolean) hasAssignedPartitionsField.get(consumerThread);
            if (!hasAssignedPartitions) {
                throw new RuntimeException("wait 50 secs, but not assignedPartitions");
            }
            // grab the raw KafkaConsumer held by the consumer thread
            Field consumerField = consumerThread.getClass().getDeclaredField("consumer");
            consumerField.setAccessible(true);
            KafkaConsumer<Tuple2<Map<String, byte[]>, byte[]>, byte[]> kafkaConsumer = (KafkaConsumer) consumerField.get(consumerThread);
            // its SubscriptionState knows which partitions this subtask owns
            Field subscriptionStateField = kafkaConsumer.getClass().getDeclaredField("subscriptions");
            subscriptionStateField.setAccessible(true);
            SubscriptionState subscriptionState = (SubscriptionState) subscriptionStateField.get(kafkaConsumer);
            Set<TopicPartition> assignedPartitions = subscriptionState.assignedPartitions();
            // the consumer's Time and internal Fetcher are reused by the gauge
            Field time = kafkaConsumer.getClass().getDeclaredField("time");
            time.setAccessible(true);
            Time time1 = (Time) time.get(kafkaConsumer);
            Field fetcher = kafkaConsumer.getClass().getDeclaredField("fetcher");
            fetcher.setAccessible(true);
            Fetcher<Tuple2<Map<String, byte[]>, byte[]>, byte[]> fetcher1 = (Fetcher) fetcher.get(kafkaConsumer);
            // one shared AdminClient for all gauges registered by this subtask
            AdminClient adminClient = AdminClient.create(properties);
            Map<TopicPartition, OffsetAndMetadata> consumedOffsets = new HashMap<>();
            // register one lag gauge per assigned partition
            for (TopicPartition topicPartition : assignedPartitions) {
                runtimeContext.getMetricGroup().addGroup(DT_TOPIC_GROUP, topicPartition.topic())
                        .addGroup(DT_PARTITION_GROUP, topicPartition.partition() + "")
                        .gauge(DT_TOPIC_PARTITION_LAG_GAUGE, new CustomerKafkaLag(consumedOffsets, properties, assignedPartitions,
                                fetcher1, time1, topicPartition, adminClient));
            }
        }
    }
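
  A minimal sketch of wiring the three pieces together (topic, group, and broker address reuse the earlier examples):

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.90.74:9000");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "zyj-in");
        MetaJsonKeyValueDeserializationSchema schema = new MetaJsonKeyValueDeserializationSchema(props);
        CustomerKafkaConsumer<Tuple2<Map<String, byte[]>, byte[]>> source =
                new CustomerKafkaConsumer<>(Collections.singletonList("zyj-in"), schema, props);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(source).print();
        env.execute("kafka-lag-demo");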

Reference: https://blog.csdn.net/daijiguo/article/details/107868359
