flink1.15 部分新特性解析
一:connector
一:sink v2
一:二阶段sink总架构
二阶段sink 先上图
新的sink架构中要实现二阶段的sink需要实现TwoPhaseCommittingSink
相关接口类图如下:
其中:PrecommittingSinkWriter是二阶段提交的第一阶段,Committer是第二阶段
在转换为执行operator的时候,会把TwoPhaseCommittingSink转换为2个operator,具体逻辑是在生成streamgraph时实现的,具体逻辑在SinkTransformationTranslator中
后续最终转换为了SinkWriterOperator和CommitterOperator,数据流向是从SinkWriterOperator流入CommitterOperator(以kafka为例,Writer负责producer.send,并且通过emitCommittables方法向commiter发送Collection<KafkaCommittable>,commiter最终会调用到producer.commitTransaction提交事务,中间会通过CheckpointCommittableManager组件进行通信)
具体代码详见:FLINK-25575和FLINK-25573
二:kafka
官方给出的kafka案例如下
public class KafkaExample extends KafkaExampleUtil {
public static void main(String[] args) throws Exception {
// parse input arguments
final ParameterTool parameterTool = ParameterTool.fromArgs(args);
StreamExecutionEnvironment env = KafkaExampleUtil.prepareExecutionEnv(parameterTool);
DataStream<Integer> input =
env.fromSource(
KafkaSource.<Integer>builder()
.setBootstrapServers(
parameterTool
.getProperties()
.getProperty(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
.setBounded(OffsetsInitializer.latest())
.setDeserializer(
KafkaRecordDeserializationSchema.valueOnly(
IntegerDeserializer.class))
.setTopics(parameterTool.getRequired("input-topic"))
.build(),
WatermarkStrategy.noWatermarks(),
"kafka-source");
input.sinkTo(
KafkaSink.<Integer>builder()
.setBootstrapServers(
parameterTool
.getProperties()
.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
// .setTopic(parameterTool.getRequired("output-topic"))
.setTopicSelector(new TopicSelector<Integer>() {
@Override
public String apply(Integer integer) {
return null;
}
})
.setKafkaValueSerializer(IntegerSerializer.class)
.build())
.build());
env.execute("Smoke Kafka Example");
}
}
其中的setTopicSelector可以很好的实现kafka分流,不需要在像Flink动态分流到kafka,hbase - 百度文库
中这么复杂了,还提供了setPartitioner,可以根据数据指定分区
三:es sink
es并不支持二阶段sink,所以不能保证exactly-once语义,essink涉及的类如下:
BulkProcessorConfig:保存批量插入的配置的,属于ElasticsearchSink的一个属性,用于构造writer
ElasticsearchEmitter:用户编写逻辑用于构造es的request
ElasticsearchSink:sink类
ElasticsearchSinkBuilderBase:用于工厂模式生成sink的
ElasticsearchWriter:sink类的对象,是sinkWriter的实现,里面包含Emitter,es的client,es的bulkProcessor
FlushBackoffType:重试机制,es请求发送失败时的重试机制,有三种
NetworkClientConfig:保存链接es的一些配置的,属于ElasticsearchSink的一个属性,用于构造writer
RequestIndexer:用于保存请求的
二:sql
一:托管表(manager table)
设计说明网站
FLIP-188: Introduce Built-in Dynamic Table Storage - Apache Flink - Apache Software Foundation
git地址:
https://github.com/JingsongLi/flink/tree/table_storage
一:Data Retention
主要类:
CatalogManager
ManagedTableListener
ManagedTableFactory
类的包含关系如下
ManagedTableListener最终会调用上面设计说明网站说明中的ManagedTableFactory相关接口
实际实现中又多了个onCompactTable,对应于后面实现fileStore的Compaction功能的。
整个实现过程主要是通过观察者模式和工厂模式实现的
二:Concurrent Write
可以看出在执行compact语法时,多个线程可能会操作fileStore,所以引入了锁,锁是乐观锁,用到了快照,通过重命名hdfs文件方式,当前规划是先实现hive的锁,hive锁机制数仓hive锁(Hive Lock)_Vincer_DB的博客-CSDN博客_hive lock。
代码提交记录为:
主要的实体类是HiveCatalogLock,主要逻辑如下:
可以看出,是在表级别加排他锁,默认尝试100次
更多推荐
所有评论(0)