Flink新特性withIdleness一文全解析
1、withIdleness 介绍There are two places in Flink applications where a WatermarkStrategy can be used: 1) directly on sources and 2) after non-source operation.The first option is preferable, because it a
1、withIdleness 介绍
There are two places in Flink applications where a
WatermarkStrategy
can be used: 1) directly on sources and 2) after non-source operation.The first option is preferable, because it allows sources to exploit knowledge about shards/partitions/splits in the watermarking logic. Sources can usually then track watermarks at a finer level and the overall watermark produced by a source will be more accurate. Specifying a
WatermarkStrategy
directly on the source usually means you have to use a source specific interface/ Refer to Watermark Strategies and the Kafka Connector for how this works on a Kafka Connector and for more details about how per-partition watermarking works there.The second option (setting a
WatermarkStrategy
after arbitrary operations) should only be used if you cannot set a strategy directly on the source:如果其中一个输入分割/分区/碎片有一段时间不携带事件,这意味着水印生成器也不能获得任何新的信息来为水印做基础。我们称之为空闲输入或空闲源。这是一个问题,因为您的一些分区可能仍然带有事件。在这种情况下,水印将被保留,因为它被计算为所有不同的平行水印的最小值。
为了解决这个问题,你可以使用一个 WatermarkStrategy 来检测闲置状态并将输入标记为闲置。为此,WatermarkStrategy 提供了一个方便的帮助器:
WatermarkStrategy
.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withIdleness(Duration.ofMinutes(1));
以上为官网介绍,我们接下来就用我们的一些语言,来描述一下这个问题,就是当使用event time的时候,有很大的几率会发生数据倾斜,例如我们有一个kafka集群,其中,有一个topic,有三个分区,其中三个分区中,某个分区有很多数据写入,其他的分区 数据很少,那么这个时候,就不会触发计算,那么可以通过将不繁忙的分区标记为空闲,这样watermark就可以往下走了。
2、withIdleness运行原理
如果有的同学对watermark不是很了解,那么可以看看这篇文章:https://blog.csdn.net/weixin_43704599/article/details/117411252
首先我们现在编写测试类
* @author :qingzhi.wu
* @date :2021/5/31 2:20 下午
*/
public class KafkaMock {
private final KafkaProducer<String, String> producer;
public final static String TOPIC = "test";
private KafkaMock() {
Properties props = new Properties();
props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");//xxx服务器ip
props.put("acks", "all");//所有follower都响应了才认为消息提交成功,即"committed"
props.put("retries", 0);//retries = MAX 无限重试,直到你意识到出现了问题:)
props.put("batch.size", 16384);//producer将试图批处理消息记录,以减少请求次数.默认的批量处理消息字节数
//batch.size当批量的数据大小达到设定值后,就会立即发送,不顾下面的linger.ms
props.put("linger.ms", 1);//延迟1ms发送,这项设置将通过增加小的延迟来完成--即,不是立即发送一条记录,producer将会等待给定的延迟时间以允许其他消息记录发送,这些消息记录可以批量处理
props.put("buffer.memory", 33554432);//producer可以用来缓存数据的内存大小。
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<String, String>(props);
}
public void produce() {
long timestamp = 7000;
String value = "flink";
String key = String.valueOf(value);
String data = String.format("%s,%s", timestamp, value);
try {
producer.send(new ProducerRecord<String, String>(TOPIC, 0, key, data));
} catch (Exception e) {
e.printStackTrace();
}
producer.close();
}
public static void main(String[] args) {
new KafkaMock().produce();
}
}
/**
* @author :qingzhi.wu
* @date :2021/5/31 10:15 上午
*/
public class WatermarkTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
// .withIdleness(Duration.ofMinutes(1));
//DataStreamSource<String> localhost = env.socketTextStream("localhost", 9999);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
properties.setProperty("group.id", "test");
DataStreamSource<String> kafka = env.addSource(new FlinkKafkaConsumer<String>("test",new SimpleStringSchema(),properties));
SingleOutputStreamOperator<String> source = kafka.assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new TimestampAssignerSupplier<String>() {
@Override
public TimestampAssigner<String> createTimestampAssigner(Context context) {
return new TimestampAssigner<String>() {
@Override
public long extractTimestamp(String element, long recordTimestamp) {
return Long.parseLong(element.split(",")[0]);
}
};
}
});
//.withIdleness(Duration.ofSeconds(120)))
SingleOutputStreamOperator<Tuple2<String, Long>> map = source.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String value) throws Exception {
return new Tuple2<String, Long>(value.split(",")[1], 1L);
}
});
map.keyBy(x->x.f0).window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.SECONDS))).process(new ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
@Override
public void process(String s, Context context, Iterable<Tuple2<String, Long>> iterable, Collector<String> collector) throws Exception {
long sum = 0L;
Iterator<Tuple2<String, Long>> iterator = iterable.iterator();
while (iterator.hasNext()) {
Tuple2<String, Long> next = iterator.next();
System.out.println(next.f0);
sum += next.f1;
}
collector.collect(s+","+sum);
}
}).print();
env.execute();
}
}
2.1 正常情况下的窗口触发条件
我们可以看到我的代码中,其中mock数据的时候是用的kafka的java api,测试类呢,我们开的窗口是五秒,延迟时间是两秒。
接下来我们看看具体运行的过程
第一步我们往0号kafka分区打入一条1000,flink的数据
我们可以看到窗口没有被触发,那么什么情况下窗口会触发呢,因为多并行下,有多个task和多个subtask这个时候flink有watermark对齐机制,我们这样等待 一辈子也不可能会运行的。
我们接下来再往0号分区打入一条7000,flink的数据。
这个时候窗口没有被触发,因为watermark的对齐机制,我们需要每个分区都打入7000这个timestamp的数据,接下来
我们往1号分区 打入1000,hadoop、7000,hadoop
我们往2号分区 打入1000,spark、7000,spark
我们可以看到这个时候窗口会触发计算,那么我们这个时候应该可以发现一个问题,就是当某个分区的触发机制达到的时候,其他的分区触发机制迟迟未触发的时候,我们的窗口不能被触发计算。这样的话,假如数据倾斜比较严重,某个分区数据量很大,或者说一直都有数据,其他的分区迟迟没有更新watermark,那么这个时候时候就会出现问题,窗口无法被触发计算,极端情况下,等一辈子都不会触发计算。那么这个时候问题就可以依托withIdleness来进行解决。
我们接下来看看使用withIdleness的时候
2.2 withIdleness窗口触发条件
/**
* @author :qingzhi.wu
* @date :2021/5/31 10:15 上午
*/
public class WatermarkTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
// .withIdleness(Duration.ofMinutes(1));
//DataStreamSource<String> localhost = env.socketTextStream("localhost", 9999);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
properties.setProperty("group.id", "test");
DataStreamSource<String> kafka = env.addSource(new FlinkKafkaConsumer<String>("test",new SimpleStringSchema(),properties));
SingleOutputStreamOperator<String> source = kafka.assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new TimestampAssignerSupplier<String>() {
@Override
public TimestampAssigner<String> createTimestampAssigner(Context context) {
return new TimestampAssigner<String>() {
@Override
public long extractTimestamp(String element, long recordTimestamp) {
return Long.parseLong(element.split(",")[0]);
}
};
}
}).withIdleness(Duration.ofSeconds(120)));
//
SingleOutputStreamOperator<Tuple2<String, Long>> map = source.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String value) throws Exception {
return new Tuple2<String, Long>(value.split(",")[1], 1L);
}
});
map.keyBy(x->x.f0).window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.SECONDS))).process(new ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
@Override
public void process(String s, Context context, Iterable<Tuple2<String, Long>> iterable, Collector<String> collector) throws Exception {
long sum = 0L;
Iterator<Tuple2<String, Long>> iterator = iterable.iterator();
while (iterator.hasNext()) {
Tuple2<String, Long> next = iterator.next();
System.out.println(next.f0);
sum += next.f1;
}
collector.collect(s+","+sum);
}
}).print();
env.execute();
}
}
第一步我们往0号分区插入1000,flink
等待两分钟看看,窗口会触发吗
并不会触发,因为并没有达到窗口的触发条件,为什么要触发呢,如果触发的话 那就乱了套了,所以肯定是不会触发的。
那么我们接下来再往0号分区插入7000,flink,等待两分钟我们看看窗口会触发吗
我们可以清晰的看到窗口触发了,而且根本没有等待两分钟,而是立即触发,原因是什么呢,因为我在写文章的时候,时间早就过了两分钟了。这个时候我们可以初步的判定,flink把其他两个没有数据的分区标记为空闲,因为两分钟以上没有数据了。
接下来我们再看看怎么,假如我很快速的插入1000,flink 和 7000,flink 往0号分区呢
我们先重启一下应用
然后进行测试。
我们发现并没有立即触发,那么我们这个时候等待2分钟看看。
等了大概两分钟,我们发现窗口触发计算了。
那么这个时候我们再验证一个问题:假设我一个分区窗口触发执行,其他的窗口一直有数据,但是数据达不到窗口执行的条件,会怎么样呢。
我们先来看看数据插入逻辑。
1、0->1000,flink
2、0->7000,flink
3、1->1000,hadoop
4、2->1000,spark
5、1->1000,hadoop
6、2->1000,spark
…
就这样间隔5秒执行3、4、5、6插入数据的操作(大概操作两分钟多),我们看看两分钟后会执行窗口触发操作吗
先重启应用。
这个时候我们持续了大概3分钟,我们发现窗口没有触发计算,我们再等两分钟试试,看看会触发计算吗
我们可以看到窗口触发计算了。因为我是往其他分区写入的都是flink所以,数量会很多。
那么这个时候我们已经探索出来withidleness的秘密了。其实就是当某个分区的窗口触发条件达到,并且其他的分区没有数据的情况下持续我们约定好的空闲时间,那么窗口会触发计算。如果一直有数据但是无法达到触发条件的话,窗口并不会触发计算。
那么这个值一般设置为多少好呢,一般我认为根据业务场景来看吧,如果你的数据量小,并且倾斜比较严重,那么就设置的小一些,那么如果你的数据量比较大,并且很难出现一个分区或者多个分区 迟迟无数据的情况,那么可以设置的大一些。
一般我认为设置1-10分钟比较好。
怎么样你学会了吗?
更多推荐
所有评论(0)