kafka lag
kafka flink lag
·
一.如何记录kafka的lag值
获取Lag的三种方法:
-
使用 Kafka 自带的命令行工具 kafka-consumer-groups 脚本。
(新旧两个版本以 0.9.0 为分界)
【新版本的两种方式】:
bin/kafka-consumer-groups.sh --bootstrap-server <Kafka broker连接信息> --describe --group <group名称>
./kafka-consumer-groups.sh --bootstrap-server 10.0.90.74:9000,10.0.90.74:9000,10.0.90.74:9000 --describe --group zyj-in
./kafka-run-class.sh kafka.admin.ConsumerGroupCommand --group zyj-in --bootstrap-server 10.0.90.74:9000,10.0.90.74:9000,10.0.90.74:9000 --describe --new-consumer
【旧版本的方式】:
./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group zyj-in --topic zyj-in --zookeeper 10.0.90.74:2181,10.0.90.75:2181,10.0.90.76:2181
查看所有的消费者组:
./kafka-consumer-groups.sh --bootstrap-server 10.0.90.74:9000,10.0.90.74:9000,10.0.90.74:9000 --list
-
使用 Kafka Java Consumer API 编程(注意:下面这段代码只适用于 Kafka 2.0.0 及以上版本)
第 1 处是调用 AdminClient.listConsumerGroupOffsets 方法获取给定消费者组的最新消费消息的位移; 第 2 处则是获取订阅分区的最新消息位移; 第3 处就是执行相应的减法操作,获取 Lag 值并封装进一个 Map 对象。
public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException { Properties props = new Properties(); props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); try (AdminClient client = AdminClient.create(props)) { ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID); try { Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁止自动提交位移 props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) { Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet()); return endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset())); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); // 处理中断异常 // ... return Collections.emptyMap(); } catch (ExecutionException e) { // 处理ExecutionException // ... return Collections.emptyMap(); } catch (TimeoutException e) { throw new TimeoutException("Timed out when getting lag for consumer group " + groupID); } }
-
使用 Kafka 自带的 JMX 监控指标。
Lag 取值为正数、负数和 0 时分别表示什么意思
- 正数:就是kafka数据积压了,往kafka进数据的速度,大于这个数据被消费的速度。设 a 为分区最新消息的位移,b 为消费者已消费(已提交)的位移,Lag = a - b,此时 a-b 就是正数了。供大于求。
- 负数:a 和 b 不是在同一时刻取到的,有时候刚刚取了 a、还没来得及做减法,b 就已经超过 a 了,导致结果是负数。这说明kafka的消费者干活很快,分分钟就处理完待消费的数据,供小于求。
- 0:生产者和消费者速率基本相当,说明两者都工作正常。
Flink本身暴露的相关的指标是否能满足需求
-
register.consumer.metrics specifies whether to register metrics of KafkaConsumer in Flink metric group
register.consumer.metrics 指定是否在 Flink metric group 中注册 KafkaConsumer 的 metrics
能开启这个kafka的指标
-
The last successfully committed offsets to Kafka, for each partition. A particular partition’s metric can be specified by topic name and partition id。
KafkaSourceReader.committedOffsets
-
The consumer’s current read offset, for each partition. A particular partition’s metric can be specified by topic name and partition id.
KafkaSourceReader.currentOffsets
根据以上知识,得到 Flink 连接 Kafka 并获取 Kafka lag 的代码
//自定义source源
/**
 * {@link FlinkKafkaConsumer} variant that hands its fetcher and runtime context to the
 * {@link MetaJsonKeyValueDeserializationSchema} it was built with.
 *
 * <p>The deserializer passed to the constructor must be a
 * {@code MetaJsonKeyValueDeserializationSchema}; it is cast unconditionally.
 */
public static class CustomerKafkaConsumer<T> extends FlinkKafkaConsumer<Tuple2<Map<String, byte[]>, byte[]>> {

    /** The schema given to the constructor, kept under its concrete type. */
    private final MetaJsonKeyValueDeserializationSchema metaJsonKeyValueDeserializationSchema;

    /**
     * @param topic             topics to consume
     * @param valueDeserializer schema used to deserialize records; cast to {@code MetaJsonKeyValueDeserializationSchema}
     * @param props             Kafka consumer properties forwarded to the parent constructor
     */
    public CustomerKafkaConsumer(
            List<String> topic,
            KafkaDeserializationSchema<Tuple2<Map<String, byte[]>, byte[]>> valueDeserializer,
            Properties props) {
        super(topic, valueDeserializer, props);
        this.metaJsonKeyValueDeserializationSchema = (MetaJsonKeyValueDeserializationSchema) valueDeserializer;
    }

    /** Creates the fetcher through the parent class and passes it on to the schema before returning it. */
    @Override
    protected AbstractFetcher<Tuple2<Map<String, byte[]>, byte[]>, ?> createFetcher(
            SourceContext<Tuple2<Map<String, byte[]>, byte[]>> sourceContext,
            Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
            SerializedValue<WatermarkStrategy<Tuple2<Map<String, byte[]>, byte[]>>> watermarkStrategy,
            StreamingRuntimeContext runtimeContext,
            OffsetCommitMode offsetCommitMode,
            MetricGroup consumerMetricGroup,
            boolean useMetrics) throws Exception {
        final AbstractFetcher<Tuple2<Map<String, byte[]>, byte[]>, ?> created = super.createFetcher(
                sourceContext,
                assignedPartitionsWithInitialOffsets,
                watermarkStrategy,
                runtimeContext,
                offsetCommitMode,
                consumerMetricGroup,
                useMetrics);
        metaJsonKeyValueDeserializationSchema.setFetcher(created);
        return created;
    }

    /** Passes the runtime context to the schema, then runs the parent source loop. */
    @Override
    public void run(SourceContext<Tuple2<Map<String, byte[]>, byte[]>> sourceContext) throws Exception {
        metaJsonKeyValueDeserializationSchema.setRuntimeContext(getRuntimeContext());
        super.run(sourceContext);
    }
}
//自定义Gauge
public static class CustomerKafkaLag implements Gauge<Long> {
private Map<TopicPartition, OffsetAndMetadata> consumedOffsets;
private final Properties properties;
private final Set<TopicPartition> assignedPartitions;
private final Fetcher<Tuple2<Map<String, byte[]>, byte[]>, byte[]> fetcher;
private final Time time;
private final TopicPartition topicPartition;
private final AdminClient adminClient;
public CustomerKafkaLag(Map<TopicPartition, OffsetAndMetadata> consumedOffsets, Properties properties, Set<TopicPartition> assignedPartitions,
Fetcher<Tuple2<Map<String, byte[]>, byte[]>, byte[]> fetcher, Time time, TopicPartition topicPartition, AdminClient adminClient) {
this.consumedOffsets = consumedOffsets;
this.properties = properties;
this.assignedPartitions = assignedPartitions;
this.fetcher = fetcher;
this.time = time;
this.topicPartition = topicPartition;
this.adminClient = adminClient;
}
@Override
public Long getValue() {
try {
consumedOffsets = adminClient.listConsumerGroupOffsets(properties.getProperty(ConsumerConfig.GROUP_ID_CONFIG)).partitionsToOffsetAndMetadata().get(60, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("kafka get currentOffset failed = {}", e.getMessage());
}
Map<TopicPartition, Long> endOffsets = fetcher.endOffsets(assignedPartitions, time.timer(Duration.ofMillis(60000)));
Map<TopicPartition, OffsetAndMetadata> finalConsumedOffsets = consumedOffsets;
Map<TopicPartition, Long> collect = endOffsets.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue() - finalConsumedOffsets.get(entry.getKey()).offset()));
return collect.get(topicPartition);
}
}
//自定义反序列化器:在其中注册指标
/**
 * Deserialization schema that emits each Kafka record as a tuple of (header map, raw value)
 * and, on the first record, registers one lag gauge per assigned partition.
 *
 * <p>The fetcher and runtime context must be injected through {@link #setFetcher} and
 * {@link #setRuntimeContext} before the first record is deserialized.
 */
public static class MetaJsonKeyValueDeserializationSchema implements KafkaDeserializationSchema<Tuple2<Map<String, byte[]>, byte[]>> {

    // Runtime-only handles injected after construction; transient so they never become
    // part of the serialized form of this schema.
    private transient AbstractFetcher<Tuple2<Map<String, byte[]>, byte[]>, ?> fetcher;
    private transient RuntimeContext runtimeContext;
    private final Properties properties;

    /**
     * @param properties client properties; used to create the admin client and passed to the lag gauges
     */
    public MetaJsonKeyValueDeserializationSchema(Properties properties) {
        this.properties = properties;
    }

    public void setFetcher(AbstractFetcher<Tuple2<Map<String, byte[]>, byte[]>, ?> fetcher) {
        this.fetcher = fetcher;
    }

    public void setRuntimeContext(RuntimeContext runtimeContext) {
        this.runtimeContext = runtimeContext;
    }

    /**
     * Copies the record headers into a map and pairs it with the record value. Registers the
     * lag gauges the first time it is called.
     */
    @Override
    public Tuple2<Map<String, byte[]>, byte[]> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        // NOTE(review): `first` is declared outside this class; if it is shared between
        // schema instances only one of them will register gauges - confirm.
        if (first.get()) {
            first.set(false);
            registerPtMetric(properties);
        }
        Map<String, byte[]> headers = new HashMap<>();
        for (Header header : record.headers()) {
            // A later header with the same key overwrites an earlier one.
            headers.put(header.key(), header.value());
        }
        return new Tuple2<>(headers, record.value());
    }

    @Override
    public boolean isEndOfStream(Tuple2<Map<String, byte[]>, byte[]> nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Tuple2<Map<String, byte[]>, byte[]>> getProducedType() {
        return TypeInformation.of(new TypeHint<Tuple2<Map<String, byte[]>, byte[]>>() {
        });
    }

    /**
     * Registers a lag gauge for every partition currently assigned to the underlying Kafka consumer.
     *
     * <p>The consumer thread, Kafka consumer, subscription state, time source and Kafka
     * fetcher are private members of their owners and are read reflectively.
     *
     * @param properties client properties used to create the admin client and passed to each gauge
     * @throws NoSuchFieldException   if one of the private fields does not exist on the runtime class
     * @throws IllegalAccessException if one of the private fields cannot be read
     * @throws IllegalStateException  if the fetcher or runtime context has not been set, or the
     *                                consumer thread has no assigned partitions
     */
    protected void registerPtMetric(Properties properties) throws NoSuchFieldException, IllegalAccessException, ExecutionException, InterruptedException, TimeoutException {
        if (fetcher == null || runtimeContext == null) {
            // Fail before the admin client is created so nothing is left open.
            throw new IllegalStateException("fetcher and runtimeContext must be set before registering lag gauges");
        }
        KafkaFetcher<?> kafkaFetcher = (KafkaFetcher<?>) fetcher;
        KafkaConsumerThread<?> consumerThread = (KafkaConsumerThread<?>) readDeclaredField(kafkaFetcher, "consumerThread");

        boolean hasAssignedPartitions = (Boolean) readDeclaredField(consumerThread, "hasAssignedPartitions");
        if (!hasAssignedPartitions) {
            // No waiting happens here, so the message states only what was observed.
            throw new IllegalStateException("Kafka consumer thread has no assigned partitions; cannot register lag gauges");
        }

        KafkaConsumer<?, ?> kafkaConsumer = (KafkaConsumer<?, ?>) readDeclaredField(consumerThread, "consumer");
        SubscriptionState subscriptionState = (SubscriptionState) readDeclaredField(kafkaConsumer, "subscriptions");
        Set<TopicPartition> assignedPartitions = subscriptionState.assignedPartitions();
        Time kafkaTime = (Time) readDeclaredField(kafkaConsumer, "time");
        // Unchecked: the type arguments only have to match the CustomerKafkaLag constructor;
        // they are erased at runtime.
        @SuppressWarnings("unchecked")
        Fetcher<Tuple2<Map<String, byte[]>, byte[]>, byte[]> recordFetcher =
                (Fetcher<Tuple2<Map<String, byte[]>, byte[]>, byte[]>) readDeclaredField(kafkaConsumer, "fetcher");

        // NOTE(review): this admin client is handed to the gauges and never closed in the visible code.
        AdminClient adminClient = AdminClient.create(properties);
        Map<TopicPartition, OffsetAndMetadata> consumedOffsets = new HashMap<>();
        for (TopicPartition topicPartition : assignedPartitions) {
            runtimeContext.getMetricGroup()
                    .addGroup(DT_TOPIC_GROUP, topicPartition.topic())
                    .addGroup(DT_PARTITION_GROUP, String.valueOf(topicPartition.partition()))
                    .gauge(DT_TOPIC_PARTITION_LAG_GAUGE, new CustomerKafkaLag(consumedOffsets, properties, assignedPartitions,
                            recordFetcher, kafkaTime, topicPartition, adminClient));
        }
    }

    /** Reads the field named {@code fieldName} declared on {@code target}'s runtime class, bypassing access checks. */
    private static Object readDeclaredField(Object target, String fieldName) throws NoSuchFieldException, IllegalAccessException {
        Field field = target.getClass().getDeclaredField(fieldName);
        field.setAccessible(true);
        return field.get(target);
    }
}
参考文献:https://blog.csdn.net/daijiguo/article/details/107868359
更多推荐
已为社区贡献1条内容
所有评论(0)