每五秒计算一次mini1:8888(mini1是虚拟机ip映射名称)收到的数据
可以用nc命令模拟产生数据:
在mini1上执行
nc -lk 8888


import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object StreamingWordCount {
  def main(args: Array[String]): Unit = {
      //给一个线程的话Local[1]  打印不出结果,因为没有线程去计算
      //需要两个线程一个是receiver,另一个用于计算
      val conf = new SparkConf().setAppName("streamingwordcount")
      .setMaster("local[2]")
      val sc = new SparkContext(conf)
      val ssc = new StreamingContext(sc,Seconds(5))
      ssc.sparkContext.setLogLevel("WARN")
      //接收数据
      val ds = ssc.socketTextStream("mini1",8888)

      val result = ds.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
      result.print()
      ssc.start()
      ssc.awaitTermination()
  }
}

以上代码存在一个问题,就是每五秒计算几次端口收到的数据,但是结果并不会累加,下面是可以累加之前计算结果的代码。

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object StreamingWordCountLeiJia {
    //Option[Int] 以前的结果
    //(hello,1)(hello,1)(tom,1)
    //(hello,Seq(1,1)),(tom,Seq(1))
    val updateFunc = (iter:Iterator[(String,Seq[Int],Option[Int])])=>{
        iter.flatMap{
            //it=>Some(it._2.sum+it._3.getOrElse(0)).map(x=>(it._1,x))
            case(x,y,z)=>Some(y.sum+z.getOrElse(0)).map(m=>(x,m))
            //iter.map(t=>(t._1,t._2,sum+t._3.getOrElse(0)))

        }
    }
  def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setAppName("streamingwordcount").setMaster("local[2]")
      val sc = new SparkContext(conf)
      //本地跑的话设置一个本地的目录就好了,如果放在集群上的话需要写一个hdfs的目录
      sc.setCheckpointDir("C:\\Users\\User\\Desktop")
      val ssc = new StreamingContext(sc,Seconds(5))
      ssc.sparkContext.setLogLevel("WARN")
      //接收数据
      val ds = ssc.socketTextStream("mini1",8888)

      //updateStateByKey这种方式必须要设置checkpoint
      val result = ds.flatMap(_.split(" ")).map((_,1)).updateStateByKey(updateFunc,new HashPartitioner(sc.defaultParallelism),true)
      result.print()
      ssc.start()
      ssc.awaitTermination()
  }
}
Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐