一:connector

一:sink v2

一:二阶段sink总架构

二阶段sink   先上图

新的sink架构中要实现二阶段的sink需要实现TwoPhaseCommittingSink

相关接口类图如下:

其中:PrecommittingSinkWriter是二阶段提交的第一阶段,Committer是第二阶段

在转换为执行operator的时候,会把TwoPhaseCommittingSink转换为2个operator,具体逻辑是在生成streamgraph时实现的,具体逻辑在SinkTransformationTranslator中

后续最终转换为了SinkWriterOperator和CommitterOperator,数据流向是从SinkWriterOperator流入CommitterOperator(以kafka为例,Writer负责producer.send,并且通过emitCommittables方法向commiter发送Collection<KafkaCommittable>,commiter最终会调用到producer.commitTransaction提交事务,中间会通过CheckpointCommittableManager组件进行通信)

具体代码详见:FLINK-25575和FLINK-25573

  1. FLINK-25575

二:kafka

官方给出的kafka案例如下

public class KafkaExample extends KafkaExampleUtil {

    public static void main(String[] args) throws Exception {
        // parse input arguments
        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = KafkaExampleUtil.prepareExecutionEnv(parameterTool);

        DataStream<Integer> input =
                env.fromSource(
                        KafkaSource.<Integer>builder()
                                .setBootstrapServers(
                                        parameterTool
                                                .getProperties()
                                                .getProperty(
                                                        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
                                .setBounded(OffsetsInitializer.latest())
                                .setDeserializer(
                                        KafkaRecordDeserializationSchema.valueOnly(
                                                IntegerDeserializer.class))
                                .setTopics(parameterTool.getRequired("input-topic"))
                                .build(),
                        WatermarkStrategy.noWatermarks(),
                        "kafka-source");

        input.sinkTo(
                KafkaSink.<Integer>builder()
                        .setBootstrapServers(
                                parameterTool
                                        .getProperties()
                                        .getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
                        .setRecordSerializer(
                                KafkaRecordSerializationSchema.builder()
//                                        .setTopic(parameterTool.getRequired("output-topic"))
                                        .setTopicSelector(new TopicSelector<Integer>() {
                                            @Override
                                            public String apply(Integer integer) {
                                                return null;
                                            }
                                        })
                                        .setKafkaValueSerializer(IntegerSerializer.class)
                                        .build())
                        .build());
        env.execute("Smoke Kafka Example");
    }
}

其中的setTopicSelector可以很好的实现kafka分流,不需要在像Flink动态分流到kafka,hbase - 百度文库

中这么复杂了,还提供了setPartitioner,可以根据数据指定分区

三:es sink

es并不支持二阶段sink,所以不能保证exactly-once语义,essink涉及的类如下:

BulkProcessorConfig:保存批量插入的配置的,属于ElasticsearchSink的一个属性,用于构造writer
ElasticsearchEmitter:用户编写逻辑用于构造es的request
ElasticsearchSink:sink类
ElasticsearchSinkBuilderBase:用于工厂模式生成sink的
ElasticsearchWriter:sink类的对象,是sinkWriter的实现,里面包含Emitter,es的client,es的bulkProcessor
FlushBackoffType:重试机制,es请求发送失败时的重试机制,有三种
NetworkClientConfig:保存链接es的一些配置的,属于ElasticsearchSink的一个属性,用于构造writer
RequestIndexer:用于保存请求的

二:sql

一:托管表(manager table)

设计说明网站

FLIP-188: Introduce Built-in Dynamic Table Storage - Apache Flink - Apache Software Foundation

git地址:

https://github.com/JingsongLi/flink/tree/table_storage

一:Data Retention

主要类:

CatalogManager
ManagedTableListener
ManagedTableFactory

类的包含关系如下 

ManagedTableListener最终会调用上面设计说明网站说明中的ManagedTableFactory相关接口

实际实现中又多了个onCompactTable,对应于后面实现fileStore的Compaction功能的。

整个实现过程主要是通过观察者模式和工厂模式实现的

二:Concurrent Write

可以看出在执行compact语法时,多个线程可能会操作fileStore,所以引入了锁,锁是乐观锁,用到了快照,通过重命名hdfs文件方式,当前规划是先实现hive的锁,hive锁机制数仓hive锁(Hive Lock)_Vincer_DB的博客-CSDN博客_hive lock

代码提交记录为:

  1. FLINK-25173

主要的实体类是HiveCatalogLock,主要逻辑如下:

可以看出,是在表级别加排他锁,默认尝试100次 

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐