Flink SQL(基础实践)
第一章 核心概念1.1、动态表和连续查询动态表(Dynamic Tables)是Flink的支持流数据的Table API 和SQL的核心概念。动态表是随时间变化的,可以像查询静态批处理表一样查询它们。查询动态表将生成一个连续查询(Continuous Query).一个连续查询永远不会终止,结果会生成一个动态表,查询不断更新其(动态)结果表,以反映其(动态)输入表上的更改。需要注意的是,连续查询
第一章 核心概念
1.1、动态表和连续查询
动态表(Dynamic Tables)是Flink的支持流数据的Table API 和SQL的核心概念。
动态表是随时间变化的,可以像查询静态批处理表一样查询它们。查询动态表将生成一个连续查询(Continuous Query).一个连续查询永远不会终止,结果会生成一个动态表,查询不断更新其(动态)结果表,以反映其(动态)输入表上的更改。
需要注意的是,连续查询的结果在语义上总是等价于以批处理模式在输入表快照上执行的相同查询的结果。
1、将流转换为动态表
2、在动态表上计算一个连续查询,生成一个新的动态表
3、生成的动态表被转换为流
1.2、在流上定义表(动态表)
为了使用关系查询处理流,必须将其转换为Table,从概念上讲,流的每条记录都被解释为对结果表的INSERT 操作。
假设有如下格式的数据:
{ user:VARCHAR,//用户名 cTime:TIMESTAMP,//访问url的时间 uri:VARCHAR//用户访问的url }
下图显示了单击时间流(左侧)如何转换为表(右侧),当插入更多的单击流记录时,结果的表的数据将不断增长。
1.2.1、连续查询
在动态表上的计算一个连续查询,并生成一个新的动态表,与批处理查询不同,连续查询从不终止,并根据其插入表上的更新其结果表。
在任何时候。连续查询的结果在语义上与以批处理模式在输入表快照上执行的相同查询的结果相同。
说明:
1.、当查询开始,Clicks表(左侧)是空的。
2、在第一行数据被插入到clicks表时,查询开始计算结果表。第一行数据【Mary,./home】插入后,结果表(右侧,上部)由一行【Mary,1】组成。
3、当第二行【Bob,./cart】插入到clicks表时,查询会更新结果表并插入了一行新数据【Bob,1】。
4、第三行【Mary,./prod?id=1】将产生已计算的结果行的更新,【Mary,1】更新【Mary,2】
5、最后,当第四行数据加入clicks表时,查询将第三行数据【Liz,1】插入到结果表中。
第二章 Flink Table API
2.1 导入需要的依赖
//老版官方提供的依赖没有融合blink的 <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> //blink二次开发之后的依赖 <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-csv</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>${flink.version}</version> </dependency>
2.2 基本使用:表与DataStream的混合使用
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; import static org.apache.flink.table.api.Expressions.$; public class BasicUse { public static void main(String[] args) { //1 流环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //并行度设置 env.setParallelism(1); //数据来源 DataStreamSource<WaterSensor> waterSensorDataStream = env.fromElements(new WaterSensor("sensor_1", 1000L, 10), new WaterSensor("sensor_1", 2000L, 20), new WaterSensor("sensor_2", 3000L, 30), new WaterSensor("sensor_1", 4000L, 40), new WaterSensor("sensor_1", 5000L, 50), new WaterSensor("sensor_2", 6000L, 60)); //todo 1 创建表的执行环境 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); //todo 2 创建表 将流转换为动态流,表的字段名从pojo的属性名自动抽取 Table table = tableEnv.fromDataStream(waterSensorDataStream); //todo 3 对动态表进行查询 Table resultTable = table.where($("id").isEqual("sensor_1")) .select($("id"), $("ts"), $("vc")); //todo 4 把动态表转换为流 DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable, Row.class); resultStream.print(); try { env.execute(); } catch (Exception e) { e.printStackTrace(); } } }
2.3 基本使用:聚合操作
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; import static org.apache.flink.table.api.Expressions.$; public class BasicUseSum { public static void main(String[] args) { //1 流环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //并行度设置 env.setParallelism(1); //读取数据 DataStreamSource<WaterSensor> waterSensorDataStreamSource = env.fromElements(new WaterSensor("sensor_1", 1000L, 10), new WaterSensor("sensor_1", 2000L, 40), new WaterSensor("sensor_2", 3000L, 30), new WaterSensor("sensor_1", 4000L, 40), new WaterSensor("sensor_1", 5000L, 50), new WaterSensor("sensor_2", 6000L, 60)); //todo 1 创建表的执行环境 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); //todo 2 创建表 将流转换为动态流,表的字段名从pojo的属性名自动抽取 Table table = tableEnv.fromDataStream(waterSensorDataStreamSource); //todo 3 对动态表进行查询 Table result = table.where($("vc").isGreaterOrEqual(20)) .groupBy($("id")) .aggregate($("vc").sum().as("vc_sum")) .select($("id"), $("vc_sum")); //todo 4 把动态表转换为流 DataStream<Tuple2<Boolean, Row>> resultDS = tableEnv.toRetractStream(result, Row.class); resultDS.print(); try { env.execute(); } catch (Exception e) { e.printStackTrace(); } } }
2.4 表到流的转换
动态表可以像普通数据库一样通过INSERT、UPDATE和DELETE来不断修改,它可能是一个只有一行、不断更新的表,也可能是一个insert-only的表,没有UPDATE和DELETE修改,或者介于两者之间的其他表。
在将动态表转换为流或将其写入外部系统时,需要对这些更改进行编码。Flink的Table API和SQL支持三种方式来编码一个动态表的变化:
2.4.1 Append-only 流(只追加流)
仅通过insert操作修改的动态表可以通过输出插入的行转换为流。
2.4.2 Retract流(撤回流)
retract流包含两种类型的message:add messages和retract messages。通过将insert操作编码为add message、将delete操作编码为retract messages、将update操作编码为更新(先前)行的retract message和更新行的add message,将动态表转换为retract流。
更新的过程:
(1)旧的结果,标记为;撤回,用false表示
(2)新的结果,标记为;插入,用true表示
2.4.3 upsert流
upsert流包含两种类型的message:upsert message和delete messages.
转换为upsert流的动态表需要(可能是组合的)唯一键。通过将insert和update操作编码为upsert message,将delete操作编码为delete message,将具有唯一键的动态表转换为流,
消费流的算子需要知道唯一键的属性,以便正确地应用message。与retract流的主要区别在于update操作用单个message编码的,因此效率更高。
请注意,在将动态表转换为DataStream时,只支持append流和retract流。
2.5 通过Connector声明读入数据
前面是先得到流,再转成动态表,其实动态表也可以直接连接到数据
2.5.1 File source
准备文件sensor-sql.txt
sensor_1,1000,10 sensor_1,2000,20 sensor_2,3000,30 sensor_1,4000,40 sensor_1,5000,50 sensor_2,6000,60
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.descriptors.Csv; import org.apache.flink.table.descriptors.FileSystem; import org.apache.flink.table.descriptors.Schema; import org.apache.flink.types.Row; import static org.apache.flink.table.api.Expressions.$; public class FileSource { public static void main(String[] args) throws Exception { //1 流环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2 并行度设置 env.setParallelism(2); //3 表环境 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); //todo 1 表的元数据信息 //org.apache.flink.table.descriptor.Schema Schema schema = new Schema(); schema.field("id", DataTypes.STRING()); schema.field("ts",DataTypes.BIGINT()); schema.field("vc",DataTypes.INT()); //todo 2 连接文件,并创建一个临时表,其实就是一个动态表 tableEnv.connect(new FileSystem().path("input/sensor.txt")) .withFormat(new Csv().fieldDelimiter(',').lineDelimiter("\n")) .withSchema(schema) .createTemporaryTable("sensor"); //todo 3 做成表对象,然后对动态表进行查询 Table sensortable = tableEnv.from("sensor"); Table resultTable = sensortable.groupBy($("id")) .select($("id"), $("id").count().as("cnt")); //todo 4 把动态表转换成流,如果涉及到数据的更新,要用到撤回流,多了一个boolean标记 DataStream<Tuple2<Boolean, Row>> resultStream = tableEnv.toRetractStream(resultTable, Row.class); resultStream.print(); //执行 env.execute(); } }
通过 connect直接 将 外部系统 抽象成 动态表,作为数据源 1. 调用 connect方法,传入一个外部系统的描述器,还有一些参数 2. 调用 withFormat方法,指定数据的存储格式: 列分隔符、行分隔符,等等 3. 调用 withSchema方法,指定 表的结构信息: 列名、列的类型 4. 调用 createTemporaryTable方法,创建一张临时表,并且指定表名
2.5.2 Kafka Source
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.descriptors.Json; import org.apache.flink.table.descriptors.Kafka; import org.apache.flink.table.descriptors.Schema; import org.apache.flink.types.Row; import static org.apache.flink.table.api.Expressions.$; public class KafkaSource { public static void main(String[] args) throws Exception { //1 流环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2 并行度设置 env.setParallelism(2); //3 表环境 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); //todo 1 表的元数据信息 Schema schema = new Schema(); schema.field("id", DataTypes.STRING()); schema.field("ts",DataTypes.BIGINT()); schema.field("vc",DataTypes.INT()); //todo 2 连接文件,并创建一个临时表,其实就是一个动态表 tableEnv.connect(new Kafka() .version("universal")//kafka通用版本、 .topic("sensor") .startFromLatest() .property("group.id","bigdata") .property("bootstrap.servers","hadoop102:9092,hadoop103:9092,hadoop104:9092")) .withFormat(new Json()) .withSchema(schema) .createTemporaryTable("sensor"); //todo 3 对动态表进行查询 Table sensorTable = tableEnv.from("sensor"); Table resultTable = sensorTable.groupBy($("id")) .select($("id"), $("id").count().as("cnt")); //todo 4 把动态表转换为流,如果涉及到数据的更新,要用撤回流,多了一个boolean标记 DataStream<Tuple2<Boolean, Row>> resultStream = tableEnv.toRetractStream(resultTable, Row.class); //打印 resultStream.print(); //执行 env.execute(); } }
启动kafka生产者生产数据
kafka-console-producer.sh --broker-list hadoop102:9092 --topic sensor
2.6 通过Connector声明写出数据
2.6.1 File Sink
import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.descriptors.Csv; import org.apache.flink.table.descriptors.FileSystem; import org.apache.flink.table.descriptors.Schema; import static org.apache.flink.table.api.Expressions.$; public class FileSink { public static void main(String[] args) throws Exception { //1 流环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2 并行度设置 env.setParallelism(2); //3 表环境 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); //读取数据fromElements DataStreamSource<WaterSensor> waterSensorStream = env.fromElements(new WaterSensor("sensor_1", 1000L, 10), new WaterSensor("sensor_1", 2000L, 20), new WaterSensor("sensor_2", 3000L, 30), new WaterSensor("sensor_1", 4000L, 40), new WaterSensor("sensor_1", 5000L, 50), new WaterSensor("sensor_2", 6000L, 60)); //todo 1 创建表的执行环境 Table sensorTable = tableEnv.fromDataStream(waterSensorStream); Table resultTable = sensorTable //过滤条件 .where($("id").isEqual("sensor_1")) .select($("id"), $("ts"), $("vc")); //todo 2 创建输出表 Schema schema = new Schema(); schema.field("id", DataTypes.STRING()); schema.field("ts",DataTypes.BIGINT()); schema.field("vc",DataTypes.INT()); //todo 3 创建一个临时表 tableEnv.connect(new FileSystem().path("output/sensor_out.txt")) .withFormat(new Csv().fieldDelimiter('|')) .withSchema(schema) .createTemporaryTable("sensor"); //todo 4 把数据写入到输出表中 resultTable.executeInsert("sensor"); // 方法会去分析代码。生成一些graph,但是我们代码中没有调用算子,所以会报错,可以直接不写 //env.execute(); } }
2.6.2 Kafka Sink
import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.descriptors.Json; import org.apache.flink.table.descriptors.Kafka; import org.apache.flink.table.descriptors.Schema; import static org.apache.flink.table.api.Expressions.$; public class KafkaSink { public static void main(String[] args) { //1 流环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2 并行度设置 env.setParallelism(2); //3 表环境 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); //4 读取数据 DataStreamSource<WaterSensor> waterSensorStream = env.fromElements(new WaterSensor("sensor_1", 1000L, 10), new WaterSensor("sensor_1", 2000L, 20), new WaterSensor("sensor_2", 3000L, 30), new WaterSensor("sensor_1", 4000L, 40), new WaterSensor("sensor_1", 5000L, 50), new WaterSensor("sensor_2", 6000L, 60)); //todo 1 创建表的环境,表查询 Table sensorTable = tableEnv.fromDataStream(waterSensorStream); Table resultTable = sensorTable //过滤条件 .where($("id").isEqual("sensor_1")) .select($("id"), $("ts"), $("vc")); //todo 2 创建表 Schema schema = new Schema(); schema.field("id", DataTypes.STRING()); schema.field("ts",DataTypes.BIGINT()); schema.field("vc",DataTypes.INT()); tableEnv .connect(new Kafka() .version("universal") .topic("sensor") .sinkPartitionerRoundRobin() .property("bootstrap.servers","hadoop102:9092,hadoop103:9092,hadoop104:9092")) .withFormat(new Json()) .withSchema(schema) .createTemporaryTable("sensor"); //todo 3 将数据写入到输出表中 resultTable.executeInsert("sensor"); } }
启动kafka消费者消费数据
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic sensor --from-beginning
2.7 其他Connector用法
参考官方文档: Apache Flink 1.12 Documentation: Table API Legacy Connectors
第三章 Flink Sql
3.1 基本使用
3.1.1 查询未注册的表
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; public class SqlBase_01 { public static void main(String[] args) throws Exception { //1 流环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2 并行度设置 env.setParallelism(1); //3 表环境 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); //4 获取数据 DataStreamSource<WaterSensor> waterSensorStream = env.fromElements(new WaterSensor("sensor_1", 1000L, 10), new WaterSensor("sensor_1", 2000L, 20), new WaterSensor("sensor_2", 3000L, 30), new WaterSensor("sensor_1", 4000L, 40), new WaterSensor("sensor_1", 5000L, 50), new WaterSensor("sensor_2", 6000L, 60)); //todo 1 流转换表 Table inputTable = tableEnv.fromDataStream(waterSensorStream); //todo 2 查询未注册的表 Table resultTable = tableEnv.sqlQuery("select * from " + inputTable + " where id='sensor_1'"); //todo 3 动态表转换未流 DataStream<Row> resultTableStream = tableEnv.toAppendStream(resultTable, Row.class); resultTableStream.print(); //执行 env.execute(); } }
3.1.2 查询已注册的表
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; public class SqlBase_02 { public static void main(String[] args) throws Exception { //1 流环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2 并行度设置 env.setParallelism(2); //3 表环境 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); //4 读取数据 DataStreamSource<WaterSensor> waterSensorStream = env.fromElements(new WaterSensor("sensor_1", 1000L, 10), new WaterSensor("sensor_1", 2000L, 20), new WaterSensor("sensor_2", 3000L, 30), new WaterSensor("sensor_1", 4000L, 40), new WaterSensor("sensor_1", 5000L, 50), new WaterSensor("sensor_2", 6000L, 60)); //使用sql查询一个已注册的表 //todo 1 流转换为表 Table inputTable = tableEnv.fromDataStream(waterSensorStream); //todo 2 表注册为一个临时视图 tableEnv.createTemporaryView("sensor",inputTable); //todo 3 在临时视图查询数据,并得到一个新表 Table resultTable = tableEnv.sqlQuery("select * from sensor where id ='sensor_1'"); //todo 4 输出数据,表转为流 DataStream<Row> resultTableStream = tableEnv.toAppendStream(resultTable, Row.class); //打印 resultTableStream.print(); //执行 env.execute(); } }
3.2 Kafka到Kafka
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; public class KafkaToKafka { public static void main(String[] args) { //1 流环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2 并行度设置 env.setParallelism(1); //3 表环境 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); //todo 1 注册SourceTable:source_sensor tableEnv.executeSql("create table source_sensor(id string,ts bigint,vc int) with (" +"'connector'='kafka'," + "'topic'='topic_source_sensor'," + "'properties.bootstrap.servers'='hadoop102:9092,hadoop103:9092,hadoop104:9092'," + "'scan.startup.mode'='latest-offset',"+ "'format'='csv'" +")"); //todo 2 注册sinkTable:sink_sensor tableEnv.executeSql("create table sink_sensor(id string,ts bigint,vc int) with("+"'connector'='kafka'"+ "'topic'='topic_sink_sensor'"+ "'properties.bootstrap.servers'='hadoop102:9092,hadoop103:9092,hadoop104:9092',"+ "'format'='csv'"+ ")"); //todo 3 从SourceTable 查询数据,并写入到SinkTable tableEnv.executeSql("insert into sink_sensor select * from source_sensor where id='sensor_1'"); } }
启动kafka生产者生产数据 //kafka-console-producer.sh --broker-list hadoop102:9092 --topic topic_source_sensor
启动kafka消费者消费数据 //kafka-console-consumer.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic topic_sink_sensor --from-beginning
使用 sql 关联外部系统: 语法 :create table 表名 (字段名 字段类型,字段名 字段类型.....) with ( 参数名=参数值,参数名=参数值.....) 注意 : 去官网查看 参数名 有哪些,有一些是必需的,有一些是可选的Apache Flink 1.12 Documentation: Apache Kafka SQL Connector
第四章 时间属性
像窗口(在 Table API 和 SQL )这种基于时间的操作,需要有时间信息。因此,Table API 中的表就需要提供逻辑时间属性来表示时间,以及支持时间相关的操作。
4.1 处理时间
4.1.1 DataStream 到 Table 转换时定义
import flinksql.WaterSensor; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import static org.apache.flink.table.api.Expressions.$; public class TimeBase { public static void main(String[] args) { //处理时间属性可以在schema定义的时候用.proctime后缀来定义,时间属性一定不能定义在就一个已有字段上,所以只能定义在schema定义的最后 //1 流环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2 并行度设置 env.setParallelism(1); //3 读取数据 DataStreamSource<WaterSensor> waterSensorStream = env.fromElements(new WaterSensor("sensor_1", 1000L, 10), new WaterSensor("sensor_1", 2000L, 20), new WaterSensor("sensor_2", 3000L, 30), new WaterSensor("sensor_1", 4000L, 40), new WaterSensor("sensor_1", 5000L, 50), new WaterSensor("sensor_2", 6000L, 60)); //todo 1 表环境 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); //todo 2 声明一个新的字段作为处理时间字段 Table sensorTable = tableEnv.fromDataStream(waterSensorStream, $("id"), $("ts"), $("vc"), $("pt").proctime()); sensorTable.execute().print(); } }
4.1.2 在创建表的 DDL 中定义
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; public class ProcessTime { public static void main(String[] args) { //1 流环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2 并行度设置 env.setParallelism(1); //3 表环境 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); //todo 1 创建表,声明一个额外的列作为处理时间 tableEnv.executeSql("create table sensor(id string,ts bigint,vc int,pt_time as PROCTIME())with (" + "'connector'='filesystem',"+ "'path'='input/sensor.txt',"+ "'format'='csv'"+ ")"); //todo 2 查询表 TableResult resultTable = tableEnv.executeSql("select * from sensor"); resultTable.print(); } }
4.2 事件时间
事件时间允许程序按照数据中包含的时间来处理,这样可以在有乱序或者晚到的数据的情况下产生一致的处理结果。它可以保证从外部存储读取数据后产生可以复现(replayable)的结果。 除此之外,事件时间可以让程序在流式和批式作业中使用同样的语法。在流式程序中的事件时间属性,在批式程序中就是一个正常的时间字段。 为了能够处理乱序的事件,并且区分正常到达和晚到的事件,Flink 需要从事件中获取事件时间并且产生 watermark(watermarks)。
4.2.1 DataStream 到 Table 转换时定义
事件时间属性可以用.rowtime后缀在定义DataStream schema 的时候来定义。时间戳和watermark在这之前一定是在DataStream上已经定义好了。 在从DataStream到Table转换时定义事件时间属性有两种方式。取决于用 .rowtime 后缀修饰的字段名字是否是已有字段,事件时间字段可以是: 在 schema 的结尾追加一个新的字段。 替换一个已经存在的字段。 不管在哪种情况下,事件时间字段都表示DataStream中定义的事件的时间戳。
import flinksql.WaterSensor; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import java.time.Duration; import static org.apache.flink.table.api.Expressions.$; public class EventTime { public static void main(String[] args) throws Exception { //TODO 事件时间允许程序按照数据中包含的时间来处理,这样可以在有乱序或者晚到的数据情况下产生一致的处理结果,它可以保证从外部存储读取数据后产生可以复现的结果 除此之外,事件时间可以让程序在流式和批式作业中使用同样的语法,在流式程序中的事件时间属性,在批式程序中就是一个正常的时间字段 为了能够处理乱序事件,并且区分正常到达和晚到达的事件,Flink需要从事件中获取事件时间并且产生watermark //TODO 事件时间属性可以用.rowtime后缀在定义DataStream schema的时候来定义。时间戳和watermark在这之前一定是在DataStream上已经定义好了 //TODO 在从DataStream 到Table转换时定义事件时间属性有两种方式。取决于用.rowtime后缀修饰的字段名字是否已经有字段,事件时间字段可以是: 在schema的结尾追加一个新的字段 替换一个已经存在的字段。 //TODO 不管在哪种情况下,事件时间字段都表示DataStream中定义的事件时间戳。 //1 流环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2 并行度设置 env.setParallelism(1); //3 读取数据 DataStreamSource<WaterSensor> waterSensorStream = env.fromElements(new WaterSensor("sensor_1", 1000L, 20), new WaterSensor("sensor_1", 2000L, 20), new WaterSensor("sensor_2", 3000L, 30), new WaterSensor("sensor_1", 4000L, 40), new WaterSensor("sensor_1", 5000L, 50), new WaterSensor("sensor_2", 6000L, 60)); SingleOutputStreamOperator<WaterSensor> waterSensorStreamWithWatermark = waterSensorStream //指定数据中的字段为时间戳 .assignTimestampsAndWatermarks( WatermarkStrategy //乱序程度 .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5)) .withTimestampAssigner((element, reconrdTimestamp) -> element.getTs()) ); //todo 表环境 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); Table resultTable = tableEnv //用一个额外字段作为事件时间属性 .fromDataStream(waterSensorStreamWithWatermark, $("id"), $("ts"), $("vc"), $("et").rowtime()); resultTable.execute().print(); //执行 env.execute(); } }
4.2.2 在创建表的DDL中定义
事件时间属性可以用 WATERMARK 语句在 CREATE TABLE DDL 中进行定义。WATERMARK 语句在一个已有字段上定义一个 watermark 生成表达式,同时标记这个已有字段为时间属性字段。
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; public class EventTimeDDL { public static void main(String[] args) { //1 流环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2 并行度设置 env.setParallelism(1); //3 表环境 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); //TODO 事件时间属性可以用watermark语句在create table DDL 中定义,watermark语句在一个已有字段上定义一个watermark生成表达式,同时标记这个已有字段作为时间属性字段 //TODO 作为事件时间的字段必须是timestamp(3)类型,所以根据long类型的ts计算出来一个t tableEnv.executeSql("create table sensor("+ "id string,"+ "ts bigint,"+ "vc int,"+ "t as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss')),"+ "watermark for t as t - interval '5' second)"+ "with("+ "'connector'='filesystem',"+ "'path'='input/sensor.txt',"+ "'format'='csv'"+ ")"); //todo sql查询 Table resultTable = tableEnv.sqlQuery("select * from sensor"); resultTable.execute().print(); } }
说明: 1.把一个现有的列定义为一个为表标记事件时间的属性。该列的类型必须为 TIMESTAMP(3),且是 schema 中的顶层列,它也可以是一个计算列。 2.严格递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column。 3.递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND。 4.有界乱序时间戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit。 5.当发现时区所导致的时间问题时,可设置本地使用的时区: Configuration configuration = tableEnv.getConfig().getConfiguration(); configuration.setString("table.local-time-zone", "GMT"); 6.参考官网https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/create.html#watermark
第五章 窗口(Window)
时间语义,要配合窗口操作才能发挥作用。最主要的用途是开窗口然后根据时间段做计算。
在Table API和SQL中,主要有两种窗口:Group Window 和Over Window
5.1 Table API中使用窗口
5.1.1 Group Windows
分组窗口(Group Windows)会根据时间或行计数间隔,将行聚合到有限的组中,并对每个组的数据执行一次聚合函数。
Table API中的Group Windows都是使用Window子语句定义的。并且必须有as子句指定一个别名,为了按窗口对表进行分组,窗口的别名必须在group by子句中,像常规的分组字段一样引用。
滚动窗口
package window; import flinksql.WaterSensor; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.Tumble; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import java.time.Duration; import static org.apache.flink.table.api.Expressions.$; import static org.apache.flink.table.api.Expressions.lit; public class GroupTumble { public static void main(String[] args) throws Exception { //l 流环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2 并行度设置 env.setParallelism(1); //3 读取数据 DataStreamSource<WaterSensor> waterSensorStream = env.fromElements(new WaterSensor("sensor_1", 1000L, 10), new WaterSensor("sensor_1", 2000L, 20), new WaterSensor("sensor_2", 3000L, 30), new WaterSensor("sensor_1", 4000L, 40), new WaterSensor("sensor_1", 5000L, 50), new WaterSensor("sensor_2", 6000L, 60)); //todo 1 生成watermark SingleOutputStreamOperator<WaterSensor> waterSensorStreamWithWatermark = waterSensorStream .assignTimestampsAndWatermarks( WatermarkStrategy .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5)) //指定数据中字段为watermarks .withTimestampAssigner((element, recordTimestamp) -> (element.getTs())) ); //todo 2 表环境 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); //todo 3 声明处理时间 Table table = tableEnv .fromDataStream(waterSensorStreamWithWatermark, $("id"), $("ts").rowtime(), $("vc")); //todo 4 sql查询 Table resultTable = table .window(Tumble.over(lit(10).second()).on($("ts")).as("w"))//定义滚动窗口并给窗口起一个别名 .groupBy($("id"), $("w"))//窗口必须出现在分组字段中 .select($("id"), $("w").start(), $("w").end(), $("vc").sum()); TableResult tableResult = resultTable.execute(); tableResult.print(); //执行 env.execute(); //TODO 开窗四部曲 // 1 窗口类型 window(Tumble over(lit(10).second()).on($("ts").as("w")) // 2 窗口相关的参数:比如窗口大小 .window(Tumble over(lit(10).second()).on($("ts")).as("w")) // 3 指定时间字段 .window(Tumble.over(lit(10).second()).on($("ts")).as("w")) // 4 窗口别名 .window(Tumble.over(lit(10).second()).on($("ts")).as("w")) } }
滑动窗口
.window(slide.over(lit(10).second()).every(lit(5).second()).on($("ts")).as("w"))
会话窗口
.window(Session.withGap(lit(6).second()).on($("ts")).as("w"))
5.1.2 Over Windows
over window 聚合是标准sql中已有的over子句,可以在查询的select字句中定义,over window 聚合,会针对每个输入行,计算相邻范围内的聚合。
Table API提供了over类,来配置over窗口的属性。可以在事件时间或处理时间,以及指定为时间间隔,或行计数的范围内,定义over windows。
无界的over window是使用常量指定的。时间间隔要指定UNBOUNDED_RANGE,或者行计数间隔要指定UNBOUNDED_ROW。而有界的over window 是用间隔的大小指定的
Unbounded Over Windows
import flinksql.WaterSensor; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Over; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import java.time.Duration; import static org.apache.flink.table.api.Expressions.$; import static org.apache.flink.table.api.Expressions.UNBOUNDED_ROW; public class OverWindows { public static void main(String[] args) throws Exception { //1 流环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2 并行度设置 env.setParallelism(1); //3 读取数据 DataStreamSource<WaterSensor> waterSensorStream = env.fromElements(new WaterSensor("sensor_1", 1000L, 10), new WaterSensor("sensor_1", 4000L, 40), new WaterSensor("sensor_1", 2000L, 20), new WaterSensor("sensor_2", 3000L, 30), new WaterSensor("sensor_1", 5000L, 50), new WaterSensor("sensor_2", 6000L, 60)); //todo 1 生成watermark SingleOutputStreamOperator<WaterSensor> waterSensorStreamWithWatermark = waterSensorStream.assignTimestampsAndWatermarks( WatermarkStrategy .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5)) .withTimestampAssigner((element, recordTimestamp) -> element.getTs()) ); //todo 2 表环境 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); //todo 3 sql查询 Table table = tableEnv .fromDataStream(waterSensorStreamWithWatermark, //.rowtime 处理时间 $("id"), $("ts").rowtime(), $("vc")); Table selectTable = table //开窗 .window(Over.partitionBy($("id")).orderBy($("ts")).preceding(UNBOUNDED_ROW).as("w")) .select($("id"), $("ts"), $("vc").sum().over($("w")).as("sum_vc")); selectTable.execute().print(); env.execute(); } }
Bounded Over Windows
//当事件时间向前算3s得到一个窗口 .window(Over.partitonBy($("id")).orderBy($("ts")).preceding(lit(3).second()).as("w")) //当前行向前推算2行一个窗口 .window(Over.partitionBy($("id")).orderBy($("ts")).preceding(rowInterval(2L)).as("w"))
5.2 SQL API 中使用窗口
5.2.1 Group Windows
SQL查询的分组窗口是通过group By子句定义的,类似于使用常规Group by语句的查询,窗口分组语句的Group By子句中带有一个窗口函数为每个分组计算出一个结果,以下是批处理表和流处理表支持的分组窗口函数:
*分组窗口函数* | *描述* |
---|---|
TUMBLE(time_attr, interval) | 定义滚动时间窗口。滚动时间窗口将行分配给具有固定持续时间(间隔)的不重叠的连续窗口。例如,一个5分钟的滚动窗口以5分钟的间隔对行进行分组。滚动窗口可以在事件时间(流+批处理)或处理时间(流)上定义。 |
HOP(time_attr, interval, interval) | 定义一个跳跃时间窗口(在Table API中称为滑动窗口)。跳变时间窗口具有固定的持续时间(第二个间隔参数)和通过指定的跳变间隔(第一个间隔参数)进行跳变。如果跳距小于窗口大小,则跳转窗口重叠。因此,可以将行分配给多个窗口。例如,一个15分钟大小的跳转窗口和5分钟的跳转间隔将每行分配给3个15分钟大小的不同窗口,这些窗口在5分钟的间隔中进行评估。跳跃窗口可以在事件时间(流+批处理)或处理中定义 |
SESSION(time_attr, interval) | 定义会话时间窗口。 会话时间窗口没有固定的持续时间,但其边界由不活动的时间间隔定义,即,如果在定义的间隔时间段内未出现任何事件,则关闭会话窗口。 例如,间隔30分钟的会话窗口在30分钟不活动后观察到一行时开始(否则该行将被添加到现有窗口),如果在30分钟内未添加任何行,则关闭该窗口。 会话窗口可以在事件时间(流+批处理)或处理时间(流)上工作。 |
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); // 作为事件时间的字段必须是 timestamp 类型, 所以根据 long 类型的 ts 计算出来一个 t tEnv.executeSql("create table sensor(" + "id string," + "ts bigint," + "vc int, " + "t as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss'))," + "watermark for t as t - interval '5' second)" + "with(" + "'connector' = 'filesystem'," + "'path' = 'input/sensor.txt'," + "'format' = 'csv'" + ")"); tEnv .sqlQuery( "SELECT id, " + " TUMBLE_START(t, INTERVAL '1' minute) as wStart, " + " TUMBLE_END(t, INTERVAL '1' minute) as wEnd, " + " SUM(vc) sum_vc " + "FROM sensor " + "GROUP BY TUMBLE(t, INTERVAL '1' minute), id" ) .execute() .print();
tEnv .sqlQuery( "SELECT id, " + " hop_start(t, INTERVAL '1' minute, INTERVAL '1' hour) as wStart, " + " hop_end(t, INTERVAL '1' minute, INTERVAL '1' hour) as wEnd, " + " SUM(vc) sum_vc " + "FROM sensor " + "GROUP BY hop(t, INTERVAL '1' minute, INTERVAL '1' hour), id" ) .execute().print();
5.2.2 Over Windows
tEnv .sqlQuery( "select " + "id," + "vc," + "sum(vc) over(partition by id order by t rows between 1 PRECEDING and current row)" + "from sensor" ) .execute() .print();
tEnv .sqlQuery( "select " + "id," + "vc," + "count(vc) over w, " + "sum(vc) over w " + "from sensor " + "window w as (partition by id order by t rows between 1 PRECEDING and current row)" ) .execute() .print();
第六章 函数(Functions)
Flink Table 和SQL内置了很多SQL中支持的函数;如果有无法满足的需要,则可以实现用户自定义的函数(UDF)来解决。
6.1 系统内置函数
Flink Table API和SQL为用户提供了一组用于数据转换的内置函数,SQL中支持的很多函数,Table API和SQL都已经做了实现,其他还在快速开发扩展中。
典型内置函数,参考官网
//nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/functions/systemfunctions/
比较函数 SQL: value1 = value2 value1 > value2 Table API: ANY1 === ANY2 ANY1 > ANY2 逻辑函数 SQL: boolean1 OR boolean2 boolean IS FALSE NOT boolean Table API: BOOLEAN1 || BOOLEAN2 BOOLEAN.isFalse !BOOLEAN 算术函数 SQL: numeric1 + numeric2 POWER(numeric1, numeric2) Table API: NUMERIC1 + NUMERIC2 NUMERIC1.power(NUMERIC2) 字符串函数 SQL: string1 || string2 UPPER(string) CHAR_LENGTH(string) Table API: STRING1 + STRING2 STRING.upperCase() STRING.charLength() 时间函数 SQL: DATE string TIMESTAMP string CURRENT_TIME INTERVAL string range Table API: STRING.toDate STRING.toTimestamp currentTime() NUMERIC.days NUMERIC.minutes 聚合函数 SQL: COUNT(*) SUM([ ALL | DISTINCT ] expression) RANK() ROW_NUMBER() Table API: FIELD.count FIELD.sum0
6.2 UDF
学习建议:面向官网编程!
用户定义函数(User-defined Functions,UDF)是一个重要的特性,因为它们显著地扩展了查询(Query)的表达能力。一些系统内置函数无法解决的需求,我们可以用UDF来自定义实现。
6.2.1 注册用户自定义函数UDF
大多数情况下,用户定义的函数必须先注册,然后才能在查询中使用,不需要专门为scala的Table API注册函数
函数通过调用registerFunction()方法在TableEnvironment中注册,当用户定义的函数被注册时,它被插入到TableEnvironment的函数目录中。这样Table API或SQL解析器就可以识别并正确解析它。
6.2.2 标量函数(Scalar Functions)
用户定义的标量函数,可以将0,1或多个标量值,映射到新的标量值。
为了定义标量函数,必须在org.apache.flink.table.functions中扩展基类Scalar Function,并实现(一个或者多个)求值(evaluation,eval)方法标量函数的行为由求值方法决定,求值方法必须公开声明并命名为eval(直接def声明,没有override).求值方法的参数类型和返回类型,确定了标量函数的参数和返回类型。
import flinksql.WaterSensor; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.functions.ScalarFunction; import static org.apache.flink.table.api.Expressions.$; import static org.apache.flink.table.api.Expressions.call; public class ScalarFunctions { public static void main(String[] args) { //1 获取执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); //2 读取数据 DataStreamSource<WaterSensor> waterSenStream = env.fromElements(new WaterSensor("sensor_1", 1000L, 10), new WaterSensor("sensor_1", 2000L, 20), new WaterSensor("sensor_2", 3000L, 30), new WaterSensor("sensor_1", 4000L, 40), new WaterSensor("sensor_1", 5000L, 50), new WaterSensor("sensor_2", 6000L, 60)); //3 流转换为动态表 Table table = tableEnv.fromDataStream(waterSenStream); //4 不注册直接使用(Table API) /*table.select(call(MyLenth.class,$("id"))).execute().print();*/ //4.1 先注册再使用 tableEnv.createTemporarySystemFunction("MyLenth",MyLenth.class); //SQL tableEnv.executeSql("select Mylenth(id) from "+table).print(); //Table API /*table.select(call("MyLenth",$("id"))).execute().print();*/ } //TODO 自定义UDF函数,求数据的长度 public static class MyLenth extends ScalarFunction { //求值方法必须公开声明并且命名eval(直接def声明,没有override) public int eval(String value) { return value.length(); } } }
6.2.3 表函数(Table Functions)
与用户定义的标量函数类似,用户定义的表函数,可以将0,1或多个标量值作为输入参数,与标量函数不同的是它可以返回任意数量的行作为输出,而不是单个值。
为了定义一个表函数,必须扩展org.apache.flink.table.functions 中的基类TableFunction并实现(一个或者多个)求值方法。表函数的行为由其求值方法决定,求值方法必须是public的,并命名为eval。求值方法的参数类型,决定表函数的所有有效参数。
返回表的类型由TableFunction的泛型类型确定。求值方法使用protected collect(T)方法发出输出行。
在Table API中,Table函数需要与.joinLateral或leftOuterJoinLaterala一起使用。
joinLateral算子,会将外部表中的每一行,与表函数(TableFunction,算子的参数是它的表达式)计算得到的所有行连接起来。
而leftOuterJoinLateral算子,则是左外连接,它同样会将外部表中的每一行与表函数计算生成的所有行连接起来,并且,对于表函数返回的是空表的外部行,也要保留下来。
在SQL中,则需要使用Lateral Table(<TableFunction>),或者带有ON TRUE条件的左连接
import flinksql.WaterSensor; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.annotation.DataTypeHint; import org.apache.flink.table.annotation.FunctionHint; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.functions.TableFunction; import org.apache.flink.types.Row; import static org.apache.flink.table.api.Expressions.$; import static org.apache.flink.table.api.Expressions.call; public class TableFunctions { public static void main(String[] args) { //1 流环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2 并行度 env.setParallelism(1); //3 读取数据 DataStreamSource<WaterSensor> waterSensorStream = env.fromElements(new WaterSensor("sensor_1", 2000L, 10), new WaterSensor("sensor_1", 2000L, 20), new WaterSensor("sensor_2", 3000L, 30), new WaterSensor("sensor_1", 4000L, 40), new WaterSensor("sensor_1", 5000L, 50), new WaterSensor("sensor_2", 6000L, 60)); //4 流转化为动态表 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); Table table = tableEnv.fromDataStream(waterSensorStream); //5 先注册再使用 tableEnv.createTemporarySystemFunction("split",SplitFunction.class); //TableAPI /* table .joinLateral(call("split",$("id"))) .select($("id"),$("word")) .execute() .print(); */ //SQL tableEnv.executeSql("select id,word from "+table+", lateral table(split(id))").print(); } //自定义UDTF函数 @FunctionHint(output = @DataTypeHint("ROW<word STRING>")) public static class SplitFunction extends TableFunction<Row> { public void eval(String str){ for (String s : str.split("_")) { collect(Row.of(s)); } } } }
6.2.4 聚合函数(Aggregate Functions)
用户自定义聚合函数(UDAGGs)可以把一个表中的数据,聚合成一个标量值,用户定义的聚合函数,是通过继承AggregateFunction抽象类实现的
上图显示一个聚合的例子
假设现在有一张表,包含了各种饮料的数据,该表由三列(id,name和price),五行组成数据,现在我们需要找到表中所有饮料的最高价格,即执行max()聚合,结果将是一个数值。
AggregateFunction的工作原理:
首先:它需要一个累加器,用来保存聚合中间结果的数据结构(状态)。可以通过调用AggregateFunction的createAccumulator()方法创建空累加器。
随后: 对每个输入行调用函数的accumulate()方法来更新累加器。
处理完所有行后,将调用函数的getValue()方法来计算并返回最终结果。
AggregationFunction要求必须实现的方法:
createAccumulator()
accumulate()
getValue()
除了上述方法之外,还有一些可选择实现的方法。其中一些方法,可以让系统执行查询更有效率,而另一些方法,对于某些场景是必需的。例如,如果聚合函数应用在会话窗口(session group window)的上下文中,则merge()方法是必需的。 retract() merge() resetAccumulator()
import flinksql.WaterSensor; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import static org.apache.flink.table.api.Expressions.$; import static org.apache.flink.table.api.Expressions.call; public class AggregateFunction { public static void main(String[] args) { //1 流环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2 并行度设置 env.setParallelism(1); //3 读取数据 DataStreamSource<WaterSensor> waterSensorStream = env.fromElements(new WaterSensor("sensor_1", 1000L, 20), new WaterSensor("sensor_1", 2000L, 20), new WaterSensor("sensor_2", 3000L, 30), new WaterSensor("sensor_1", 4000L, 40), new WaterSensor("sensor_1", 5000L, 50), new WaterSensor("sensor_2", 6000L, 60)); //4 表转化为流 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); Table table = tableEnv.fromDataStream(waterSensorStream); //5 先注册再使用 tableEnv.createTemporarySystemFunction("myavg",MyAvg.class); //table API /*table.groupBy($("id")) .select($("id"),call("myavg",$("vc"))) .execute() .print();*/ //SQL tableEnv .executeSql("select id,myavg(vc) from "+table+" group by id").print(); } //定义一个类当作累加器,并声明总数和总个数这两个值 public static class MyAvgAccumulator{ public long sum=0; public int count=0; } //自定义UDAF函数,求每个waterSensor中vc的平均值 public static class MyAvg extends org.apache.flink.table.functions.AggregateFunction<Double,MyAvgAccumulator> { //创建一个累加器 @Override public MyAvgAccumulator createAccumulator() { return new MyAvgAccumulator(); } //做累加操作 public void accumulate(MyAvgAccumulator acc,Integer vc){ acc.sum+=vc; acc.count+=1; } //将计算的结果的值返回 @Override public Double getValue(MyAvgAccumulator accumulator) { return accumulator.sum*1D/accumulator.count; } } }
6.2.5 表聚合函数(Table Aggregate Functions)
用户定义的表聚合函数(UDTAGGs),可以把一个表中数据,聚合为具有多行和多列的结果表。这跟AggregateFunction非常类似,只是之前聚合结果是一个标量值,现在变成了一张表。
比如现在我们需要找到表中所有waterSensor的前两个最高水位线,即执行top2()表聚合。
用户定义的表聚合函数,是通过继承TableAggtegateFunction抽象来实现的。
TableAggregateFunction的工作原理如下:
首先:它同样需要一个累加器(Accumulator),它是保存聚合中间结果的数据结构。通过调用TableAggregateFunctiom的createAccumutor()方法可以创建空累加器。
随后:对每个输入行调用函数的accumulate()方法来更新累加器。
处理完所有行后,将调用函数的emitValue()方法来计算并返回最终结果。
AggregationFunction要求必须实现的方法:
createAccumulator()
getValue()
除了上述方法之外,还有一些可选择实现的方法。 retract() merge() resetAccumulator() emitValue() emitUpdateWithRetract()
import flinksql.WaterSensor; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.util.Collector; import static org.apache.flink.table.api.Expressions.*; public class TableAggregateFunction { public static void main(String[] args) { //1 流环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2 并行度设置 env.setParallelism(1); //3 读取数据 DataStreamSource<WaterSensor> waterSensorStream = env.fromElements(new WaterSensor("sensor_1", 2000L, 10), new WaterSensor("sensor_1", 2000L, 20), new WaterSensor("sensor_2", 3000L, 30), new WaterSensor("sensor_1", 4000L, 40), new WaterSensor("sensor_1", 5000L, 50), new WaterSensor("sensor_2", 6000L, 60)); //4 流装化为表 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); Table table = tableEnv .fromDataStream(waterSensorStream); //5 先注册再使用 tableEnv.createTemporarySystemFunction("Top2",Top2.class); //Table API table .groupBy($("id")) .flatAggregate(call("Top2", $("vc")).as("top", "rank")) .select($("id"), $("top"), $("rank")) .execute() .print(); } //定义一个类当作累加器,并声明第一和第二这两个值 public static class vcTop2 { public Integer first = Integer.MIN_VALUE; public Integer second = Integer.MIN_VALUE; } //自定义UDATF函数(多进多出),求每个waterSensor中最高的两个水位值 public static class Top2 extends org.apache.flink.table.functions.TableAggregateFunction<Tuple2<Integer,Integer>,vcTop2>{ @Override public vcTop2 createAccumulator() { return new vcTop2(); } //比较数据,如果当前数据大于累加器中存的数据则替换,并将原累加器中的数据往下(第二)赋值 public void accumulate(vcTop2 acc, Integer value){ if (value>acc.first){ acc.second=acc.first; acc.first=value; }else if (value>acc.second){ acc.second=value; } } //计算(排名) public void emitValue(vcTop2 acc,Collector<Tuple2<Integer,Integer>>out){ //emit the value and rank if (acc.first!=Integer.MIN_VALUE){ out.collect(Tuple2.of(acc.first,1)); } if (acc.second!=Integer.MIN_VALUE){ out.collect(Tuple2.of(acc.second,2)); } } } }
第七章 Catalog
Catalog提供了元数据信息,例如数据库,表,分区视图以及数据库或者其他外部系统中存储的函数和信息。
数据处理最关键的方面之一是管理元数据,元数据可以是临时的,例如临时表或者通过TableEnvrionment注册的UDF。元数据也可以是持久化的,例如Hive Metastore中的元数据。Catalog提供了一个统一的API,用于管理元数据,并使其可以从Table API和SQL查询语句中来访问。
前面用到Connector其实就是在使用Catalog.
7.1 Catalog类型
7.1.1 GenericlnMemoryCatalog(临时的)
GenericInMemoryCatalog是基于内存实现的Calalog,所有元数据旨在session的生命周期内可用。
7.1.2 JdbcCatalog
JdbcCatalog使得用户可以将Flink通过JDBC协议连接到关系数据库。PostgreCatalog是当前实现的唯一一种JDBC Catalog
7.1.3 HiveCatalog(用的最多)
HiveCatalog有两个用途,作为原生Flink元数据的持久化存储,以及作为读写现有Hive元数据的接口。Flink的Hive文档提供了有关设置HiveCatalog以及访问现有Hive元数据的详细信息。
7.2 HiveCatalog
7.2.1 导入需要的依赖
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-hive_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <!-- Hive Dependency --> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>3.1.2</version> <exclusions> <exclusion> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>3.1.3</version> </dependency>
7.2.2 在hadoop102启动hive元数据
nohup hive --service metastore >/dev/null 2>&1 &
7.2.3 连接Hive
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.SqlDialect; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.catalog.hive.HiveCatalog; public class HiveCatalogBase { public static void main(String[] args) { //todo 1设置用户权限 System.setProperty("HADOOP_USER_NAME","atguigu"); //todo 2 流环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //并行度 env.setParallelism(1); //todo 3 表环境 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); String name="myhive";//Catalog 名字 String defaultDatabase="flink_test";//默认数据库 String hiveConfDir="c:/conf";//hive配置文件的目录,需要把hive—site.xml添加到该目录 //1 创建hiveCatalog HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir); //2 连接HiveCatalog tableEnv.registerCatalog(name,hiveCatalog); //3 把hiveCatalog:myhive作为当前sesion的catalog tableEnv.useCatalog(name); tableEnv.useDatabase("flink_test"); //指定SQL语法为Hive语法 tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); tableEnv .sqlQuery("select * from stu") .execute() .print(); } }
第八章· SqlClient
启动完一个yarn-session,然后启动另外一个sql客户端
bin/sql-client.sh embedded
8.1 建立到Kafka的连接
下面创建一个流表从kafka读取数据
copy “flink-sql-connector-kafka_2.11-1.13.3.jar”依赖到 flink的lib 目录下 flink-sql-connector-kafka_2.11-1.13.3.jar 下载地址: https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.13.3/flink-sql-connector-kafka_2.11-1.13.3.jar
create table sensor(id string, ts bigint, vc int) with( 'connector'='kafka', 'topic'='sensor', 'properties.bootstrap.servers'='hadoop102:9092', 'properties.group.id'='atguigu', 'format'='csv', 'scan.startup.mode'='latest-offset' )
从流标查询数据 select * from sensor;
向Kafka写入数据:
先开启一个生产者 kafka-console-producer.sh --broker-list hadoop102:9092 --topic sensor
写入csv格式的数据:sensor_1,1,1
8.2 建立到mysql的连接
依赖:flink-connector-jdbc_2.11-1.13.3.jar
:mysql-connector-java-5.1.49.jar
copy mysql驱动到lib目录
create table sessor(id string,ts bigin,vc int) with( 'connector'='jdbc', 'url'='jdbc:mysql://hadoop102:3306/test', 'username'='root', 'password'='000000', 'table-name'='sensor', 'driver'='com.mysql.jdbc.Driver' )
更多推荐
所有评论(0)