方式1:设置水位线延迟时间

水位线延迟设置,一般设置为毫秒到秒级别。

SingleOutputStreamOperator<Event> watermarks = streamOperator.assignTimestampsAndWatermarks(
                // 方案1:设置水位线延迟时间2s
                WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event event, long l) {
                                return event.timeStamp;
                            }
                        })
        );

方式2:允许窗口处理迟到数据

由于大部分乱序数据已经被水位线的延迟等到了,所以往往迟到的数据不会太多。这样,我们会在水位线到达窗口结束时间时,先快速地输出一个近似正确的计算结果; 然后保持窗口继续等到延迟数据,每来一条数据,窗口就会再次计算,并将更新后的结果输出。这样就可以逐步修正计算结果,最终得到准确的统计值了。

SingleOutputStreamOperator<String> result = watermarks.keyBy(data -> data.user)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                // 方案2:允许窗口处理迟到1分钟的数据
                .allowedLateness(Time.minutes(1))

方案3:迟到的数据放在侧输出流

即使我们有了前面的双重保证,可窗口不能一直等下去,最后总要真正关闭。窗口一旦关闭,后续的数据就都要被丢弃了。那如果真的还有漏网之鱼又该怎么办呢?
那就要用到最后一招了:用窗口的侧输出流来收集关窗以后的迟到数据。这种方式是最后“兜底”的方法,只能保证数据不丢失;因为窗口已经真正关闭,所以是无法基于之前窗口的结果直接做更新的。

package com.wanshun.bigdata.chapter06;

import com.wanshun.bigdata.chapter05.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.sql.Timestamp;
import java.time.Duration;

/**
 * Author:panghu
 * Date:2022-07-20
 * Description: 处理迟到的数据
 */
public class _08ProcessLateDataExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 从端口接收数据
        DataStreamSource<String> streamSource = env.socketTextStream("hadoop102", 7777);
        // 处理接受到的数据
        SingleOutputStreamOperator<Event> streamOperator = streamSource.map(new MapFunction<String, Event>() {
            @Override
            public Event map(String line) throws Exception {
                String[] split = line.split(",");
                return new Event(split[0], split[1], Long.parseLong(split[2]));
            }
        });
        streamOperator.print("来自端口的数据");

        // 提取时间戳,设置水位线
        SingleOutputStreamOperator<Event> watermarks = streamOperator.assignTimestampsAndWatermarks(
                // 方案1:设置水位线延迟时间2s
                WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event event, long l) {
                                return event.timeStamp;
                            }
                        })
        );

        // 定义侧输出流标签
        OutputTag<Event> outputTag = new OutputTag<Event>("late"){};

        SingleOutputStreamOperator<String> result = watermarks.keyBy(data -> data.user)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                // 方案2:允许窗口处理迟到1分钟的数据
                .allowedLateness(Time.minutes(1))
                // 方案3:将迟到的数据放在侧输出流
                .sideOutputLateData(outputTag)
                .aggregate(
                        new AggregateFunction<Event, Long, Long>() {
                            // 创建累加器
                            @Override
                            public Long createAccumulator() {
                                return 0L;
                            }

                            @Override
                            public Long add(Event event, Long acc) {
                                return acc + 1;
                            }

                            @Override
                            public Long getResult(Long acc) {
                                return acc;
                            }

                            @Override
                            public Long merge(Long acc, Long acc1) {
                                return null;
                            }
                        },
                        new ProcessWindowFunction<Long, String, String, TimeWindow>() {
                            @Override
                            public void process(String key, Context context, Iterable<Long> elements, Collector<String> out) throws Exception {
                                out.collect(
                                        key + " 的访问次数为:" + elements.iterator().next()
                                                + ", 窗口开始时间:" + new Timestamp(context.window().getStart())
                                                + ", 窗口结束时间:" + new Timestamp(context.window().getEnd())
                                                + ", 当前水位线时间:" + new Timestamp(context.currentWatermark())
                                );
                            }
                        }
                );

        result.print("result");

        // 获取侧输出流中的数据
        result.getSideOutput(outputTag).print("侧输出流中的数据");

        env.execute();
    }
}

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐