Flink程序中 Timer实现定时操作

有时候,我们在计算任务中需要使用到定时器来帮助我们处理业务,例如 订单的自动结算?自动好评? 定时收集?等等…

但需要注意的 我们无法为计算任务灵活的配置CRON表达式,仅仅只能指定触发的时刻。

一、什么样的Flink作业可以开启开启定时器

需要开启定时作业的JOB,必须是由KeyedProcessFunction低阶函数进行数据处理,而非Window

image-20210815173240341

我们可以在processElement方法中 执行我们的数据处理逻辑以及开启定时器。

OnTimer方法则是定时器触发时执行的具体的方法,

二、定时器功能展示

结果展示

下单时间与自动好评时间

image-20210815174112399

对应订单好评时间到,检查是否评价与开启自动好评

image-20210815174403308

三、逻辑实现

(1)自定义数据源

我这里是写死的,各位可根据自己的需求来实现RichSourceFunction这个接口即可

package com.leilei;

import cn.hutool.core.util.RandomUtil;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author lei
 * @version 1.0
 * @date 2021/3/21 22:00
 * @desc 模拟订单来源
 */
public class MyOrderSource extends RichSourceFunction<Order> {
    private Boolean flag = true;
    private final String[] products = new String[]{"黄焖鸡米饭", "北京烤鸭", "桥头排骨"};
    private final String[] users = new String[]{"马邦德", "黄四郎", "张麻子"};
    AtomicInteger num;

    @Override
    public void run(SourceContext<Order> ctx) throws Exception {
        while (flag) {
            Order order = Order.builder()
                    .product(products[RandomUtil.randomInt(3)])
                    .username(users[RandomUtil.randomInt(3)])
                    .orderId(UUID.randomUUID().toString().replace("-", "."))
                    .orderTime(System.currentTimeMillis())
                    .build();
            Thread.sleep(5000);
            // 注释代码是模拟同一个key 仅存在一个定时器,执行时间后覆盖前
            //if (num.get()<4) {
            Thread.sleep(5000);
            }else {
                Thread.sleep(500000);
            }
            num.incrementAndGet();
            ctx.collect(order);
        }
    }

    @Override
    public void cancel() {
        flag = false;
    }
}
package com.leilei;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@NoArgsConstructor
@AllArgsConstructor
@Data
@Builder
/**
 * @author lei
 * @version 1.0
 * @date 2021/3/21 22:00
 * @desc 订单模拟对象
 */
public  class Order {
    private String product;
    private String username;
    private String orderId;
    private Long orderTime;
}

(2)自定义订单KEY生成器

package com.leilei;

import java.text.SimpleDateFormat;

/**
 * @author lei
 * @version 1.0
 * @desc
 * @date 2021-03-23 11:16
 */
public class KeyUtil {
    public static String buildKey(Order order) {
        String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(order.getOrderTime());
        return order.getUsername()+"--" + order.getProduct()+"--" + order.getOrderId() + "--下单时间:" + date;
        // 注释代码是模拟同一个key 仅存在一个定时器,执行时间后覆盖前
        //return order.getUsername();
    }
}

(3)ProcessFunction处理函数 逻辑处理与定时器功能

(1)定义低阶处理函数

我们需要继承KeyedProcessFunction这个抽象类,因为我的计算任务会根据KEY进行分组

image-20210815175232076

image-20210815175256141

(2)处理函数中编写计算逻辑与定时器注册

我们的计算程序如果是ProcessFunction的话,来一个元素便会触发一次processElement方法,其是真正的流式处理,且只能一个一个的处理,不像Window一般,我们可自定义窗口的大小(window是流与批的桥梁)。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-TAGcu2me-1629024341221)(C:\Users\leile\AppData\Roaming\Typora\typora-user-images\image-20210815175350234.png)]

value:便是我们的输入元素

ctx:是执行环境,我们可以在其中开启定时器,获取当前KEY,获取侧位输出(OutPutTag)等等一系列操作

out:则是输出元素结果搜集器

(1)注册定时器

image-20210815175841504

定时器的触发时间 支持处理时间与事件时间…我们可以根据选择自行进行注册

image-20210815175947779

image-20210815181523649

(2)处理时间定时器与事件时间定时器区别?

处理时间:即flink算子处理数据的时间(随着处理数据(元素)而不断增加)

ex:现在有三个数据 来源顺序如下 A > B > C

C 进入了算子 且注册了一个(12:02:00的处理时间定时器),Flink作业所在的机器上时间为 12:00:00,那么现在Flink作业的处理时间则为12:00:00

B 进入了算子,Flink作业所在的机器上时间为 12:01:00,那么现在Flink作业的处理时间则为12:01:00

A 进入了算子,Flink作业所在的机器上时间为 12:02:00,那么现在Flink作业的处理时间则为12:02:00

作业在处理了A元素后,便会触发 C 注册的定时器(处理时间已经大于等于12:02:00)


事件时间则为数据本身携带的时间属性(是否会增大受来源数据的时间影响,事件时间的选择受有KEY无无KEY影响,具体请查看前边的TimeWindow篇)

ex:现在有三个数据 来源顺序如下 A > B > C 假设他们的数据进入Flink程序有点延迟,但他们各自包含了自己的时间属性 A(12:01:00) > B(12:01:00) > C(12:00:00)

C 进入了算子 并注册了一个(12:02:00的事件时间定时器),且C元素的时间成为了 Flink作业最新的事件时间

B 进入了算子,B元素的事件时间为12:01:00,且B元素的时间成为了 Flink作业最新的事件时间

A 进入了算子,但A元素的事件时间还是为12:01:00, 这个时间等于 Flink作业事件时间,那么此时Flink作业的事件时间仍为12:01:00

A计算完成后,C触发注册的定时器(12:02:00)仍不会触发,因为目前Flink作业中,事件时间才到(12:01:00),其会等待直到有一个事件时间大于等于了12:02:00的数据到来才会触发

(3)处理函数完整代码
package com.leilei;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.util.Iterator;
import java.util.Map;

/**
 * @author lei
 * @version 1.0
 * @date 2021/3/21 22:17
 * @desc 模拟订单自动评价流程计算
 */
public class OrderSettlementProcess extends KeyedProcessFunction<String, Order, Object> {
    private final Long overTime;
    MapState<String, Long> productState;
    SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    public OrderSettlementProcess(Long overTime) {
        this.overTime = overTime;
    }

    /**
     * 数据处理
     *
     * @param currentOrder
     * @param ctx
     * @param out
     * @throws Exception
     */
    @Override
    public void processElement(Order currentOrder, Context ctx, Collector<Object> out) throws Exception {
        long time = currentOrder.getOrderTime() + this.overTime;
        //注册一个处理时间定时器 定时器触发时间为 value.getOrderTime() + this.overTime
        ctx.timerService().registerProcessingTimeTimer(time);
        // 注释代码是模拟同一个key 仅存在一个定时器,执行时间后覆盖前(将前边的定时器移除)
        //if (productState.contains(ctx.getCurrentKey())) {
        //    ctx.timerService().deleteProcessingTimeTimer(productState.get(ctx.getCurrentKey()));
        //}
        productState.put(ctx.getCurrentKey(), time);
        System.out.println(KeyUtil.buildKey(currentOrder) + " 订单过期时间为:" + time + " :" + df.format(time));
    }

    /**
     * 定时任务触发
     *
     * @param timestamp 就是上文设置的    ctx.timerService().registerProcessingTimeTimer(time); 时间戳
     * @param ctx       上下文环境  可获取当前Process的 分组key /设置定时器等
     * @param out       数据收集
     * @throws Exception
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out) throws Exception {
        super.onTimer(timestamp, ctx, out);
        System.out.println("定时任务执行:" + timestamp + ":" + ctx.getCurrentKey());
        Iterator<Map.Entry<String, Long>> orderIterator = productState.iterator();
        if (orderIterator.hasNext()) {
            Map.Entry<String, Long> orderEntry = orderIterator.next();
            String key = orderEntry.getKey();
            Long expire = orderEntry.getValue();
            //模拟调用查询订单状态
            if (!isEvaluation(key) && expire == timestamp) {
                //todo 数据收集
                System.err.println(key + ">>>>> 超过订单未评价且超过最大评价时间,默认设置五星好评!");
            } else {
                System.out.println(key + "订单已评价!");
            }

        }
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        //定义map存储状态
        MapStateDescriptor<String, Long> mapStateDescriptor = new MapStateDescriptor<>("productState",
                TypeInformation.of(String.class),
                TypeInformation.of(Long.class));
        productState = getRuntimeContext().getMapState(mapStateDescriptor);
    }

    public Boolean isEvaluation(String orderKey) {
        //todo 查询订单状态
        return false;
    }
}

(4)Flink定时器DEMO主启动类

package com.leilei;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lei
 * @version 1.0
 * @date 2021/3/21 22:00
 * @desc flink 定时器  (自动好评,超时结算等业务)
 */
public class FlinkTimer {
    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Order> streamSource = env.addSource(new MyOrderSource());
        DataStream<Object> stream = streamSource
                //根据订单分组
                .keyBy(KeyUtil::buildKey)
                .process(new OrderSettlementProcess(120 * 1000L));
        stream.printToErr();
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐