flink scala 中TimeWindow的apply 和 process方法的区别,继承的类不同,其中prcess方法包含context,里面可以获取窗口时间,自定义延迟数据

package com.bai

import com.bai.wartermarkTest.MySource
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.{ProcessWindowFunction, WindowFunction}

import scala.collection.mutable
import scala.util.Random
//import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

object WindowTest { //mitbaiyun
  def main(args: Array[String]): Unit = {
    var ts = 1597152929l
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
    env.setParallelism(1)
    val input: DataStream[String] = env.addSource(new MySource)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val stream: DataStream[(Long, String,String)] = input.flatMap(new FlatMapFunction[String, (Long, String,String)] {
      override def flatMap(value: String, out: Collector[(Long, String,String)]) = {
        out.collect(ts, value,"aa")
        ts+=Random.nextInt(6)
      }
    })

    val value: DataStream[String] = stream
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[(Long, String, String)](Time.seconds(4)) {
          override def extractTimestamp(element: (Long, String, String)) = element._1 * 1000
        })
      .keyBy(_._3)
      .timeWindow(Time.seconds(8))
        .apply(new MyWindow)
//      .process(new MyWindowFunction)
//    value.print("")
    value.map(println(_))

    env.execute("windowTest")

  }

}

class MyWindowFunction extends ProcessWindowFunction[(Long,String,String),String,String,TimeWindow] {
  override def process(key: String, context: Context, elements: Iterable[(Long, String, String)], out: Collector[String]): Unit = {
    var list = ListBuffer[(Long,String,String)]()
    for (e<- elements){
      list.append(e)
    }

    val a: mutable.Seq[(Long, String, String)] =list.sortWith(_._1>_._1)
    for (e<-a){
//      println(e)
    }
    out.collect(elements.size.toString()+","+a+","+list)
  }
}

class MyWindow extends WindowFunction[(Long, String, String),String,String,TimeWindow] {
  override def apply(key: String, window: TimeWindow, input: Iterable[(Long, String, String)], out: Collector[String]): Unit = ???
}
Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐