1、withIdleness 介绍

There are two places in Flink applications where a WatermarkStrategy can be used: 1) directly on sources and 2) after non-source operation.

The first option is preferable, because it allows sources to exploit knowledge about shards/partitions/splits in the watermarking logic. Sources can usually then track watermarks at a finer level and the overall watermark produced by a source will be more accurate. Specifying a WatermarkStrategy directly on the source usually means you have to use a source specific interface/ Refer to Watermark Strategies and the Kafka Connector for how this works on a Kafka Connector and for more details about how per-partition watermarking works there.

The second option (setting a WatermarkStrategy after arbitrary operations) should only be used if you cannot set a strategy directly on the source:

如果其中一个输入分割/分区/碎片有一段时间不携带事件,这意味着水印生成器也不能获得任何新的信息来为水印做基础。我们称之为空闲输入或空闲源。这是一个问题,因为您的一些分区可能仍然带有事件。在这种情况下,水印将被保留,因为它被计算为所有不同的平行水印的最小值。

为了解决这个问题,你可以使用一个 WatermarkStrategy 来检测闲置状态并将输入标记为闲置。为此,WatermarkStrategy 提供了一个方便的帮助器:

WatermarkStrategy
.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withIdleness(Duration.ofMinutes(1));

以上为官网介绍,我们接下来就用我们的一些语言,来描述一下这个问题,就是当使用event time的时候,有很大的几率会发生数据倾斜,例如我们有一个kafka集群,其中,有一个topic,有三个分区,其中三个分区中,某个分区有很多数据写入,其他的分区 数据很少,那么这个时候,就不会触发计算,那么可以通过将不繁忙的分区标记为空闲,这样watermark就可以往下走了。

2、withIdleness运行原理

如果有的同学对watermark不是很了解,那么可以看看这篇文章:https://blog.csdn.net/weixin_43704599/article/details/117411252

首先我们现在编写测试类

 * @author :qingzhi.wu
 * @date2021/5/31 2:20 下午
 */
public class KafkaMock {
    private final KafkaProducer<String, String> producer;

    public final static String TOPIC = "test";

    private KafkaMock() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");//xxx服务器ip
        props.put("acks", "all");//所有follower都响应了才认为消息提交成功,即"committed"
        props.put("retries", 0);//retries = MAX 无限重试,直到你意识到出现了问题:)
        props.put("batch.size", 16384);//producer将试图批处理消息记录,以减少请求次数.默认的批量处理消息字节数
        //batch.size当批量的数据大小达到设定值后,就会立即发送,不顾下面的linger.ms
        props.put("linger.ms", 1);//延迟1ms发送,这项设置将通过增加小的延迟来完成--即,不是立即发送一条记录,producer将会等待给定的延迟时间以允许其他消息记录发送,这些消息记录可以批量处理
        props.put("buffer.memory", 33554432);//producer可以用来缓存数据的内存大小。
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        producer = new KafkaProducer<String, String>(props);
    }

    public void produce() {


        long timestamp = 7000;
        String value = "flink";
        String key = String.valueOf(value);
        String data = String.format("%s,%s", timestamp, value);

        try {
            producer.send(new ProducerRecord<String, String>(TOPIC, 0, key, data));
        } catch (Exception e) {
            e.printStackTrace();
        }
        producer.close();
    }


    public static void main(String[] args) {
        new KafkaMock().produce();
    }
}
/**
 * @author :qingzhi.wu
 * @date :2021/5/31 10:15 上午
 */
public class WatermarkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//        WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
//                .withIdleness(Duration.ofMinutes(1));
        //DataStreamSource<String> localhost = env.socketTextStream("localhost", 9999);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        properties.setProperty("group.id", "test");

        DataStreamSource<String> kafka = env.addSource(new FlinkKafkaConsumer<String>("test",new SimpleStringSchema(),properties));
        SingleOutputStreamOperator<String> source = kafka.assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new TimestampAssignerSupplier<String>() {
            @Override
            public TimestampAssigner<String> createTimestampAssigner(Context context) {
                return new TimestampAssigner<String>() {
                    @Override
                    public long extractTimestamp(String element, long recordTimestamp) {
                        return Long.parseLong(element.split(",")[0]);
                    }
                };
            }
        });
        //.withIdleness(Duration.ofSeconds(120)))
        SingleOutputStreamOperator<Tuple2<String, Long>> map = source.map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String value) throws Exception {
                return new Tuple2<String, Long>(value.split(",")[1], 1L);
            }
        });
        map.keyBy(x->x.f0).window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.SECONDS))).process(new ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
            @Override
            public void process(String s, Context context, Iterable<Tuple2<String, Long>> iterable, Collector<String> collector) throws Exception {
                long sum = 0L;
                Iterator<Tuple2<String, Long>> iterator = iterable.iterator();
                while (iterator.hasNext()) {
                    Tuple2<String, Long> next = iterator.next();
                    System.out.println(next.f0);
                    sum += next.f1;
                }
                collector.collect(s+","+sum);
            }
        }).print();
        env.execute();
    }
}

2.1 正常情况下的窗口触发条件

我们可以看到我的代码中,其中mock数据的时候是用的kafka的java api,测试类呢,我们开的窗口是五秒,延迟时间是两秒。

接下来我们看看具体运行的过程

第一步我们往0号kafka分区打入一条1000,flink的数据

image-20210531171838476

我们可以看到窗口没有被触发,那么什么情况下窗口会触发呢,因为多并行下,有多个task和多个subtask这个时候flink有watermark对齐机制,我们这样等待 一辈子也不可能会运行的。

我们接下来再往0号分区打入一条7000,flink的数据。

image-20210531172225866

这个时候窗口没有被触发,因为watermark的对齐机制,我们需要每个分区都打入7000这个timestamp的数据,接下来

我们往1号分区 打入1000,hadoop、7000,hadoop

我们往2号分区 打入1000,spark、7000,spark

image-20210531172657269

我们可以看到这个时候窗口会触发计算,那么我们这个时候应该可以发现一个问题,就是当某个分区的触发机制达到的时候,其他的分区触发机制迟迟未触发的时候,我们的窗口不能被触发计算。这样的话,假如数据倾斜比较严重,某个分区数据量很大,或者说一直都有数据,其他的分区迟迟没有更新watermark,那么这个时候时候就会出现问题,窗口无法被触发计算,极端情况下,等一辈子都不会触发计算。那么这个时候问题就可以依托withIdleness来进行解决。

我们接下来看看使用withIdleness的时候

2.2 withIdleness窗口触发条件

/**
 * @author :qingzhi.wu
 * @date :2021/5/31 10:15 上午
 */
public class WatermarkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//        WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
//                .withIdleness(Duration.ofMinutes(1));
        //DataStreamSource<String> localhost = env.socketTextStream("localhost", 9999);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        properties.setProperty("group.id", "test");

        DataStreamSource<String> kafka = env.addSource(new FlinkKafkaConsumer<String>("test",new SimpleStringSchema(),properties));
        SingleOutputStreamOperator<String> source = kafka.assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new TimestampAssignerSupplier<String>() {
            @Override
            public TimestampAssigner<String> createTimestampAssigner(Context context) {
                return new TimestampAssigner<String>() {
                    @Override
                    public long extractTimestamp(String element, long recordTimestamp) {
                        return Long.parseLong(element.split(",")[0]);
                    }
                };
            }
        }).withIdleness(Duration.ofSeconds(120)));
        //
        SingleOutputStreamOperator<Tuple2<String, Long>> map = source.map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String value) throws Exception {
                return new Tuple2<String, Long>(value.split(",")[1], 1L);
            }
        });
        map.keyBy(x->x.f0).window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.SECONDS))).process(new ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
            @Override
            public void process(String s, Context context, Iterable<Tuple2<String, Long>> iterable, Collector<String> collector) throws Exception {
                long sum = 0L;
                Iterator<Tuple2<String, Long>> iterator = iterable.iterator();
                while (iterator.hasNext()) {
                    Tuple2<String, Long> next = iterator.next();
                    System.out.println(next.f0);
                    sum += next.f1;
                }
                collector.collect(s+","+sum);
            }
        }).print();
        env.execute();
    }
}

第一步我们往0号分区插入1000,flink

image-20210531173424096

等待两分钟看看,窗口会触发吗

并不会触发,因为并没有达到窗口的触发条件,为什么要触发呢,如果触发的话 那就乱了套了,所以肯定是不会触发的。

那么我们接下来再往0号分区插入7000,flink,等待两分钟我们看看窗口会触发吗

我们可以清晰的看到窗口触发了,而且根本没有等待两分钟,而是立即触发,原因是什么呢,因为我在写文章的时候,时间早就过了两分钟了。这个时候我们可以初步的判定,flink把其他两个没有数据的分区标记为空闲,因为两分钟以上没有数据了。

接下来我们再看看怎么,假如我很快速的插入1000,flink 和 7000,flink 往0号分区呢

我们先重启一下应用

然后进行测试。

image-20210531173957037

我们发现并没有立即触发,那么我们这个时候等待2分钟看看。

image-20210531174139573

等了大概两分钟,我们发现窗口触发计算了。

那么这个时候我们再验证一个问题:假设我一个分区窗口触发执行,其他的窗口一直有数据,但是数据达不到窗口执行的条件,会怎么样呢。

我们先来看看数据插入逻辑。

1、0->1000,flink

2、0->7000,flink

3、1->1000,hadoop

4、2->1000,spark

5、1->1000,hadoop

6、2->1000,spark

就这样间隔5秒执行3、4、5、6插入数据的操作(大概操作两分钟多),我们看看两分钟后会执行窗口触发操作吗

先重启应用。

这个时候我们持续了大概3分钟,我们发现窗口没有触发计算,我们再等两分钟试试,看看会触发计算

image-20210531175343016

我们可以看到窗口触发计算了。因为我是往其他分区写入的都是flink所以,数量会很多。

那么这个时候我们已经探索出来withidleness的秘密了。其实就是当某个分区的窗口触发条件达到,并且其他的分区没有数据的情况下持续我们约定好的空闲时间,那么窗口会触发计算。如果一直有数据但是无法达到触发条件的话,窗口并不会触发计算

那么这个值一般设置为多少好呢,一般我认为根据业务场景来看吧,如果你的数据量小,并且倾斜比较严重,那么就设置的小一些,那么如果你的数据量比较大,并且很难出现一个分区或者多个分区 迟迟无数据的情况,那么可以设置的大一些。

一般我认为设置1-10分钟比较好。

怎么样你学会了吗?

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐