从某种意义上说,算子状态是更底层的状态类型,因为它只针对当前算子并行任务有效,不需要考虑不同 key 的隔离。

基本概念和特点

算子状态(Operator State)就是一个算子并行实例上定义的状态,作用范围被限定为当前算子任务,与Key无关,不同Key的数据只要分发到同一个并行子任务,就会访问到同一个算子状态。

算子状态一般用在 Source 或 Sink 等与外部系统连接的算子上,或者完全没有 key 定义的场景。比如 Flink 的 Kafka 连接器中,就用到了算子状态。在我们给 Source 算子设置并行度后,Kafka 消费者的每一个并行实例,都会为对应的主题(topic)分区维护一个偏移量, 作为算子状态保存起来。这在保证 Flink 应用“精确一次”(exactly-once)状态一致性时非常有用。

状态类型

算子状态也支持不同的结构类型,主要有三种:ListState、UnionListState 和 BroadcastState。

列表状态(ListState)

与 Keyed State 中的 ListState 一样,将状态表示为一组数据的列表。

与 Keyed State 中的列表状态的区别是:在算子状态的上下文中,不会按键(key)分别处理状态,所以每一个并行子任务上只会保留一个“列表”(list),也就是当前并行子任务上所有状态项的集合。列表中的状态项就是可以重新分配的最细粒度,彼此之间完全独立。

当算子并行度进行缩放调整时,算子的列表状态中的所有元素项会被统一收集起来,相当于把多个分区的列表合并成了一个“大列表”,然后再均匀地分配给所有并行任务。这种“均匀分配”的具体方法就是“轮询”(round-robin),与之前介绍的 rebanlance 数据传输方式类似,是通过逐一“发牌”的方式将状态项平均分配的。这种方式也叫作“平均分割重组”(even-splitredistribution)。

算子状态中不会存在“键组”(key group)这样的结构,所以为了方便重组分配,就把它直接定义成了“列表”(list)。这也就解释了,为什么算子状态中没有最简单的值状态(ValueState)。

联合列表状态(UnionListState)

与 ListState 类似,联合列表状态也会将状态表示为一个列表。它与常规列表状态的区别在于,算子并行度进行缩放调整时对于状态的分配方式不同。

UnionListState 的重点就在于“联合”(union)。在并行度调整时,常规列表状态是轮询分配状态项,而联合列表状态的算子则会直接广播状态的完整列表。这样,并行度缩放之后的并行子任务就获取到了联合后完整的“大列表”,可以自行选择要使用的状态项和要丢弃的状态项。这种分配也叫作“联合重组”(union redistribution)。如果列表中状态项数量太多,为资源和效率考虑一般不建议使用联合重组的方式

代码实现

状态的本质就是算子并行子任务实例上的一个特殊的本地变量,它的特殊之处就在于Flink会提供完整的管理机制,来保证它的持久化缓存,以便故障进行恢复。

注意:在发生故障,重启应用后,无论按键分区,还是轮询分区,都会发生变化,就好比,打牌的时候从三个人变成四个人时,即使牌的顺序不发生变化,但牌的数量到每个人手里都不同。那么问题来了,如何保证原先状态和故障恢复后数据的对应关系呢?

对于按键状态,因为状态是和key相关的,相同key的数据不管在哪个分区,最后都是在同一个分区,所以只要将状态按照key的哈希值计算对应分区,重新分配即可。

对于算子状态,因为不存在key,所有数据发往哪个分区是不可知的,当故障重启之后,不能保证数据和以前一样进入同一个子任务,访问同一个状态,所以Flink提供了一个接口,根据需求来自行设计快照保存(snapshot)和恢复(restore)逻辑。

CheckpointedFunction 接口

在 Flink 中,对状态进行持久化保存的快照机制叫作“检查点”(Checkpoint)。于是使用算子状态时,就需要对检查点的相关操作进行定义,实现一个CheckpointedFunction 接口。
CheckpointedFunction 接口在源码中定义如下:

public interface CheckpointedFunction {
	// 保存状态快照到检查点时,调用这个方法
	void snapshotState(FunctionSnapshotContext context) throws Exception
	// 初始化状态时调用这个方法,也会在恢复状态时调用
 	void initializeState(FunctionInitializationContext context) throws Exception;
}

每次应用保存检查点做快照时,都会调用.snapshotState()方法,将状态进行外部持久化。而在算子任务进行初始化时,会调用. initializeState()方法。这又有两种情况:一种是整个应用第一次运行,这时状态会被初始化为一个默认值(default value);另一种是应用重启时,从检查点(checkpoint)或者保存点(savepoint)中读取之前状态的快照,并赋给本地状态。所以,接口中的.snapshotState()方法定义了检查点的快照保存逻辑,而. initializeState()方法不仅定义了初始化逻辑,也定义了恢复逻辑。

这里需要注意,CheckpointedFunction 接口中的两个方法,分别传入了一个上下文(context)作为参数。不同的是,.snapshotState()方法拿到的是快照的上下文 FunctionSnapshotContext,它可以提供检查点的相关信息,不过无法获取状态句柄;而. initializeState()方法拿到的是FunctionInitializationContext,这是函数类进行初始化时的上下文,是真正的“运行时上下文”。FunctionInitializationContext 中提供了“算子状态存储”(OperatorStateStore)和“按键分区状态存储(” KeyedStateStore),在这两个存储对象中可以非常方便地获取当前任务实例中的 OperatorState 和 Keyed State。

例如:

ListStateDescriptor<String> descriptor = new ListStateDescriptor<>(
	"buffered-elements",
	Types.of( String ) );

ListState<String> checkpointedState = context.getOperatorStateStore().getListState( descriptor );

示例代码

需求:批量缓存输出 (攒到10条数据就做一个输出)

public class BufferingSinkExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                    @Override
                                    public long extractTimestamp(Event element, long recordTimestamp) {
                                        return element.timestamp;
                                    }
                                })
                );

        stream.print("input");

        //批量缓存输出 (攒到10条数据就做一个输出)
        stream.addSink(new BufferingSink(10));

        env.execute();
    }

    //自定义实现SinkFunction

    /**
     * 假如说如果没到10条数据,但已经发生了故障,那么发生故障后,之前的数据就已经消失了,
     * 为了保证数据不丢,那么就要做一个持久化的缓存,就是所谓的检查点
     */
    public static class BufferingSink implements SinkFunction<Event>, CheckpointedFunction {

        //定义当前类的属性,批量
        private final int threshold;

        //将数据写入持久化,需要媒介,定义一个List集合,充当媒介
        private List<Event> bufferedElements;

        public BufferingSink(int threshold) {
            this.threshold = threshold;
            this.bufferedElements = new ArrayList<>();
        }

        //定义算子状态
        private ListState<Event> checkpointedState;

        //每来一条数据,要做什么操作,都在这个方法里
        @Override
        public void invoke(Event value, Context context) throws Exception {
            //把来的每一条数据都缓存到列表中
            bufferedElements.add(value);
            //判断如果达到阈值,就批量写入
            if (bufferedElements.size() == threshold) {
                //用打印到控制台模拟写入外部系统
                for (Event event : bufferedElements) {
                    System.out.println(event);
                }
                System.out.println("====================输出完毕====================");
                bufferedElements.clear();
            }
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {

            //清空状态,保证状态跟这里的bufferedElements完全一样
            checkpointedState.clear();

            //对状态进行持久化,复制缓存的列表到列表状态
            for (Event event : bufferedElements) {
                checkpointedState.add(event);
            }

        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            //定义算子状态
            ListStateDescriptor<Event> descriptor = new ListStateDescriptor<>("buffer", Event.class);
            checkpointedState = context.getOperatorStateStore().getListState(descriptor);

            //如果故障恢复,需要将ListState中的所有元素复制到列表中
            if (context.isRestored()) {
                for (Event event : checkpointedState.get()){
                    bufferedElements.add(event);
                }
            }
        }
    }
}

当初始化好状态对象后,可以通过调用. isRestored()方法判断是否是从故障中恢复。在代码中 BufferingSink 初始化时,恢复出的 ListState 的所有元素会添加到一个局部变量bufferedElements 中,以后进行检查点快照时就可以直接使用了。在调用.snapshotState()时,直接清空 ListState,然后把当前局部变量中的所有元素写入到检查点中。

对于不同类型的算子状态,需要调用不同的获取状态对象的接口,对应地也就会使用不同的状态分配重组算法。比如获取列表状态时,调用.getListState() 会使用最简单的 平均分割重组(even-split redistribution)算法;而获取联合列表状态时,调用的是.getUnionListState() , 对应就会使用联合重组(union redistribution) 算法。

Gitee完整代码

Logo

华为云1024程序员节送福利,参与活动赢单人4000元礼包,更有热门技术干货免费学习

更多推荐