This article is based on Flink 1.14.0.

Prerequisites

If you only want an overview of Flink's join types, you can skip this part. If you plan to do hands-on development, these prerequisites must be mastered first.
There are already many articles online covering Calcite and Flink SQL internals, for example:
flink sql 知其所以然(六)| flink sql 约会 calcite(看这篇就够了) - 掘金
[源码分析] 带你梳理 Flink SQL / Table API内部执行流程

Flink Join

Regular Join

For example, the common inner join:

SELECT * FROM Orders
JOIN Product
ON Orders.productId = Product.id

This kind of join requires the data on both sides to be kept in Flink state indefinitely to guarantee correct results, which lets the state grow without bound.
You can configure a state TTL (time-to-live: table.exec.state.ttl) to avoid unbounded growth, but note that this may affect the correctness of the query results.
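A minimal sketch of setting that option (Scala Table API; the one-day TTL is illustrative):

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
val tEnv = TableEnvironment.create(settings)
// State idle for longer than one day becomes eligible for cleanup; the join
// may then miss matches for keys whose state has already been dropped.
tEnv.getConfig.getConfiguration.setString("table.exec.state.ttl", "1 d")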

Interval Join

A join constrained by both a join condition and a time bound. It operates on two KeyedStreams, connecting each record of one stream with the records of the other stream that fall into a time interval relative to it, according to the join condition.
For example, querying orders together with their associated payments, where the payment occurred within one hour before or after the order time:

SELECT
  ...
FROM
  Orders AS o JOIN Payment AS p ON
  o.orderId = p.orderId AND
  p.payTime BETWEEN orderTime - INTERVAL '1' HOUR AND
  orderTime + INTERVAL '1' HOUR

Temporal Join

First, the concept of a temporal table: a dynamic table that changes continuously over time and may contain multiple snapshots of its contents.
A temporal table whose historical versions can be tracked and accessed is called a versioned table, e.g. a database changelog;
one where only the latest version can be tracked and accessed is called a regular table, e.g. an ordinary database table.


In Flink, a table with a primary key constraint and an event-time attribute is a versioned table.

A temporal join joins against a versioned table: the main table can enrich itself with details from a continuously updating versioned table, using a time attribute together with equality conditions. Both tables must use the same time semantics, either event time or processing time.

  • With event time, the versioned table keeps all versions from the last watermark up to the current moment, and both tables must have watermarks configured; the right table must be CDC data with a correctly configured primary key, and that primary key must appear in the join's equality condition. For example:
-- The left table is an ordinary append-only table.
CREATE TABLE orders (
    order_id    STRING,
    price       DECIMAL(32,2),
    currency    STRING,
    order_time  TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time
) WITH (/* ... */);

-- The right table is a versioned table of currency rates (CDC data).
CREATE TABLE currency_rates (
    currency STRING,
    conversion_rate DECIMAL(32, 2),
    update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,
    WATERMARK FOR update_time AS update_time,
    PRIMARY KEY(currency) NOT ENFORCED
) WITH (
   'connector' = 'kafka',
   'value.format' = 'debezium-json',
   /* ... */
);

SELECT 
     order_id,
     price,
     currency,
     conversion_rate,
     order_time
FROM orders
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
-- The primary key must appear in the join condition
ON orders.currency = currency_rates.currency;

order_id  price  currency  conversion_rate  order_time
========  =====  ========  ===============  ==========
o_001     11.11  EUR       1.14             12:00:00
o_002     12.51  EUR       1.10             12:06:00

Each order is joined with the rate version that was valid as of its order_time (the EUR rate had been updated to 1.10 before 12:06).

  • With processing time, you can treat the lookup (right) table as an ordinary HashMap holding the latest full data set. Flink can join directly against a table in an external database system, without keeping the latest-version state itself. For example:
SELECT
  o.amount, o.currency, r.rate, o.amount * r.rate
FROM
  Orders AS o
  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
  ON r.currency = o.currency;

-- or join a table function
SELECT
  o_amount, r_rate
FROM
  Orders,
  LATERAL TABLE (Rates(o_proctime))
WHERE
  r_currency = o_currency

Note: the "FOR SYSTEM_TIME AS OF" syntax is not supported on views/arbitrary latest tables because Flink's implementation would not quite match the semantics: the join processing of the left stream does not wait until the right-side versioned table (view/table function) has completed its snapshot. In my understanding, the right-side rows joined to the left table might therefore not be the latest data.

Lookup Join

In contrast to the event-time Temporal Join, a Lookup Join queries the right table as of the point in time at which the JOIN operator executes (i.e. processing time). It is typically used for dimension-table enrichment and supports only equality join conditions. For example:

SELECT
  o.amount, o.currency, r.rate, o.amount * r.rate
FROM
  Orders AS o
  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
  ON r.currency = o.currency;
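At runtime the right table must come from a lookup-capable source, which hands the planner a TableFunction keyed by the join key. A minimal sketch of that shape (the class name and the in-memory map are hypothetical; a real connector would query an external store):

import org.apache.flink.table.data.{GenericRowData, RowData, StringData}
import org.apache.flink.table.functions.TableFunction

class LatestRatesLookupFunction extends TableFunction[RowData] {
  // Hypothetical in-memory "dimension table" standing in for a database client.
  private val rates = Map("EUR" -> 1.14, "USD" -> 1.0)

  // Invoked once per probe-side row with the join-key value (o.currency above);
  // every collected row is joined against that probe row.
  def eval(currency: StringData): Unit = {
    rates.get(currency.toString).foreach { rate =>
      collect(GenericRowData.of(currency, java.lang.Double.valueOf(rate)))
    }
  }
}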

Lookup Join Execution Flow

This walkthrough uses one of Flink's own unit tests as the example; newcomers can build on it to develop custom rules.

Preparation

Build the Flink Table module

Run in the flink-table directory: mvn clean package -Pfast,hive-2.1.1,scala-2.12 -DskipTests

Open the unit-test files

The unit tests (UTs) for Flink rules consist of:

  • Logical plan tests: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical
  • Physical plan tests: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql and XXX/batch/sql
  • Integration tests: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql and XXX/batch/sql

These are also the UTs you must provide when submitting a rule-related PR to the community. A minimal plan-test skeleton is sketched below.
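A sketch in the style of the existing suites (class name, table, fields, and SQL are illustrative):

import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.planner.utils.TableTestBase
import org.junit.Test

class MyRuleTest extends TableTestBase {
  private val util = streamTestUtil()
  util.addDataStream[(Int, String)]("MyTable", 'a, 'b, 'proctime.proctime)

  @Test
  def testMyRule(): Unit = {
    // Verifies the optimized plan against the checked-in expected plan.
    util.verifyExecPlan("SELECT a, b FROM MyTable")
  }
}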

Raise the log level

Before the code under test, add: Configurator.setAllLevels("", Level.TRACE)
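For reference, the call and its imports (assuming the log4j2 backend used by the planner test module):

import org.apache.logging.log4j.Level
import org.apache.logging.log4j.core.config.Configurator

// Raise every logger to TRACE so that each rule attempt is logged.
Configurator.setAllLevels("", Level.TRACE)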

Tracing the SQL execution

  • The analysis below follows the execution of flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala.
  • Run the unit test testJoinTemporalTable:
    SELECT * FROM MyTable AS T JOIN LookupTable
    FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id

SQL parsing

The parser (Calcite grammar support) parses the "FOR SYSTEM_TIME AS OF" clause into a SqlSnapshot (a SqlNode), and validate() converts it into a LogicalSnapshot (a RelNode), giving the logical plan:

LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
  LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
    LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
    LogicalFilter(condition=[=($cor0.a, $0)])
      LogicalSnapshot(period=[$cor0.proctime])
        LogicalTableScan(table=[[default_catalog, default_database, LookupTable]])
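Outside the test harness, the same plans can be printed with explainSql (a sketch; it assumes a tEnv in which MyTable and LookupTable are already registered):

// Prints the abstract syntax tree (the logical plan above) together with
// the optimized plans for the query.
println(tEnv.explainSql(
  """
    |SELECT * FROM MyTable AS T JOIN LookupTable
    |FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id
    |""".stripMargin))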

Rule-based optimization

FlinkStreamProgram/FlinkBatchProgram define a series of rule programs that transform and optimize the logical/physical plan.
This case goes through the following important transformations:

  1. LogicalCorrelateToJoinFromLookupTableRuleWithFilter:
// From the class definition, the logical plan above can match this rule
class LogicalCorrelateToJoinFromLookupTableRuleWithFilter
  extends LogicalCorrelateToJoinFromLookupTemporalTableRule(
    operand(classOf[LogicalCorrelate],
      operand(classOf[RelNode], any()),
      operand(classOf[LogicalFilter],
        operand(classOf[LogicalSnapshot],
          operand(classOf[RelNode], any())))),
    "LogicalCorrelateToJoinFromLookupTableRuleWithFilter"
  ) {
    override def matches(call: RelOptRuleCall): Boolean = {
      val snapshot: LogicalSnapshot = call.rel(3)
      val snapshotInput: RelNode = trimHep(call.rel(4))
      isLookupJoin(snapshot, snapshotInput)
    }
    ……
}
// After the rule matches, check whether this is a lookup join
protected def isLookupJoin(snapshot: LogicalSnapshot, snapshotInput: RelNode): Boolean = {
  ……
  // It is processing time AND the snapshotted table is a LookupTableSource
  isProcessingTime && snapshotOnLookupSource
}

Once matched, the LogicalCorrelate is converted into a LogicalJoin:

 LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
+- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
   +- LogicalSnapshot(period=[$cor0.proctime])
      +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable]])
  2. FlinkProjectJoinTransposeRule + ProjectRemoveRule: push the Project down and prune it
// Swap the Project and the Join below it, which pushes the Project down
public FlinkProjectJoinTransposeRule(
        PushProjector.ExprCondition preserveExprCondition, RelBuilderFactory relFactory) {
    super(operand(Project.class, operand(Join.class, any())), relFactory, null);
    this.preserveExprCondition = preserveExprCondition;
}

After optimization:

LogicalJoin(condition=[=($0, $5)], joinType=[inner])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+- LogicalSnapshot(period=[$cor0.proctime])
   +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable]])
  3. The Volcano rules then explore combinations of the logical plan and produce the cheapest one. After they run, the best result is:

    12129 [main] DEBUG org.apache.calcite.plan.RelOptPlanner [] - Cheapest plan:
    FlinkLogicalJoin(condition=[=($0, $5)], joinType=[inner]): rowcount = 3.0E7, cumulative cost = {4.0E8 rows, 5.0E8 cpu, 1.37E10 io, 0.0 network, 0.0 memory}, id = 403
      FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 4.8E9 io, 0.0 network, 0.0 memory}, id = 378
      FlinkLogicalSnapshot(period=[$cor0.proctime]): rowcount = 1.0E8, cumulative cost = {2.0E8 rows, 2.0E8 cpu, 4.0E9 io, 0.0 network, 0.0 memory}, id = 402
        FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable]], fields=[id, name, age]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.0E9 io, 0.0 network, 0.0 memory}, id = 381
    
    // Final result:
    FlinkLogicalJoin(condition=[=($0, $5)], joinType=[inner])
    :- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
    +- FlinkLogicalSnapshot(period=[$cor0.proctime])
       +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable]], fields=[id, name, age])    
    

    Rules attempted:

    Rules                                                                   Attempts           Time (us)
    FlinkJoinPushExpressionsRule                                                   2                 553
    JoinConditionPushRule                                                          2                 152
    FlinkLogicalTableSourceScanConverter(in:NONE,out:LOGICAL)                      1              54,956
    FlinkLogicalJoinConverter(in:NONE,out:LOGICAL)                                 1               4,787
    FlinkLogicalSnapshotConverter(in:NONE,out:LOGICAL)                             1               3,162
    FlinkLogicalDataStreamTableScanConverter(in:NONE,out:LOGICAL)                  1               1,403
    SimplifyJoinConditionRule                                                      1                 249
    * Total                                                                        9              65,262
    

    The converters above are registered in LOGICAL_CONVERTERS, a collection of logical rules that convert Calcite nodes into Flink nodes.

    • For example, FlinkLogicalSnapshotConverter:
// Converts a LogicalSnapshot into a FlinkLogicalSnapshot
class FlinkLogicalSnapshotConverter
  extends ConverterRule(
  // Matches LogicalSnapshot nodes that have no Convention yet; the output is FlinkConventions.LOGICAL
    classOf[LogicalSnapshot],
    Convention.NONE,
    FlinkConventions.LOGICAL,
    "FlinkLogicalSnapshotConverter") {

  def convert(rel: RelNode): RelNode = {
    val snapshot = rel.asInstanceOf[LogicalSnapshot]
    val newInput = RelOptRule.convert(snapshot.getInput, FlinkConventions.LOGICAL)
    FlinkLogicalSnapshot.create(newInput, snapshot.getPeriod)
  }
}
  4. Add an operator that materializes processing time
// convert time indicators
chainedProgram.addLast(TIME_INDICATOR, new FlinkRelTimeIndicatorProgram)
// For a processing-time attribute, where necessary, a call to this SqlFunction is created to materialize it
rexBuilder.makeCall(FlinkSqlOperatorTable.PROCTIME_MATERIALIZE, expr)

After the conversion:

FlinkLogicalCalc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age])
+- FlinkLogicalJoin(condition=[=($0, $5)], joinType=[inner])
   :- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
   +- FlinkLogicalSnapshot(period=[$cor0.proctime])
      +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable]], fields=[id, name, age])

Physical rule optimization

After being processed by the physical Volcano rules below:

FlinkJoinPushExpressionsRule          
JoinConditionPushRule                                                  
StreamPhysicalTableSourceScanRule(in:LOGICAL,out:STREAM_PHYSICAL)
FlinkLogicalTableSourceScanConverter(in:NONE,out:LOGICAL)      
StreamPhysicalSnapshotOnTableScanRule                       
StreamPhysicalCalcRule(in:LOGICAL,out:STREAM_PHYSICAL)    
FlinkLogicalJoinConverter(in:NONE,out:LOGICAL)         
StreamPhysicalDataStreamScanRule(in:LOGICAL,out:STREAM_PHYSICAL) 
FlinkLogicalSnapshotConverter(in:NONE,out:LOGICAL)                 
FlinkLogicalDataStreamTableScanConverter(in:NONE,out:LOGICAL)
SimplifyJoinConditionRule 

the optimal result is obtained:

Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age])
+- LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age])
   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
  • StreamPhysicalCalcRule: converts the FlinkLogicalCalc into a StreamPhysicalCalc
  • SnapshotOnTableScanRule: converts
FlinkLogicalJoin
+-  FlinkLogicalDataStreamTableScan
+-  FlinkLogicalSnapshot
    +- FlinkLogicalTableSourceScan

into

StreamPhysicalLookupJoin
+- StreamPhysicalDataStreamScan

Here is the key conversion logic for the lookup join:

// This rule reuses the match pattern of its parent class
class SnapshotOnTableScanRule
  extends BaseSnapshotOnTableScanRule("StreamPhysicalSnapshotOnTableScanRule") {
}
// As you can see, it matches exactly the logical plan produced above
abstract class BaseSnapshotOnTableScanRule(description: String)
  extends RelOptRule(
    operand(classOf[FlinkLogicalJoin],
      operand(classOf[FlinkLogicalRel], any()),
      operand(classOf[FlinkLogicalSnapshot],
        operand(classOf[TableScan], any()))),
    description)
  with CommonLookupJoinRule 

private def doTransform(
  join: FlinkLogicalJoin,
  input: FlinkLogicalRel,
  temporalTable: RelOptTable,
  calcProgram: Option[RexProgram]): StreamPhysicalLookupJoin = {
    
  val joinInfo = join.analyzeCondition

  val cluster = join.getCluster

  val providedTrait = join.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
  val requiredTrait = input.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
   // Convert the input from a logical node into a physical node; this triggers StreamPhysicalDataStreamScanRule,
   // which converts the FlinkLogicalDataStreamTableScan into a StreamPhysicalDataStreamScan
  val convInput = RelOptRule.convert(input, requiredTrait)
  new StreamPhysicalLookupJoin(
    cluster,
    providedTrait,
    convInput,
    temporalTable,
    calcProgram,
    joinInfo,
    join.getJoinType)
}

This completes the conversion to the physical plan.

Translating the physical plan

planner.translate() includes:

val execGraph = translateToExecNodeGraph(optimizedRelNodes)
val transformations = translateToPlan(execGraph)

In translateToExecNodeGraph, the translateToExecNode method of the final physical-plan nodes is called, e.g.:

  • StreamPhysicalLookupJoin is converted into StreamExecLookupJoin.

In translateToPlan, the translateToPlanInternal method of each ExecNode is called. Take CommonExecLookupJoin as an example:
public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData> {
    // Validation and the async LookupFunction branch are omitted here.
    @Override
    public Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
        // ----------- Create the factory for the lookupFunction operator -----------
        RelOptTable temporalTable = temporalTableSourceSpec.getTemporalTable(planner);
    
        UserDefinedFunction userDefinedFunction =
                LookupJoinUtil.getLookupFunction(temporalTable, lookupKeys.keySet());
        UserDefinedFunctionHelper.prepareInstance(
                planner.getTableConfig().getConfiguration(), userDefinedFunction);
    
        boolean isLeftOuterJoin = joinType == FlinkJoinType.LEFT;
        StreamOperatorFactory<RowData> operatorFactory;
    
        operatorFactory =
                createSyncLookupJoin(
                        temporalTable,
                        planner.getTableConfig(),
                        lookupKeys,
                        (TableFunction<Object>) userDefinedFunction,
                        planner.getRelBuilder(),
                        inputRowType,
                        tableSourceRowType,
                        resultRowType,
                        isLeftOuterJoin,
                        planner.getExecEnv().getConfig().isObjectReuseEnabled());
        //-------------------------------------------------------
        // Convert into a Transformation
        Transformation<RowData> inputTransformation =
                (Transformation<RowData>) inputEdge.translateToPlan(planner);
        return new OneInputTransformation<>(
                inputTransformation,
                getDescription(),
                operatorFactory,
                InternalTypeInfo.of(resultRowType),
                inputTransformation.getParallelism());
    }
}
// Only the core logic is listed; it falls into three parts (parameters elided)
private StreamOperatorFactory<RowData> createSyncLookupJoin(/* … */) {
    // Use the code generator to generate the lookup function, wrapped as a FlatMapFunction
    GeneratedFunction<FlatMapFunction<RowData, RowData>> generatedFetcher =
        LookupJoinCodeGenerator.generateSyncLookupFunction();
    // Generate the Collector for the table function's output
    GeneratedCollector<TableFunctionCollector<RowData>> generatedCollector =
        LookupJoinCodeGenerator.generateCollector();
    // Finally, a LookupJoinRunner ProcessFunction is generated.
    // If there is a Calc on the lookup side (i.e. on the right table), the runner also carries the Calc logic,
    // e.g.: SELECT * FROM T JOIN DIM FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.b + 1
    // The fetcher reads the raw rows from the LookupFunction, applies the Calc, and only then matches them against the main (left) stream's rows.
    GeneratedFunction<FlatMapFunction<RowData, RowData>> generatedCalc =
        LookupJoinCodeGenerator.generateCalcMapFunction(
                config,
                JavaScalaConversionUtil.toScala(projectionOnTemporalTable),
                filterOnTemporalTable,
                temporalTableOutputType,
                tableSourceRowType);
    
    ProcessFunction<RowData, RowData> processFunc =
            new LookupJoinWithCalcRunner(
                    generatedFetcher,
                    generatedCalc,
                    generatedCollector,
                    isLeftOuterJoin,
                    rightRowType.getFieldCount());
    // Wrap the ProcessFunction into an operator factory (the real method also
    // covers the no-Calc variant, which uses a plain LookupJoinRunner).
    return SimpleOperatorFactory.of(new ProcessOperator<>(processFunc));
}

Finally come Transformations -> StreamGraph -> JobGraph, at which point the flow is unified with that of the DataStream API.
