概述

  Flink 处理机制的核心,就是“有状态的流式计算”。我们在之前的章节中也已经多次提到了“状态”(state),不论是简单聚合、窗口聚合,还是处理函数的应用,都会有状态的身影出现。在前面,我们已经简单介绍过有状态流处理,状态就如同事务处理时数据库中保存的信息一样,是用来辅助进行任务计算的数据。而在 Flink 这样的分布式系统中,我们不仅需要定义出状态在任务并行时的处理方式,还需要考虑如何持久化保存、以便发生故障时正确地恢复。这就需要一套完整的管理机制来处理所有的状态。

一、Flink 中的状态

1. 有状态算子

在 Flink 中,算子任务可以分为无状态和有状态两种情况。

  无状态的算子任务只需要观察每个独立事件,根据当前输入的数据直接转换输出结果,如图所示。例如,可以将一个字符串类型的数据拆分开作为元组输出;也可以对数据做一些计算,比如每个代表数量的字段加 1。我们之前讲到的基本转换算子,如 map、filter、flatMap, 计算时不依赖其他数据,就都属于无状态的算子。
在这里插入图片描述
  而有状态的算子任务,则除当前数据之外,还需要一些其他数据来得到计算结果。这里的“其他数据”,就是所谓的状态(state),最常见的就是之前到达的数据,或者由之前数据计算出的某个结果。比如,做求和(sum)计算时,需要保存之前所有数据的和,这就是状态;窗口算子中会保存已经到达的所有数据,这些也都是它的状态。另外,如果我们希望检索到某种“事件模式”(event pattern),比如“先有下单行为,后有支付行为”,那么也应该把之前的行为保存下来,这同样属于状态。容易发现,之前讲过的聚合算子、窗口算子都属于有状态的算子。
在这里插入图片描述
如图所示为有状态算子的一般处理流程,具体步骤如下。
(1) 算子任务接收到上游发来的数据;
(2) 获取当前状态;
(3) 根据业务逻辑进行计算,更新状态;
(4) 得到计算结果,输出发送到下游任务。

2. 状态的管理

  在传统的事务型处理架构中,这种额外的状态数据是保存在数据库中的。而对于实时流处理来说,这样做需要频繁读写外部数据库,如果数据规模非常大肯定就达不到性能要求了。所以 Flink 的解决方案是,将状态直接保存在内存中来保证性能,并通过分布式扩展来提高吞吐量。
  在 Flink 中,每一个算子任务都可以设置并行度,从而可以在不同的 slot 上并行运行多个实例,我们把它们叫作“并行子任务”。而状态既然在内存中,那么就可以认为是子任务实例上的一个本地变量,能够被任务的业务逻辑访问和修改。
  这样看来状态的管理似乎非常简单,我们直接把它作为一个对象交给 JVM 就可以了。然而大数据的场景下,我们必须使用分布式架构来做扩展,在低延迟、高吞吐的基础上还要保证容错性,一系列复杂的问题就会随之而来了。

  • 状态的访问权限。我们知道 Flink 上的聚合和窗口操作,一般都是基于 KeyedStream 的,数据会按照 key 的哈希值进行分区,聚合处理的结果也应该是只对当前 key 有效。然而同一个分区(也就是 slot)上执行的任务实例,可能会包含多个key 的数据,它们同时访问和更改本地变量,就会导致计算结果错误。所以这时状态并不是单纯的本地变量。
  • 容错性,也就是故障后的恢复。状态只保存在内存中显然是不够稳定的,我们需要将它持久化保存,做一个备份;在发生故障后可以从这个备份中恢复状态。
  • 我们还应该考虑到分布式应用的横向扩展性。比如处理的数据量增大时,我们应该相应地对计算资源扩容,调大并行度。这时就涉及到了状态的重组调整。

  可见状态的管理并不是一件轻松的事。好在 Flink 作为有状态的大数据流式处理框架,已经帮我们搞定了这一切。Flink 有一套完整的状态管理机制,将底层一些核心功能全部封装起来,包括状态的高效存储和访问、持久化保存和故障恢复,以及资源扩展时的调整。这样,我们只需要调用相应的 API 就可以很方便地使用状态,或对应用的容错机制进行配置,从而将更多的精力放在业务逻辑的开发上。

3. 状态的分类

3.1 托管状态(Managed State)和原始状态(Raw State)

  Flink 的状态有两种:托管状态(Managed State)和原始状态(Raw State)。托管状态就是由 Flink 统一管理的,状态的存储访问、故障恢复和重组等一系列问题都由 Flink 实现,我们只要调接口就可以;而原始状态则是自定义的,相当于就是开辟了一块内存,需要我们自己管理,实现状态的序列化和故障恢复。
  具体来讲,托管状态是由 Flink 的运行时(Runtime)来托管的;在配置容错机制后,状态会自动持久化保存,并在发生故障时自动恢复。当应用发生横向扩展时,状态也会自动地重组分配到所有的子任务实例上。对于具体的状态内容,Flink 也提供了值状态(ValueState)、列表状态(ListState)、映射状态(MapState)、聚合状态(AggregateState)等多种结构,内部支持各种数据类型。聚合、窗口等算子中内置的状态,就都是托管状态;我们也可以在富函数类(RichFunction)中通过上下文来自定义状态,这些也都是托管状态。
  而对比之下,原始状态就全部需要自定义了。Flink 不会对状态进行任何自动操作,也不知道状态的具体数据类型,只会把它当作最原始的字节(Byte)数组来存储。我们需要花费大量的精力来处理状态的管理和维护。
  所以只有在遇到托管状态无法实现的特殊需求时,我们才会考虑使用原始状态;一般情况下不推荐使用。绝大多数应用场景,我们都可以用 Flink 提供的算子或者自定义托管状态来实现需求。

3.2 算子状态(Operator State)和按键分区状态(Keyed State)

接下来我们的重点就是托管状态Managed State)。

  我们知道在 Flink 中,一个算子任务会按照并行度分为多个并行子任务执行,而不同的子任务会占据不同的任务槽(task slot)。由于不同的 slot 在计算资源上是物理隔离的,所以 Flink能管理的状态在并行任务间是无法共享的,每个状态只能针对当前子任务的实例有效。
  而很多有状态的操作(比如聚合、窗口)都是要先做 keyBy 进行按键分区的。按键分区之后,任务所进行的所有计算都应该只针对当前 key 有效,所以状态也应该按照 key 彼此隔离。在这种情况下,状态的访问方式又会有所不同。

基于这样的想法,我们又可以将托管状态分为两类:算子状态按键分区状态

(1)算子状态(Operator State)
  状态作用范围限定为当前的算子任务实例,也就是只对当前并行子任务实例有效。这就意味着对于一个并行子任务,占据了一个“分区”,它所处理的所有数据都会访问到相同的状态, 状态对于同一任务而言是共享的,如图所示。

在这里插入图片描述
  算子状态可以用在所有算子上,使用的时候其实就跟一个本地变量没什么区别——因为本地变量的作用域也是当前任务实例。在使用时,我们还需进一步实现CheckpointedFunction 接口。

(2)按键分区状态(Keyed State)
  状态是根据输入流中定义的键(key)来维护和访问的,所以只能定义在按键分区流(KeyedStream)中,也就keyBy之后才可以使用,如图所示。

在这里插入图片描述
  按键分区状态应用非常广泛。之前讲到的聚合算子必须在 keyBy 之后才能使用,就是因为聚合的结果是以Keyed State 的形式保存的。另外,也可以通过富函数类(Rich Function) 来自定义Keyed State,所以只要提供了富函数类接口的算子,也都可以使用 Keyed State。
  所以即使是map、filter 这样无状态的基本转换算子,我们也可以通过富函数类给它们“追加”Keyed State,或者实现 CheckpointedFunction 接口来定义Operator State;从这个角度讲, Flink 中所有的算子都可以是有状态的,不愧是“有状态的流处理”。
  无论是 Keyed State 还是 Operator State,它们都是在本地实例上维护的,也就是说每个并行子任务维护着对应的状态,算子的子任务之间状态不共享。关于状态的具体使用,我们会在下面继续展开讲解。

二、按键分区状态(Keyed State)

  在实际应用中,我们一般都需要将数据按照某个 key 进行分区,然后再进行计算处理;所以最为常见的状态类型就是 Keyed State。之前介绍到 keyBy 之后的聚合、窗口计算,算子所持有的状态,都是Keyed State。
  另外,我们还可以通过富函数类(Rich Function)对转换算子进行扩展、实现自定义功能, 比如 RichMapFunction、RichFilterFunction。在富函数中,我们可以调用.getRuntimeContext() 获取当前的运行时上下文(RuntimeContext),进而获取到访问状态的句柄;这种富函数中自定义的状态也是 Keyed State。

1. 基本概念和特点

  按键分区状态(Keyed State)顾名思义,是任务按照键(key)来访问和维护的状态。它的特点非常鲜明,就是以key 为作用范围进行隔离。
  我们知道,在进行按键分区(keyBy)之后,具有相同键的所有数据,都会分配到同一个并行子任务中;所以如果当前任务定义了状态,Flink 就会在当前并行子任务实例中,为每个键值维护一个状态的实例。于是当前任务就会为分配来的所有数据,按照 key 维护和处理对应的状态。
  因为一个并行子任务可能会处理多个 key 的数据,所以 Flink 需要对 Keyed State 进行一些特殊优化。在底层,Keyed State 类似于一个分布式的映射(map)数据结构,所有的状态会根据 key 保存成键值对(key-value)的形式。这样当一条数据到来时,任务就会自动将状态的访问范围限定为当前数据的key,从 map 存储中读取出对应的状态值。所以具有相同 key 的所有数据都会到访问相同的状态,而不同 key 的状态之间是彼此隔离的。
  这种将状态绑定到key 上的方式,相当于使得状态和流的逻辑分区一一对应了:不会有别的 key 的数据来访问当前状态;而当前状态对应 key 的数据也只会访问这一个状态,不会分发到其他分区去。这就保证了对状态的操作都是本地进行的,对数据流和状态的处理做到了分区一致性。
  另外,在应用的并行度改变时,状态也需要随之进行重组。不同 key 对应的 Keyed State 可以进一步组成所谓的键组(key groups),每一组都对应着一个并行子任务。键组是 Flink 重新分配 Keyed State 的单元,键组的数量就等于定义的最大并行度。当算子并行度发生改变时, Keyed State 就会按照当前的并行度重新平均分配,保证运行时各个子任务的负载相同。
  需要注意,使用Keyed State 必须基于KeyedStream。没有进行 keyBy 分区的DataStream, 即使转换算子实现了对应的富函数类,也不能通过运行时上下文访问Keyed State。

2. 支持的结构类型

  实际应用中,需要保存为状态的数据会有各种各样的类型,有时还需要复杂的集合类型, 比如列表(List)和映射(Map)。对于这些常见的用法,Flink 的按键分区状态(Keyed State)提供了足够的支持。接下来我们就来了解一下 Keyed State 所支持的结构类型.

2.1 值状态(ValueState)

  顾名思义,状态中只保存一个“值”(value)。ValueState<T>本身是一个接口,源码中定义如下:

public interface ValueState<T> extends State { 
	T value() throws IOException;
	void update(T value) throws IOException;
}

  这里的 T 是泛型,表示状态的数据内容可以是任何具体的数据类型。如果想要保存一个长整型值作为状态,那么类型就是 ValueState<Long>
我们可以在代码中读写值状态,实现对于状态的访问和更新。

  • T value():获取当前状态的值;
  • update(T value):对状态进行更新,传入的参数 value 就是要覆写的状态值。

  在具体使用时,为了让运行时上下文清楚到底是哪个状态,我们还需要创建一个“状态描述器”(StateDescriptor)来提供状态的基本信息。例如源码中,ValueState 的状态描述器构造方法如下:

public ValueStateDescriptor(String name, Class<T> typeClass) { 
	super(name, typeClass, null);
}

  这里需要传入状态的名称和类型——这跟我们声明一个变量时做的事情完全一样。有了这个描述器,运行时环境就可以获取到状态的控制句柄(handler)了。

2.2 列表状态(ListState)

  将需要保存的数据,以列表(List)的形式组织起来。在 ListState<T>接口中同样有一个类型参数T,表示列表中数据的类型。ListState 也提供了一系列的方法来操作状态,使用方式与一般的List 非常相似。

  • Iterable<T> get():获取当前的列表状态,返回的是一个可迭代类型 Iterable;
  • update(List<T> values):传入一个列表values,直接对状态进行覆盖;
  • add(T value):在状态列表中添加一个元素 value;
  • addAll(List<T> values):向列表中添加多个元素,以列表 values 形式传入。

  类似地,ListState 的状态描述器就叫作 ListStateDescriptor,用法跟 ValueStateDescriptor
完全一致。

2.3 映射状态(MapState)

  把一些键值对(key-value)作为状态整体保存起来,可以认为就是一组 key-value 映射的列表。对应的 MapState<UK, UV>接口中,就会有 UK、UV 两个泛型,分别表示保存的 key 和 value 的类型。同样,MapState 提供了操作映射状态的方法,与 Map 的使用非常类似。

  • UV get(UK key):传入一个 key 作为参数,查询对应的 value 值;
  • put(UK key, UV value):传入一个键值对,更新 key 对应的 value 值;
  • putAll(Map<UK, UV> map):将传入的映射 map 中所有的键值对,全部添加到映射状态中;
  • remove(UK key):将指定 key 对应的键值对删除;
  • boolean contains(UK key):判断是否存在指定的 key,返回一个 boolean 值。

另外,MapState 也提供了获取整个映射相关信息的方法:

  • Iterable<Map.Entry<UK, UV>> entries():获取映射状态中所有的键值对;
  • Iterable<UK> keys():获取映射状态中所有的键(key),返回一个可迭代 Iterable 类型;
  • Iterable<UV> values():获取映射状态中所有的值(value),返回一个可迭代 Iterable类型;
  • boolean isEmpty():判断映射是否为空,返回一个 boolean 值。

2.4 归约状态(ReducingState)

  类似于值状态(Value),不过需要对添加进来的所有数据进行归约,将归约聚合之后的值作为状态保存下来。ReducintState<T>这个接口调用的方法类似于 ListState,只不过它保存的只是一个聚合值,所以调用.add()方法时,不是在状态列表里添加元素,而是直接把新数据和之前的状态进行归约,并用得到的结果更新状态。
  归约逻辑的定义,是在归约状态描述器(ReducingStateDescriptor)中,通过传入一个归约函数(ReduceFunction)来实现的。这里的归约函数,就是我们之前介绍 reduce 聚合算子时讲到的 ReduceFunction,所以状态类型跟输入的数据类型是一样的。

public ReducingStateDescriptor(String name, ReduceFunction<T> reduceFunction, Class<T> typeClass) {...}

  这里的描述器有三个参数,其中第二个参数就是定义了归约聚合逻辑的 ReduceFunction, 另外两个参数则是状态的名称和类型。

2.5 聚合状态(AggregatingState)

  与归约状态非常类似,聚合状态也是一个值,用来保存添加进来的所有数据的聚合结果。与 ReducingState 不同的是,它的聚合逻辑是由在描述器中传入一个更加一般化的聚合函数
AggregateFunction)来定义的;这也就是之前我们讲过的 AggregateFunction,里面通过一个累加器(Accumulator)来表示状态,所以聚合的状态类型可以跟添加进来的数据类型完全不同,使用更加灵活。
  同样地,AggregatingState 接口调用方法也与ReducingState 相同,调用.add()方法添加元素时,会直接使用指定的AggregateFunction 进行聚合并更新状态。

3. 代码实现

3.1 整体介绍

  在 Flink 中,状态始终是与特定算子相关联的;算子在使用状态前首先需要“注册”,其实就是告诉 Flink 当前上下文中定义状态的信息,这样运行时的 Flink 才能知道算子有哪些状态。
  状态的注册,主要是通过“状态描述器”(StateDescriptor)来实现的。状态描述器中最重要的内容,就是状态的名称(name)和类型(type)。我们知道 Flink 中的状态,可以认为是加了一些复杂操作的内存中的变量;而当我们在代码中声明一个局部变量时,都需要指定变量类型和名称,名称就代表了变量在内存中的地址,类型则指定了占据内存空间的大小。同样地, 我们一旦指定了名称和类型,Flink 就可以在运行时准确地在内存中找到对应的状态,进而返回状态对象供我们使用了。所以在一个算子中,我们也可以定义多个状态,只要它们的名称不同就可以了。
  另外,状态描述器中还可能需要传入一个用户自定义函数(user-defined-function,UDF),用来说明处理逻辑,比如前面提到的 ReduceFunction 和AggregateFunction。
  以 ValueState 为例,我们可以定义值状态描述器如下:

ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>( 
	"my state", // 状态名称
	Types.LONG // 状态类型
);

这里我们定义了一个叫作“my state”的长整型 ValueState 的描述器。

  代码中完整的操作是,首先定义出状态描述器;然后调用.getRuntimeContext()方法获取运行时上下文;继而调用 RuntimeContext 的获取状态的方法,将状态描述器传入,就可以得到对应的状态了。
  因为状态的访问需要获取运行时上下文,这只能在富函数类(Rich Function)中获取到, 所以自定义的Keyed State 只能在富函数中使用。当然,底层的处理函数(Process Function) 本身继承了AbstractRichFunction 抽象类,所以也可以使用。
  在富函数中,调用.getRuntimeContext()方法获取到运行时上下文之后,RuntimeContext 有以下几个获取状态的方法:

ValueState<T> getState(ValueStateDescriptor<T>) 
MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>) 
ListState<T> getListState(ListStateDescriptor<T>)
ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)

  对于不同结构类型的状态,只要传入对应的描述器、调用对应的方法就可以了。
  获取到状态对象之后,就可以调用它们各自的方法进行读写操作了。另外,所有类型的状态都有一个方法.clear(),用于清除当前状态。

代码中使用状态的整体结构如下:

public static class MyFlatMapFunction extends RichFlatMapFunction<Long, String> {
    // 声明状态
    private transient ValueState<Long> state;


    @Override
    public void open(Configuration config) {
        // 在 open 生命周期方法中获取状态
        ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>(
            "my state", // 状态名称
            Types.LONG // 状态类型
        );

        state = getRuntimeContext().getState(descriptor);

    }


    @Override
    public void flatMap(Long input, Collector<String> out) throws Exception {
        // 访问状态
        Long currentState = state.value();
        currentState += 1;	// 状态数值加 1
        // 更新状态
        state.update(currentState); if (currentState >= 100) {
            out.collect(“state:+ currentState); state.clear();	// 清空状态
        }

    }
}

  因为 RichFlatmapFunction 中的.flatmap()是每来一条数据都会调用一次的,所以我们不应该在这里调用运行时上下文的.getState()方法,而是在生命周期方法.open()中获取状态对象。另外还有一个问题,我们获取到的状态对象也需要有一个变量名称 state(注意这里跟状态的名称 my state 不同),但这个变量不应该在 open 中声明——否则在.flatmap()里就访问不到了。所以我们还需要在外面直接把它定义为类的属性,这样就可以在不同的方法中通用了。而在外部又不能直接获取状态,因为编译时是无法拿到运行时上下文的。所以最终的解决方案就变成了: 在外部声明状态对象,在open 生命周期方法中通过运行时上下文获取状态。
  这里需要注意,这种方式定义的都是 Keyed State,它对于每个 key 都会保存一份状态实例。所以对状态进行读写操作时,获取到的状态跟当前输入数据的 key 有关;只有相同 key 的数据,才会操作同一个状态,不同 key 的数据访问到的状态值是不同的。而且上面提到的.clear()方法,也只会清除当前 key 对应的状态。
  另外,状态不一定都存储在内存中,也可以放在磁盘或其他地方,具体的位置是由一个可配置的组件来管理的,这个组件叫作“状态后端”(State Backend)。关于状态后端,我们后面详细介绍。

下面我们给出不同类型的状态的应用实例。

3.2 值状态(ValueState)

  我们这里会使用用户 id 来进行分流,然后分别统计每个用户的 pv 数据,由于我们并不想每次 pv 加一,就将统计结果发送到下游去,所以这里我们注册了一个定时器,用来隔一段时间发送 pv 的统计结果,这样对下游算子的压力不至于太大。具体实现方式是定义一个用来保存定时器时间戳的值状态变量。当定时器触发并向下游发送数据以后,便清空储存定时器时间戳的状态变量,这样当新的数据到来时,发现并没有定时器存在,就可以注册新的定时器了, 注册完定时器之后将定时器的时间戳继续保存在状态变量中。

import com.atguigu.chapter05.ClickSource;
import com.atguigu.chapter05.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;

import org.apache.flink.util.Collector;



public class PeriodicPvExample{

    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
                );

        stream.print("input");

        // 统计每个用户的pv,隔一段时间(10s)输出一次结果
        stream.keyBy(data -> data.user)
                .process(new PeriodicPvResult())
                .print();

        env.execute();
    }

    // 注册定时器,周期性输出pv
    public static class PeriodicPvResult extends KeyedProcessFunction<String ,Event, String>{
        // 定义两个状态,保存当前pv值,以及定时器时间戳
        ValueState<Long> countState;
        ValueState<Long> timerTsState;

        @Override
        public void open(Configuration parameters) throws Exception {
            countState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("count", Long.class));
            timerTsState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timerTs", Long.class));
        }

        @Override
        public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
            // 更新count值
            Long count = countState.value();
            if (count == null){
                countState.update(1L);
            } else {
                countState.update(count + 1);
            }
            // 注册定时器
            if (timerTsState.value() == null){
                ctx.timerService().registerEventTimeTimer(value.timestamp + 10 * 1000L);
                timerTsState.update(value.timestamp + 10 * 1000L);
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            out.collect(ctx.getCurrentKey() + " pv: " + countState.value());
            // 清空状态
            timerTsState.clear();
        }
    }}

3.3 列表状态(ListState)

在 Flink SQL 中,支持两条流的全量 Join,语法如下:

SELECT * FROM A INNER JOIN B WHERE A.id = B.id;

  这样一条 SQL 语句要慎用,因为 Flink 会将 A 流和 B 流的所有数据都保存下来,然后进行 Join。不过在这里我们可以用列表状态变量来实现一下这个 SQL 语句的功能。代码如下:

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

public class TwoStreamFullJoinExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple3<String, String, Long>> stream1 = env
                .fromElements(
                        Tuple3.of("a", "stream-1", 1000L),
                        Tuple3.of("b", "stream-1", 2000L)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
                                    @Override
                                    public long extractTimestamp(Tuple3<String, String, Long> t, long l) {
                                        return t.f2;
                                    }
                                })
                );

        SingleOutputStreamOperator<Tuple3<String, String, Long>> stream2 = env
                .fromElements(
                        Tuple3.of("a", "stream-2", 3000L),
                        Tuple3.of("b", "stream-2", 4000L)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
                                    @Override
                                    public long extractTimestamp(Tuple3<String, String, Long> t, long l) {
                                        return t.f2;
                                    }
                                })
                );

        stream1.keyBy(r -> r.f0)
                .connect(stream2.keyBy(r -> r.f0))
                .process(new CoProcessFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>, String>() {
                    private ListState<Tuple3<String, String, Long>> stream1ListState;
                    private ListState<Tuple3<String, String, Long>> stream2ListState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        stream1ListState = getRuntimeContext().getListState(
                                new ListStateDescriptor<Tuple3<String, String, Long>>("stream1-list", Types.TUPLE(Types.STRING, Types.STRING))
                        );
                        stream2ListState = getRuntimeContext().getListState(
                                new ListStateDescriptor<Tuple3<String, String, Long>>("stream2-list", Types.TUPLE(Types.STRING, Types.STRING))
                        );
                    }

                    @Override
                    public void processElement1(Tuple3<String, String, Long> left, Context context, Collector<String> collector) throws Exception {
                        stream1ListState.add(left);
                        for (Tuple3<String, String, Long> right : stream2ListState.get()) {
                            collector.collect(left + " => " + right);
                        }
                    }

                    @Override
                    public void processElement2(Tuple3<String, String, Long> right, Context context, Collector<String> collector) throws Exception {
                        stream2ListState.add(right);
                        for (Tuple3<String, String, Long> left : stream1ListState.get()) {
                            collector.collect(left + " => " + right);
                        }
                    }
                })
                .print();

        env.execute();
    }
}

3.4 映射状态(MapState)

  映射状态的用法和 Java 中的 HashMap 很相似。在这里我们可以通过 MapState 的使用来探索一下窗口的底层实现,也就是我们要用映射状态来完整模拟窗口的功能。这里我们模拟一个滚动窗口。我们要计算的是每一个 url 在每一个窗口中的 pv 数据。我们之前使用增量聚合和全窗口聚合结合的方式实现过这个需求。这里我们用 MapState 再来实现一下。

import com.atguigu.chapter05.ClickSource;
import com.atguigu.chapter05.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;

import org.apache.flink.util.Collector;

import java.sql.Timestamp;


// 使用KeyedProcessFunction模拟滚动窗口
public class FakeWindowExample {


    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
                );

        // 统计每10s窗口内,每个url的pv
        stream.keyBy(data -> data.url)
                .process(new FakeWindowResult(10000L))
                .print();

        env.execute();
    }

    public static class FakeWindowResult extends KeyedProcessFunction<String, Event, String>{
        // 定义属性,窗口长度
        private Long windowSize;

        public FakeWindowResult(Long windowSize) {
            this.windowSize = windowSize;
        }

        // 声明状态,用map保存pv值(窗口start,count)
        MapState<Long, Long> windowPvMapState;

        @Override
        public void open(Configuration parameters) throws Exception {
            windowPvMapState = getRuntimeContext().getMapState(new MapStateDescriptor<Long, Long>("window-pv", Long.class, Long.class));
        }

        @Override
        public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
            // 每来一条数据,就根据时间戳判断属于哪个窗口
            Long windowStart = value.timestamp / windowSize * windowSize;
            Long windowEnd = windowStart + windowSize;

            // 注册 end -1 的定时器,窗口触发计算
            ctx.timerService().registerEventTimeTimer(windowEnd - 1);

            // 更新状态中的pv值
            if (windowPvMapState.contains(windowStart)){
                Long pv = windowPvMapState.get(windowStart);
                windowPvMapState.put(windowStart, pv + 1);
            } else {
                windowPvMapState.put(windowStart, 1L);
            }
        }

        // 定时器触发,直接输出统计的pv结果
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            Long windowEnd = timestamp + 1;
            Long windowStart = windowEnd - windowSize;
            Long pv = windowPvMapState.get(windowStart);
            out.collect( "url: " + ctx.getCurrentKey()
                    + " 访问量: " + pv
                    + " 窗口:" + new Timestamp(windowStart) + " ~ " + new Timestamp(windowEnd));

            // 模拟窗口的销毁,清除map中的key
            windowPvMapState.remove(windowStart);
        }
    }    
}

3.5 聚合状态(AggregatingState)

  我们举一个简单的例子,对用户点击事件流每 5 个数据统计一次平均时间戳。这是一个类似计数窗口( CountWindow) 求平均值的计算, 这里我们可以使用一个有聚合状态的RichFlatMapFunction 来实现。

import com.atguigu.chapter05.ClickSource;
import com.atguigu.chapter05.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;

public class AverageTimestampExample {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
                );


        // 统计每个用户的点击频次,到达5次就输出统计结果
        stream.keyBy(data -> data.user)
                .flatMap(new AvgTsResult())
                .print();

        env.execute();
    }

    public static class AvgTsResult extends RichFlatMapFunction<Event, String>{
        // 定义聚合状态,用来计算平均时间戳
        AggregatingState<Event, Long> avgTsAggState;

        // 定义一个值状态,用来保存当前用户访问频次
        ValueState<Long> countState;

        @Override
        public void open(Configuration parameters) throws Exception {
            avgTsAggState = getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor<Event, Tuple2<Long, Long>, Long>(
                    "avg-ts",
                    new AggregateFunction<Event, Tuple2<Long, Long>, Long>() {
                        @Override
                        public Tuple2<Long, Long> createAccumulator() {
                            return Tuple2.of(0L, 0L);
                        }

                        @Override
                        public Tuple2<Long, Long> add(Event value, Tuple2<Long, Long> accumulator) {
                            return Tuple2.of(accumulator.f0 + value.timestamp, accumulator.f1 + 1);
                        }

                        @Override
                        public Long getResult(Tuple2<Long, Long> accumulator) {
                            return accumulator.f0 / accumulator.f1;
                        }

                        @Override
                        public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
                            return null;
                        }
                    },
                    Types.TUPLE(Types.LONG, Types.LONG)
            ));

            countState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("count", Long.class));
        }

        @Override
        public void flatMap(Event value, Collector<String> out) throws Exception {
            Long count = countState.value();
            if (count == null){
                count = 1L;
            } else {
                count ++;
            }

            countState.update(count);
            avgTsAggState.add(value);

            // 达到5次就输出结果,并清空状态
            if (count == 5){
                out.collect(value.user + " 平均时间戳:" + new Timestamp(avgTsAggState.get()));
                countState.clear();
            }
        }
    }
}

4. 状态生存时间(TTL)

  实际应用中,很多状态会随着时间的推移逐渐增长,如果不加以限制,最终就会导致存储空间的耗尽。一个优化的思路是直接在代码中调用.clear()方法去清除状态,但是有时候我们的逻辑要求不能直接清除。这时就需要配置一个状态的“生存时间”(time-to-live,TTL),当状态在内存中存在的时间超出这个值时,就将它清除。
  具体实现上,如果用一个进程不停地扫描所有状态看是否过期,显然会占用大量资源做无用功。状态的失效其实不需要立即删除,所以我们可以给状态附加一个属性,也就是状态的“失效时间”。状态创建的时候,设置 失效时间 = 当前时间 + TTL;之后如果有对状态的访问和修改,我们可以再对失效时间进行更新;当设置的清除条件被触发时(比如,状态被访问的时候,或者每隔一段时间扫描一次失效状态),就可以判断状态是否失效、从而进行清除了。
  配置状态的 TTL 时,需要创建一个 StateTtlConfig 配置对象,然后调用状态描述器的.enableTimeToLive()方法启动TTL 功能。

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(10))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("my state", String.class);

stateDescriptor.enableTimeToLive(ttlConfig);

这里用到了几个配置项:

  • .newBuilder()
    状态TTL 配置的构造器方法,必须调用,返回一个 Builder 之后再调用.build()方法就可以得到 StateTtlConfig 了。方法需要传入一个 Time 作为参数,这就是设定的状态生存时间。
  • .setUpdateType()
    设置更新类型。更新类型指定了什么时候更新状态失效时间,这里的 OnCreateAndWrite 表示只有创建状态和更改状态(写操作)时更新失效时间。另一种类型OnReadAndWrite 则表示无论读写操作都会更新失效时间,也就是只要对状态进行了访问,就表明它是活跃的,从而延长生存时间。这个配置默认为 OnCreateAndWrite。
  • .setStateVisibility()
    设置状态的可见性。所谓的“状态可见性”,是指因为清除操作并不是实时的,所以当状态过期之后还有可能基于存在,这时如果对它进行访问,能否正常读取到就是一个问题了。这里设置的NeverReturnExpired 是默认行为,表示从不返回过期值,也就是只要过期就认为它已经被清除了,应用不能继续读取;这在处理会话或者隐私数据时比较重要。对应的另一种配置是ReturnExpireDefNotCleanedUp,就是如果过期状态还存在,就返回它的值。

  除此之外,TTL 配置还可以设置在保存检查点(checkpoint)时触发清除操作,或者配置增量的清理(incremental cleanup),还可以针对 RocksDB 状态后端使用压缩过滤器(compaction filter)进行后台清理。关于检查点和状态后端的内容,我们会在后续章节继续讲解。
  这里需要注意,目前的 TTL 设置只支持处理时间。另外,所有集合类型的状态(例如ListState、MapState)在设置 TTL 时,都是针对每一项(per-entry)元素的。也就是说,一个列表状态中的每一个元素,都会以自己的失效时间来进行清理,而不是整个列表一起清理。

三、算子状态(Operator State)

1.基本概念和特点

  算子状态(Operator State)就是一个算子并行实例上定义的状态,作用范围被限定为当前算子任务。算子状态跟数据的 key 无关,所以不同 key 的数据只要被分发到同一个并行子任务, 就会访问到同一个Operator State。
  算子状态的实际应用场景不如Keyed State 多,一般用在 Source 或 Sink 等与外部系统连接的算子上,或者完全没有 key 定义的场景。比如 Flink 的 Kafka 连接器中,就用到了算子状态。在我们给 Source 算子设置并行度后,Kafka 消费者的每一个并行实例,都会为对应的主题(topic)分区维护一个偏移量, 作为算子状态保存起来。这在保证 Flink 应用“精确一次”(exactly-once)状态一致性时非常有用。关于状态一致性的内容,我们会在后面详细展开。
  当算子的并行度发生变化时,算子状态也支持在并行的算子任务实例之间做重组分配。根据状态的类型不同,重组分配的方案也会不同。

2. 状态类型

  算子状态也支持不同的结构类型,主要有三种:ListStateUnionListStateBroadcastState

2.1 列表状态(ListState)

  与 Keyed State 中的 ListState 一样,将状态表示为一组数据的列表。
  与 Keyed State 中的列表状态的区别是:在算子状态的上下文中,不会按键(key)分别处理状态,所以每一个并行子任务上只会保留一个“列表”(list),也就是当前并行子任务上所有状态项的集合。列表中的状态项就是可以重新分配的最细粒度,彼此之间完全独立。
  当算子并行度进行缩放调整时,算子的列表状态中的所有元素项会被统一收集起来,相当于把多个分区的列表合并成了一个“大列表”,然后再均匀地分配给所有并行任务。这种“均匀分配”的具体方法就是“轮询”(round-robin),与之前介绍的 rebanlance 数据传输方式类似,是通过逐一“发牌”的方式将状态项平均分配的。这种方式也叫作“平均分割重组”(even-split redistribution)。
  算子状态中不会存在“键组”(key group)这样的结构,所以为了方便重组分配,就把它直接定义成了“列表”( list)。这也就解释了,为什么算子状态中没有最简单的值状态(ValueState)。

2.2 联合列表状态(UnionListState)

  与 ListState 类似,联合列表状态也会将状态表示为一个列表。它与常规列表状态的区别在于,算子并行度进行缩放调整时对于状态的分配方式不同。
  UnionListState 的重点就在于“联合”(union)。在并行度调整时,常规列表状态是轮询分配状态项,而联合列表状态的算子则会直接广播状态的完整列表。这样,并行度缩放之后的并行子任务就获取到了联合后完整的“大列表”,可以自行选择要使用的状态项和要丢弃的状态项。这种分配也叫作“联合重组”(union redistribution)。如果列表中状态项数量太多,为资源和效率考虑一般不建议使用联合重组的方式。

2.3 广播状态(BroadcastState)

  有时我们希望算子并行子任务都保持同一份“全局”状态,用来做统一的配置和规则设定。这时所有分区的所有数据都会访问到同一个状态,状态就像被“广播”到所有分区一样,这种特殊的算子状态,就叫作广播状态(BroadcastState)。
  因为广播状态在每个并行子任务上的实例都一样,所以在并行度调整的时候就比较简单, 只要复制一份到新的并行任务就可以实现扩展;而对于并行度缩小的情况,可以将多余的并行子任务连同状态直接砍掉——因为状态都是复制出来的,并不会丢失。
  在底层,广播状态是以类似映射结构(map)的键值对(key-value)来保存的,必须基于一个“广播流”(BroadcastStream)来创建。关于广播流,我们在第八章“广播连接流”的讲解中已经做过介绍,稍后还会在后面做一个总结。

3. 代码实现

  我们已经知道,状态从本质上来说就是算子并行子任务实例上的一个特殊本地变量。它的特殊之处就在于 Flink 会提供完整的管理机制,来保证它的持久化保存,以便发生故障时进行状态恢复;另外还可以针对不同的 key 保存独立的状态实例。按键分区状态(Keyed State)对这两个功能都要考虑;而算子状态(Operator State)并不考虑 key 的影响,所以主要任务就是要让 Flink 了解状态的信息、将状态数据持久化后保存到外部存储空间。
  看起来算子状态的使用应该更加简单才对。不过仔细思考又会发现一个问题:我们对状态进行持久化保存的目的是为了故障恢复;在发生故障、重启应用后,数据还会被发往之前分配的分区吗?显然不是,因为并行度可能发生了调整,不论是按键(key)的哈希值分区,还是直接轮询(round-robin)分区,数据分配到的分区都会发生变化。这很好理解,当打牌的人数从 3 个增加到 4 个时,即使牌的次序不变,轮流发到每个人手里的牌也会不同。数据分区发生变化,带来的问题就是,怎么保证原先的状态跟故障恢复后数据的对应关系呢?
  对于Keyed State 这个问题很好解决:状态都是跟 key 相关的,而相同key 的数据不管发往哪个分区,总是会全部进入一个分区的;于是只要将状态也按照 key 的哈希值计算出对应的分区,进行重组分配就可以了。恢复状态后继续处理数据,就总能按照 key 找到对应之前的状态,就保证了结果的一致性。所以 Flink 对Keyed State 进行了非常完善的包装,我们不需实现任何接口就可以直接使用。
  而对于 Operator State 来说就会有所不同。因为不存在 key,所有数据发往哪个分区是不可预测的;也就是说,当发生故障重启之后,我们不能保证某个数据跟之前一样,进入到同一个并行子任务、访问同一个状态。所以 Flink 无法直接判断该怎样保存和恢复状态,而是提供了接口,让我们根据业务需求自行设计状态的快照保存(snapshot)和恢复(restore)逻辑。

3.1 CheckpointedFunction 接口

  在 Flink 中,对状态进行持久化保存的快照机制叫作“检查点”(Checkpoint)。于是使用算子状态时,就需要对检查点的相关操作进行定义,实现一个 CheckpointedFunction 接口。

CheckpointedFunction 接口在源码中定义如下:

public interface CheckpointedFunction {
// 保存状态快照到检查点时,调用这个方法
void snapshotState(FunctionSnapshotContext context) throws Exception
// 初始化状态时调用这个方法,也会在恢复状态时调用
void	initializeState(FunctionInitializationContext context) throws Exception;

  每次应用保存检查点做快照时,都会调用.snapshotState()方法,将状态进行外部持久化。而在算子任务进行初始化时,会调用. initializeState()方法。这又有两种情况:一种是整个应用第一次运行,这时状态会被初始化为一个默认值(default value);另一种是应用重启时,从检查点(checkpoint)或者保存点(savepoint)中读取之前状态的快照,并赋给本地状态。所以, 接口中的.snapshotState()方法定义了检查点的快照保存逻辑,而. initializeState()方法不仅定义了初始化逻辑,也定义了恢复逻辑。
  这里需要注意,CheckpointedFunction 接口中的两个方法,分别传入了一个上下文(context)作为参数。不同的是,.snapshotState()方法拿到的是快照的上下文 FunctionSnapshotContext, 它可以提供检查点的相关信息,不过无法获取状态句柄;而. initializeState()方法拿到的是FunctionInitializationContext,这是函数类进行初始化时的上下文,是真正的“运行时上下文”。FunctionInitializationContext 中提供了“算子状态存储”(OperatorStateStore)和“按键分区状态
存储(” KeyedStateStore),在这两个存储对象中可以非常方便地获取当前任务实例中的 Operator
State 和Keyed State。例如:

ListStateDescriptor<String> descriptor = new ListStateDescriptor<>(
    "buffered-elements",
    Types.of(String)
    );
ListState<String> checkpointedState = context.getOperatorStateStore().getListState(descriptor);

  我们看到,算子状态的注册和使用跟 Keyed State 非常类似,也是需要先定义一个状态描述器(StateDescriptor),告诉 Flink 当前状态的名称和类型,然后从上下文提供的算子状态存储(OperatorStateStore)中获取对应的状态对象。如果想要从 KeyedStateStore 中获取 Keyed State也是一样的,前提是必须基于定义了 key 的KeyedStream,这和富函数类中的方式并不矛盾。通过这里的描述可以发现,CheckpointedFunction 是 Flink 中非常底层的接口,它为有状态的流处理提供了灵活且丰富的应用。

3.2 示例代码

  接下来我们举一个算子状态的应用案例。在下面的例子中,自定义的 SinkFunction 会在CheckpointedFunction 中进行数据缓存,然后统一发送到下游。这个例子演示了列表状态的平均分割重组(event-split redistribution)。

import com.atguigu.chapter05.ClickSource;
import com.atguigu.chapter05.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.util.ArrayList;
import java.util.List;

public class BufferingSinkExample {

    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.enableCheckpointing(10000L);
//        env.setStateBackend(new EmbeddedRocksDBStateBackend());

//        env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage(""));

        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        checkpointConfig.setMinPauseBetweenCheckpoints(500);
        checkpointConfig.setCheckpointTimeout(60000);
        checkpointConfig.setMaxConcurrentCheckpoints(1);
        checkpointConfig.enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        checkpointConfig.enableUnalignedCheckpoints();


        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
                );

        stream.print("input");

        // 批量缓存输出
        stream.addSink(new BufferingSink(10));

        env.execute();
    }

    public static class BufferingSink implements SinkFunction<Event>, CheckpointedFunction {
        private final int threshold;
        private transient ListState<Event> checkpointedState;
        private List<Event> bufferedElements;

        public BufferingSink(int threshold) {
            this.threshold = threshold;
            this.bufferedElements = new ArrayList<>();
        }

        @Override
        public void invoke(Event value, Context context) throws Exception {
            bufferedElements.add(value);
            if (bufferedElements.size() == threshold) {
                for (Event element: bufferedElements) {
                    // 输出到外部系统,这里用控制台打印模拟
                    System.out.println(element);
                }
                System.out.println("==========输出完毕=========");
                bufferedElements.clear();
            }
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            checkpointedState.clear();
            // 把当前局部变量中的所有元素写入到检查点中
            for (Event element : bufferedElements) {
                checkpointedState.add(element);
            }
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            ListStateDescriptor<Event> descriptor = new ListStateDescriptor<>(
                    "buffered-elements",
                    Types.POJO(Event.class));

            checkpointedState = context.getOperatorStateStore().getListState(descriptor);

            // 如果是从故障中恢复,就将ListState中的所有元素添加到局部变量中
            if (context.isRestored()) {
                for (Event element : checkpointedState.get()) {
                    bufferedElements.add(element);
                }
            }
        }
    }
}

  当初始化好状态对象后,我们可以通过调用. isRestored()方法判断是否是从故障中恢复。在代码中 BufferingSink 初始化时,恢复出的 ListState 的所有元素会添加到一个局部变量bufferedElements 中,以后进行检查点快照时就可以直接使用了。在调用.snapshotState()时,直接清空 ListState,然后把当前局部变量中的所有元素写入到检查点中。
  对于不同类型的算子状态,需要调用不同的获取状态对象的接口,对应地也就会使用不同的状态分配重组算法。比如获取列表状态时,调用.getListState() 会使用最简单的 平均分割重组(even-split redistribution)算法;而获取联合列表状态时,调用的是.getUnionListState() , 对应就会使用联合重组(union redistribution) 算法。

四、广播状态(Broadcast State)

1. 基本用法

  让所有并行子任务都持有同一份状态,也就意味着一旦状态有变化,所以子任务上的实例都要更新。什么时候会用到这样的广播状态呢?
  一个最为普遍的应用,就是“动态配置”或者“动态规则”。我们在处理流数据时,有时会基于一些配置(configuration)或者规则(rule)。简单的配置当然可以直接读取配置文件,一次加载,永久有效;但数据流是连续不断的,如果这配置随着时间推移还会动态变化,那又该怎么办呢?
  一个简单的想法是,定期扫描配置文件,发现改变就立即更新。但这样就需要另外启动一个扫描进程,如果扫描周期太长,配置更新不及时就会导致结果错误;如果扫描周期太短,又会耗费大量资源做无用功。解决的办法,还是流处理的“事件驱动”思路——我们可以将这动态的配置数据看作一条流,将这条流和本身要处理的数据流进行连接(connect),就可以实时地更新配置进行计算了。
  由于配置或者规则数据是全局有效的,我们需要把它广播给所有的并行子任务。而子任务需要把它作为一个算子状态保存起来,以保证故障恢复后处理结果是一致的。这时的状态,就是一个典型的广播状态。我们知道,广播状态与其他算子状态的列表(list)结构不同,底层是以键值对(key-value)形式描述的,所以其实就是一个映射状态(MapState)。
  在代码上,可以直接调用 DataStream 的.broadcast()方法,传入一个“映射状态描述器”
(MapStateDescriptor)说明状态的名称和类型,就可以得到一个“广播流”(BroadcastStream);进而将要处理的数据流与这条广播流进行连接( connect ), 就会得到“ 广播连接流”
(BroadcastConnectedStream)。注意广播状态只能用在广播连接流中。
  关于广播连接流,我们已经在前面做过介绍,这里可以复习一下:

MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(...);
BroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);
DataStream<String> output = stream
        .connect(ruleBroadcastStream)
        .process( new BroadcastProcessFunction<>() {...} );

  这里我们定义了一个“规则流”ruleStream,里面的数据表示了数据流 stream 处理的规则, 规则的数据类型定义为Rule。于是需要先定义一个 MapStateDescriptor 来描述广播状态,然后传入 ruleStream.broadcast()得到广播流,接着用 stream 和广播流进行连接。这里状态描述器中的 key 类型为 String,就是为了区分不同的状态值而给定的 key 的名称。
  对 于 广 播 连 接 流 调 用 .process() 方 法 , 可 以 传 入 “ 广 播 处 理 函 数 ” KeyedBroadcastProcessFunction 或者BroadcastProcessFunction 来进行处理计算。广播处理函数里面有两个方法.processElement()和.processBroadcastElement(),源码中定义如下:

public abstract class BroadcastProcessFunction<IN1, IN2, OUT>	extends BaseBroadcastProcessFunction {
...

public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
...

}

  这里的.processElement()方法,处理的是正常数据流,第一个参数 value 就是当前到来的流数据;而.processBroadcastElement()方法就相当于是用来处理广播流的,它的第一个参数 value 就是广播流中的规则或者配置数据。两个方法第二个参数都是一个上下文 ctx,都可以通过调用.getBroadcastState()方法获取到当前的广播状态;区别在于,.processElement()方法里的上下文是“ 只读” 的( ReadOnly ), 因此获取到的广播状态也只能读取不能更改;而.processBroadcastElement()方法里的 Context 则没有限制,可以根据当前广播流中的数据更新状态。

Rule rule = ctx.getBroadcastState( new MapStateDescriptor<>("rules", Types.String,Types.POJO(Rule.class))).get("my rule");

  通过调用 ctx.getBroadcastState()方法,传入一个 MapStateDescriptor,就可以得到当前的叫作“rules”的广播状态;调用它的.get()方法,就可以取出其中“my rule”对应的值进行计算处理。

2. 代码实例

  接下来我们举一个广播状态的应用案例。考虑在电商应用中,往往需要判断用户先后发生的行为的“组合模式”,比如“登录-下单”或者“登录-支付”,检测出这些连续的行为进行统计,就可以了解平台的运用状况以及用户的行为习惯。
具体代码如下:

import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastStateExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 读取用户行为事件流
        DataStreamSource<Action> actionStream = env.fromElements(
                new Action("Alice", "login"),
                new Action("Alice", "pay"),
                new Action("Bob", "login"),
                new Action("Bob", "buy")
        );

        // 定义行为模式流,代表了要检测的标准
        DataStreamSource<Pattern> patternStream = env
                .fromElements(
                        new Pattern("login", "pay"),
                        new Pattern("login", "buy")
                );

        // 定义广播状态的描述器,创建广播流
        MapStateDescriptor<Void, Pattern> bcStateDescriptor = new MapStateDescriptor<>(
                "patterns", Types.VOID, Types.POJO(Pattern.class));
        BroadcastStream<Pattern> bcPatterns = patternStream.broadcast(bcStateDescriptor);

        // 将事件流和广播流连接起来,进行处理
        DataStream<Tuple2<String, Pattern>> matches = actionStream
                .keyBy(data -> data.userId)
                .connect(bcPatterns)
                .process(new PatternEvaluator());

        matches.print();

        env.execute();
    }

    public static class PatternEvaluator
            extends KeyedBroadcastProcessFunction<String, Action, Pattern, Tuple2<String, Pattern>> {

        // 定义一个值状态,保存上一次用户行为
        ValueState<String> prevActionState;

        @Override
        public void open(Configuration conf) {
            prevActionState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("lastAction", Types.STRING));
        }

        @Override
        public void processBroadcastElement(
                Pattern pattern,
                Context ctx,
                Collector<Tuple2<String, Pattern>> out) throws Exception {

            BroadcastState<Void, Pattern> bcState = ctx.getBroadcastState(
                    new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class)));

            // 将广播状态更新为当前的pattern
            bcState.put(null, pattern);
        }

        @Override
        public void processElement(Action action, ReadOnlyContext ctx,
                                   Collector<Tuple2<String, Pattern>> out) throws Exception {
            Pattern pattern = ctx.getBroadcastState(
                    new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class))).get(null);

            String prevAction = prevActionState.value();
            if (pattern != null && prevAction != null) {
                // 如果前后两次行为都符合模式定义,输出一组匹配
                if (pattern.action1.equals(prevAction) && pattern.action2.equals(action.action)) {
                    out.collect(new Tuple2<>(ctx.getCurrentKey(), pattern));
                }
            }
            // 更新状态
            prevActionState.update(action.action);
        }
    }

    // 定义用户行为事件POJO类
    public static class Action {
        public String userId;
        public String action;

        public Action() {
        }

        public Action(String userId, String action) {
            this.userId = userId;
            this.action = action;
        }

        @Override
        public String toString() {
            return "Action{" +
                    "userId=" + userId +
                    ", action='" + action + '\'' +
                    '}';
        }
    }

    // 定义行为模式POJO类,包含先后发生的两个行为
    public static class Pattern {
        public String action1;
        public String action2;

        public Pattern() {
        }

        public Pattern(String action1, String action2) {
            this.action1 = action1;
            this.action2 = action2;
        }

        @Override
        public String toString() {
            return "Pattern{" +
                    "action1='" + action1 + '\'' +
                    ", action2='" + action2 + '\'' +
                    '}';
        }
    }
}

  这里我们将检测的行为模式定义为 POJO 类 Pattern,里面包含了连续的两个行为。由于广播状态中只保存了一个 Pattern,并不关心 MapState 中的 key,所以也可以直接将 key 的类型指定为 Void,具体值就是 null。在具体的操作过程中,我们将广播流中的 Pattern 数据保存为广播变量;在行为数据 Action 到来之后读取当前广播变量,确定行为模式,并将之前的一次行为保存为一个 ValueState——这是针对当前用户的状态保存,所以用到了 Keyed State。检测到如果前一次行为与 Pattern 中的 action1 相同,而当前行为与 action2 相同,则发现了匹配模式的一组行为,输出检测结果。

五、状态持久化和状态后端

  在 Flink 的状态管理机制中,很重要的一个功能就是对状态进行持久化(persistence)保存,这样就可以在发生故障后进行重启恢复。Flink 对状态进行持久化的方式,就是将当前所有分布式状态进行“快照”保存,写入一个“检查点”(checkpoint)或者保存点(savepoint)保存到外部存储系统中。具体的存储介质,一般是分布式文件系统(distributed file system)。

1. 检查点(Checkpoint)

  有状态流应用中的检查点(checkpoint),其实就是所有任务的状态在某个时间点的一个快照(一份拷贝)。简单来讲,就是一次“存盘”,让我们之前处理数据的进度不要丢掉。在一个流应用程序运行时,Flink 会定期保存检查点,在检查点中会记录每个算子的 id 和状态;如果发生故障,Flink 就会用最近一次成功保存的检查点来恢复应用的状态,重新启动处理流程, 就如同“读档”一样。
  如果保存检查点之后又处理了一些数据,然后发生了故障,那么重启恢复状态之后这些数据带来的状态改变会丢失。为了让最终处理结果正确,我们还需要让源(Source)算子重新读取这些数据,再次处理一遍。这就需要流的数据源具有“数据重放”的能力,一个典型的例子就是Kafka,我们可以通过保存消费数据的偏移量、故障重启后重新提交来实现数据的重放。这是对“至少一次”(at least once)状态一致性的保证,如果希望实现“精确一次”(exactly once)的一致性,还需要数据写入外部系统时的相关保证。关于这部分内容我们会在后面继续讨论。
  默认情况下, 检查点是被禁用的, 需要在代码中手动开启。直接调用执行环境的.enableCheckpointing()方法就可以开启检查点。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getEnvironment();

env.enableCheckpointing(1000);

这里传入的参数是检查点的间隔时间,单位为毫秒。关于检查点的详细配置,后面会详细讲。
  除了检查点之外,Flink 还提供了“保存点”(savepoint)的功能。保存点在原理和形式上跟检查点完全一样,也是状态持久化保存的一个快照;区别在于,保存点是自定义的镜像保存,所以不会由 Flink 自动创建,而需要用户手动触发。这在有计划地停止、重启应用时非常有用。

2. 状态后端(State Backends)

  检查点的保存离不开 JobManager 和 TaskManager,以及外部存储系统的协调。在应用进行检查点保存时,首先会由 JobManager 向所有 TaskManager 发出触发检查点的命令; TaskManger 收到之后,将当前任务的所有状态进行快照保存,持久化到远程的存储介质中; 完成之后向 JobManager 返回确认信息。这个过程是分布式的,当 JobManger 收到所有TaskManager 的返回信息后,就会确认当前检查点成功保存,如图所示。而这一切工作的协调,就需要一个“专职人员”来完成。

在这里插入图片描述
  在 Flink 中,状态的存储、访问以及维护,都是由一个可插拔的组件决定的,这个组件就叫作状态后端(state backend)。状态后端主要负责两件事:一是本地的状态管理,二是将检查点(checkpoint)写入远程的持久化存储。

2.1 状态后端的分类

  状态后端是一个“开箱即用”的组件,可以在不改变应用程序逻辑的情况下独立配置。Flink 中提供了两类不同的状态后端,一种是“哈希表状态后端”(HashMapStateBackend),另一种是“内嵌 RocksDB 状态后端”(EmbeddedRocksDBStateBackend)。如果没有特别配置,系统默认的状态后端是HashMapStateBackend。
(1)哈希表状态后端(HashMapStateBackend)
  这种方式就是我们之前所说的,把状态存放在内存里。具体实现上,哈希表状态后端在内部会直接把状态当作对象(objects),保存在 Taskmanager 的 JVM 堆(heap)上。普通的状态,以及窗口中收集的数据和触发器(triggers),都会以键值对(key-value)的形式存储起来,所以底层是一个哈希表(HashMap),这种状态后端也因此得名。
  对于检查点的保存,一般是放在持久化的分布式文件系统(file system)中,也可以通过配置“检查点存储”(CheckpointStorage)来另外指定。
  HashMapStateBackend 是将本地状态全部放入内存的,这样可以获得最快的读写速度,使计算性能达到最佳;代价则是内存的占用。它适用于具有大状态、长窗口、大键值状态的作业, 对所有高可用性设置也是有效的。
(2)内嵌RocksDB 状态后端(EmbeddedRocksDBStateBackend)
  RocksDB 是一种内嵌的 key-value 存储介质,可以把数据持久化到本地硬盘。配置EmbeddedRocksDBStateBackend 后,会将处理中的数据全部放入 RocksDB 数据库中,RocksDB 默认存储在TaskManager 的本地数据目录里。
  与 HashMapStateBackend 直接在堆内存中存储对象不同,这种方式下状态主要是放在RocksDB 中的。数据被存储为序列化的字节数组(Byte Arrays),读写操作需要序列化/反序列化,因此状态的访问性能要差一些。另外,因为做了序列化,key 的比较也会按照字节进行, 而不是直接调用.hashCode()和.equals()方法。
  对于检查点,同样会写入到远程的持久化文件系统中。
  EmbeddedRocksDBStateBackend 始终执行的是异步快照,也就是不会因为保存检查点而阻塞数据的处理;而且它还提供了增量式保存检查点的机制,这在很多情况下可以大大提升保存效率。
由于它会把状态数据落盘,而且支持增量化的检查点,所以在状态非常大、窗口非常长、键/值状态很大的应用场景中是一个好选择,同样对所有高可用性设置有效。

2.2 如何选择正确的状态后端

  HashMap 和RocksDB 两种状态后端最大的区别,就在于本地状态存放在哪里:前者是内存,后者是 RocksDB。在实际应用中,选择那种状态后端,主要是需要根据业务需求在处理性能和应用的扩展性上做一个选择。
  HashMapStateBackend 是内存计算,读写速度非常快;但是,状态的大小会受到集群可用内存的限制,如果应用的状态随着时间不停地增长,就会耗尽内存资源。
  而 RocksDB 是硬盘存储,所以可以根据可用的磁盘空间进行扩展,而且是唯一支持增量检查点的状态后端,所以它非常适合于超级海量状态的存储。不过由于每个状态的读写都需要做序列化/反序列化,而且可能需要直接从磁盘读取数据,这就会导致性能的降低,平均读写性能要比HashMapStateBackend 慢一个数量级。
  我们可以发现,实际应用就是权衡利弊后的取舍。最理想的当然是处理速度快且内存不受限制可以处理海量状态,那就需要非常大的内存资源了,这会导致成本超出项目预算。比起花更多的钱,稍慢的处理速度或者稍小的处理规模,老板可能更容易接受一点。

2.3 状态后端的配置

  在不做配置的时候,应用程序使用的默认状态后端是由集群配置文件 flink-conf.yaml 中指定的,配置的键名称为 state.backend。这个默认配置对集群上运行的所有作业都有效,我们可以通过更改配置值来改变默认的状态后端。另外,我们还可以在代码中为当前作业单独配置状态后端,这个配置会覆盖掉集群配置文件的默认值。
(1)配置默认的状态后端
  在flink-conf.yaml中,可以使用 state.backend 来配置默认状态后端。
  配置项的可能值为hashmap,这样配置的就是HashMapStateBackend;也可以是rocksdb,这样配置的就是 EmbeddedRocksDBStateBackend。另外,也可以是一个实现了状态后端工厂StateBackendFactory 的类的完全限定类名。
下面是一个配置HashMapStateBackend 的例子:

# 默认状态后端
state.backend: hashmap # 存放检查点的文件路径
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints

  这里的state.checkpoints.dir配置项,定义了状态后端将检查点和元数据写入的目录。
(2)为每个作业(Per-job)单独配置状态后端
  每个作业独立的状态后端,可以在代码中,基于作业的执行环境直接设置。代码如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());

  上面代码设置的是HashMapStateBackend,如果想要设置 EmbeddedRocksDBStateBackend,可以用下面的配置方式:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend());

需要注意,如果想在 IDE 中使用 EmbeddedRocksDBStateBackend,需要为 Flink 项目添加依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
    <version>1.13.0</version>
</dependency>

  而由于 Flink 发行版中默认就包含了RocksDB,所以只要我们的代码中没有使用 RocksDB的相关内容,就不需要引入这个依赖。即使我们在 flink-conf.yaml 配置文件中设定了state.backend 为 rocksdb,也可以直接正常运行,并且使用 RocksDB 作为状态后端。

参考资料

Word版:https://download.csdn.net/download/mengxianglong123/85035166
PDF版:https://download.csdn.net/download/mengxianglong123/85035172

需要的可以私信我,免费

Logo

华为云1024程序员节送福利,参与活动赢单人4000元礼包,更有热门技术干货免费学习

更多推荐