目录

RDD队列

自定义数据源

 kafka数据源

DStream转换

无状态转化操作

有状态转化操作

DStream输出

优雅关闭


用到的全部依赖:

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.27</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>1.2.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>2.4.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.10.1</version>
        </dependency>

    </dependencies>
    <build>
        <plugins>
            <!--该插件用于把Scala代码编译成为class文件-->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <!--声明绑定到maven的compile阶段-->
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

RDD队列

可以通过使用ssc.queueStream(queueOfRDDs)来创建DStream,每一个推送到这个队列中的RDD都会作为一个DStream处理。

循环创建几个RDD并放入队列。通过SparkStream创建Dstream计算WordCount。

代码:

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable

object RDD {
  def main(args: Array[String]): Unit = {
    //TODO 创建配置环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val ssc = new StreamingContext(sparkConf,Seconds(5))

    //TODO 操作
    //创建RDD队列
    val Rdd = new mutable.Queue[RDD[Int]]()
    //创建queueStream
    val imputStream = ssc.queueStream(Rdd, oneAtATime = false)
    //处理RDD数据
    val mapRDD = imputStream.map((_, 1))
    val reduceRDD = mapRDD.reduceByKey(_ + _)
    //打印
    reduceRDD.print()
    //启动任务
    ssc.start()
    //循环创建RDD
    for (i <- 1 to 50) {
      Rdd += ssc.sparkContext.makeRDD(1 to 100,5)
      Thread.sleep(2000)
    }

    ssc.awaitTermination()
  }

}

结果:

自定义数据源

继承Receiver,实现onStart、onStop方法来自定义数据源采集。

案例1代码:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.util.Random

object UserDefined_DataSource {
  def main(args: Array[String]): Unit = {
    //TODO 创建配置环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val ssc = new StreamingContext(sparkConf,Seconds(5))
    //采集数据
    val messageDS = ssc.receiverStream(new MyReceiver())
    messageDS.print()
    //开始
    ssc.start()
    ssc.awaitTermination()
  }
  /*
  自定义数据采集器
  1.继承Receiver,定义泛型,传递参数
  2.重写方法
  */
  class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {
    private var fig = true
    //最初启动,读数据
    override def onStart(): Unit = {
      new Thread(new Runnable {
        override def run(): Unit = {
          while (true) {
            val message = "采集的数据为:" + new Random().nextInt(10).toString
            store(message)
            Thread.sleep(500)
          }
        }
      }).start()
    }
    //停止
    override def onStop(): Unit = {
      fig = false
    }
  }
}

结果:

案列2代码:

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}

object UserDefined_DataSource02 {
  def main(args: Array[String]): Unit = {
    //初始化Spark配置信息
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    //初始化SparkStreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    //创建自定义数据接收器
    val lineStream = ssc.receiverStream(new MyReceiver("hadoop01", 9999))
    //数据切分
    val word = lineStream.flatMap(_.split("\t"))
    //映射为(word,1)
    val word2 = word.map((_, 1))
    //统计
    val wordCount = word2.reduceByKey(_ + _)
    //7.打印
    wordCount.print()
    //启动
    ssc.start()
    ssc.awaitTermination()
  }
  /*
  自定义数据采集器
  1.继承Receiver,定义泛型,传递参数
  2.重写方法
  */
  class MyReceiver(host: String,port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {
    //最初启动,调用方法
    override def onStart(): Unit = {
      new Thread("Socket Receiver") {
        override def run() {
          receive()
        }
      }.start()
    }
    //读数据并发送给Spark
    def receive(): Unit = {
      //创建Socket
      var socket: Socket = new Socket(host, port)
      //定义变量,接收数据
      var input: String = null
      //创建BufferedReader,读取数据
      val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      //读数据
      input = reader.readLine()
      //如果receiver没有关闭且输入数据不为空
      //循环发送数据给Spark
      while (!isStopped() && input != null) {
        store(input)
        input = reader.readLine()
      }
      //跳出循环,关闭资源
      reader.close()
      socket.close()
      //重启
      restart("restart")
    }
    override def onStop(): Unit = {}
  }
}

 结果:

   

  

 kafka数据源

ReceiverAPI:需要一个专门的Executor接收数据,然后发送给其他的Executor计算。存在数据的节点内存溢出问题。

DirectAPI:由计算的Executor主动消费Kafka的数据,速度由自身控制。

添加依赖

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.10.1</version>
        </dependency>

代码:

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object UserDefined_DataSource03_kafka {
  def main(args: Array[String]): Unit = {
    //初始化Spark配置信息
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Kafka")
    //初始化SparkStreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(15))
    //定义kafka参数
    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop01:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "testkafka",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )
    //读取Kafka数据创建DStream
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](Set("testkafka"), kafkaPara))
    //取出KV对
    val value = kafkaDStream.map(record => record.value())
    //计算
    value.flatMap(_.split(" "))
        .map((_,1))
        .reduceByKey(_+_)
        .print()
    //启动
    ssc.start()
    ssc.awaitTermination()
  }
}

测试:

1)开启zookeeper

2)开启kafka

3)创建主题

进入kafka目录,运行:

bin/kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 1 --partitions 1 --topic testkafka

4)打开生产者

进入kafka目录,运行:

bin/kafka-console-producer.sh --broker-list hadoop01:9092 --topic testkafka

5)打开消费者

打开另一个窗口,进入kafka目录,运行:

bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic testkafka

6)运行程序

结果

 

 

DStream转换

DStream的操作和RDD类似,分为Transformations(转换)和Output Operations(输出)两种,转换操作中还有一些比较特殊的原语:updateStateByKey()transform()以及各种Window相关的原语。

无状态转化和有状态转化:是否保存一个采集周期的数据,保存就是有状态,不保存就是无状态。

无状态转化操作

无状态转化操作就是把简单的RDD转化操作应用到每个批次上,也就是转化DStream中的每一个 RDD

注意:1)针对键值对的DStream转化操作要添加import StreamingContext._才能在Scala上使用。

2)每个DStream在内部是由许多RDD组成,且无状态转化操作是分别应用到每个RDD上,例如:reduceByKey()会归约每个时间区间中的数据,但不会归约不同区间之间的数据。

join

两个流之间的join需要两个流的批次大小一致才能做到同时触发计算计算过程就是对当前批次的两个流中各自的RDD进行join使与两个RDDjoin效果相同。

代码:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Join {
  def main(args: Array[String]): Unit = {

    //初始化Spark配置信息
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Join")
    //初始化SparkStreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(15) )
    //从端口获取数据创建流
    val line01 = ssc.socketTextStream("hadoop01", 9999)
    val line02 = ssc.socketTextStream("hadoop01", 8888)
    //转换为KV类型
    val word01 = line01.flatMap(_.split(" ")).map((_,1))
    val word02 = line02.flatMap(_.split(" "))map((_,"YES"))
    //JOIN
    val join = word01.join(word02)
    //打印
    join.print()
    //启动任务
    ssc.start()
    ssc.awaitTermination()
  }
}

测试结果:

 

transform

transform允许DStream上执行任意的RDD-to-RDD函数;该函数并没有在DStream的API中暴露出来方便扩展Spark API;该函数每一批次调度一次;其实也就是对DStream中的RDD应用转换。

 代码:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Transform {
  def main(args: Array[String]): Unit = {

    //初始化Spark配置信息
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Traneform")
    //初始化SparkStreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(15) )
    //创建Dstream
    val line = ssc.socketTextStream("hadoop01", 9999)
    //转化为RDD
    val DStreamToRDD = line.transform(
      rdd => {
        val word = rdd.flatMap(_.split(" "))
        val wordmap = word.map((_, 1))
        val value = wordmap.reduceByKey(_ + _)
        value
      }
    )
    //打印
    DStreamToRDD.print()
    //启动任务
    ssc.start()
    ssc.awaitTermination()
  }
}

 测试结果:

有状态转化操作

UpdateStateByKey

UpdateStateByKey原语用于记录历史记录。

有时我们需要在DStream中跨批次维护状态(如:流计算中累加wordcount);updateStateByKey()提供对一个状态变量的访问,给一个由(键,事件)对构成的DStream,传递一个函数,该函数指定根据新的事件更新每个键对应的状态并构建出一个新的DStream,其内部数据为(键,状态) 对。

updateStateByKey() 的结果是一个新的DStream,其内部的RDD序列是由每个时间区间对应的(键,状态)对组成的。

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object UpdateStateByKey {
  def main(args: Array[String]): Unit = {

    //初始化Spark配置信息
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Count")
    //初始化SparkStreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    //使用有状态操作时需要设置检查点路径
    ssc.checkpoint("/ck")
    //创建DStream
    val lines = ssc.socketTextStream("hadoop01", 9999)
    val wordMap = lines.map((_,1))
    //updateStateByKey:根据key进行数据状态更新
    //第一个值:相同key的value数据
    //第二个值:缓冲区相同key的value数据
    val wordCount = wordMap.updateStateByKey(
      (seq: Seq[Int], buff: Option[Int]) => {
        val newCount = buff.getOrElse(0) + seq.sum
        Option(newCount)
      }
    )
    wordCount.print()
    //启动任务
    ssc.start()
    ssc.awaitTermination()
  }

}

Window

Window Operations可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Steaming的允许状态;所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动周期。

窗口时长:计算内容的时间范围;

滑动周期:隔多久触发一次计算

注:这两者都必须为采集周期大小的整数倍。

例1:采集周期为3秒,窗口时长为12秒,滑动周期为6秒。

解:3秒采集一次数据,6秒计算一次,计算的时间范围为12秒也就是4次采集的数据,部分数据可能会重复计算

例2:采集周期为3秒,窗口时长为6秒,滑动周期为6秒。

(3秒采集一次数据,6秒计算一次,计算的时间范围为6秒也就是2次采集的数据,数据不会重复计算,因为计算时间和滑动时间一样)

代码:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowOperations {
  def main(args: Array[String]): Unit = {
    //TODO 采集数据周期为:3秒   窗口时长为:12秒    滑动周期为:6秒

    //初始化Spark配置信息
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    //初始化SparkStreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(3) )
    //创建Dstream
    val line = ssc.socketTextStream("hadoop01", 9999)
    //window(窗口时长,滑动周期)
    //窗口的范围应该是采集数据周期的整数倍
    //默认以一个采集周期进行滑动
    //为了避免重复数据可以改变滑动的幅度,窗口时长=滑动周期时避免重复计算
    val word = line.map((_, 1))
    //窗口时长为:12秒    滑动周期为:6秒
    val windowDS = word.window(Seconds(12),Seconds(6))
    val Count = windowDS.reduceByKey(_ + _)
    Count.print()
    //启动任务
    ssc.start()
    ssc.awaitTermination()
  }
}

测试结果:

输入13个a

结果有21个a,数据重复计算了。

 

Window操作的其它方法:

1)window(windowLength, slideInterval): 基于对源DStream窗化的批次进行计算返回一个新的 Dstream;

2)countByWindow(windowLength, slideInterval): 返回一个滑动窗口计数流中的元素个数;

3)reduceByWindow(func, windowLength, slideInterval): 通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流;

4)reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]): 当在一个(K,V)对的DStream上调用此函数会返回一个新的(K,V)对的DStream,通过对滑动窗口中的批次数据使用reduce函数来整合每个key的value值;

5)reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]): 上述函数的变化版本,每个窗口的reduce值都是通过用前一个窗的reduce值来递增计算;通过reduce进入到滑动窗口数据并反向reduce离开窗口的旧数据来实现这个操作;

一个例子是随着窗口滑动对keys的“加”“减”计数。

当窗口时长大,滑动周期小时,通过增加数据和删除数据避免重复计算

例1:采集周期为3秒,窗口时长为6秒,滑动周期为3秒。

输入数据(个)(3秒内):0,6,4,3,1,0,0

输出(3秒):

0,6(0+6),10(6-0+4),7(10-6+3),4(7-4+1),1(4-3+0),0(1-1+0)

代码:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowOperations2 {
  def main(args: Array[String]): Unit = {
    //TODO 采集数据周期为:3秒   窗口时长为:12秒    滑动周期为:6秒

    //初始化Spark配置信息
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    //初始化SparkStreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(3) )
    ssc.checkpoint("ck")
    //创建Dstream
    val line = ssc.socketTextStream("hadoop01", 9999)
    val word = line.map((_, 1))
    //窗口时长为:12秒    滑动周期为:6秒
    val windowDS = word.reduceByKeyAndWindow(
      (x:Int,y:Int) => {x + y},
      (x:Int,y:Int) => {x - y},
      Seconds(6),
      Seconds(3)
    )
    windowDS.print()
    //启动任务
    ssc.start()
    ssc.awaitTermination()
  }
}

DStream输出

DStream输出操作:流数据经转化操作得到的数据所要执行的操作(保存到外部数据库或输出到屏幕)。

RDD中的惰性求值类似,一个DStream及其派生DStream都没有被执行输出操作,则这些DStream就不会被求值;如果StreamingContext中没有设定输出操作,整个context就都不会启动。

输出操作:

(1)print():在运行流程序的驱动结点上打印DStream中每一批次数据的最开始10个元素。

(2)foreachRDD(func):这是最通用的输出操作,将函数func用于产生于Stream的每一个RDD;其中参数传入的函数func应该实现将每一个RDD中数据推送到外部系统(如将RDD存入文件或者通过网络将其写入数据库)。

(3)saveAsTextFiles(prefix, [suffix]):以text文件形式存储这个DStream的内容;每一批次的存储文件名基于参数中的prefixsuffix。”prefix-Time_IN_MS[.suffix]”。

(4)saveAsObjectFiles(prefix, [suffix]):以Java对象序列化的方式将Stream中的数据保存为SequenceFiles,每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]".。

(5)saveAsHadoopFiles(prefix, [suffix]):将Stream中的数据保存为Hadoop files,每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]"

通用的输出操作foreachRDD(),它用来对DStream中的RDD运行任意计算;这和transform()有些类似,都可以让我们访问任意RDD。在foreachRDD()中,可以重用我们在Spark中实现的所有行动操作。

注: 1) 连接不能写在driver层面(序列化);

2) 如果写在foreach则每个RDD中的每一条数据都创建,得不偿失;

3) 增加foreachPartition,在分区创建(获取)。

优雅关闭

优雅关闭:计算节点不再接收新的数据,把已经有的数据处理完毕后关闭。

流式任务需要7*24小时执行,有时升级代码需要主动停止程序,分布式程序没办法做到一个个进程的杀死。 应该使用外部文件系统来控制内部程序关闭。

MonitorStop

代码:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}

object MonitorStop2 {
  def main(args: Array[String]): Unit = {
    /*
    线程关闭
    val thread = new Thread()
    thread.start()//线程开启
    thread.stop()//强制关闭
    */

    //初始化Spark配置信息
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    //初始化SparkStreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(3) )
    //创建Dstream
    val line = ssc.socketTextStream("hadoop01", 9999)
    val word = line.map((_, 1))
    word.print()
    //启动任务
    ssc.start()
    //创建新的线程用作关闭线程
    //而且需要在第三方程序中增加关闭程序
    new Thread(
      new Runnable {
        override def run(): Unit = {
          //优雅关闭:计算节点不再接收新的数据,把已经有的数据处理完毕后关闭
          //Mysql:Table(stopSpark) => Row => data
          //Redis:Data(K-V)
          //ZK:/stopSpark
          //HDFS:/stopSpark
            Thread.sleep(5000)
            //获取SparkStreaming状态
            val state = ssc.getState()
            if (state == StreamingContextState.ACTIVE) {
              ssc.stop(true,true)
          }
          //退出线程
          System.exit(0)
        }
      }
    ).start()
    ssc.awaitTermination()    //block阻塞main线程
  }
}

HDFS

代码:

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.streaming.{StreamingContext, StreamingContextState}

class MonitorStop(ssc: StreamingContext) extends Runnable {
  override def run(): Unit = {
    val fs: FileSystem = FileSystem.get(new URI("hdfs://hadoop01:9000"), new
        Configuration(), "spark")
    while (true) {
      try
        Thread.sleep(5000)
      catch {
        case e: InterruptedException =>
          e.printStackTrace()
      }
      val state: StreamingContextState = ssc.getState
      val bool: Boolean = fs.exists(new Path("hdfs://hadoop01:9000/stopSpark"))
      if (bool) {
        if (state == StreamingContextState.ACTIVE) {
          ssc.stop(stopSparkContext = true, stopGracefully = true)
          System.exit(0)
        }
      }
    }
  }
}

SparkTest

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object SparkTest {
  def createSSC(): _root_.org.apache.spark.streaming.StreamingContext = {
    val update: (Seq[Int], Option[Int]) => Some[Int] = (values: Seq[Int], status:
    Option[Int]) => {
      //当前批次内容的计算
      val sum: Int = values.sum
      //取出状态信息中上一次状态
      val lastStatu: Int = status.getOrElse(0)
      Some(sum + lastStatu)
    }
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkTest")
    //设置优雅的关闭
    sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.checkpoint("./ck")
    val line: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop01", 9999)
    val word: DStream[String] = line.flatMap(_.split(" "))
    val wordAndOne: DStream[(String, Int)] = word.map((_, 1))
    val wordAndCount: DStream[(String, Int)] = wordAndOne.updateStateByKey(update)
    wordAndCount.print()
    ssc
  }
  def main(args: Array[String]): Unit = {
    val ssc: StreamingContext = StreamingContext.getActiveOrCreate("./ck", () => createSSC())
    new Thread(new MonitorStop(ssc)).start()
    ssc.start()
    ssc.awaitTermination()
  }
}

恢复数据

设置checkpoint用于恢复数据。

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RecoverData {
  def main(args: Array[String]): Unit = {
    //TODO 恢复数据
    val ssc = StreamingContext.getActiveOrCreate("ck", () => {
      //初始化Spark配置信息
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
      //初始化SparkStreamingContext
      val ssc = new StreamingContext(sparkConf, Seconds(3))
      //创建Dstream
      val line = ssc.socketTextStream("hadoop01", 9999)
      val word = line.map((_, 1))
      word.print()
      ssc
    })
    ssc.checkpoint("ck")
    //启动任务
    ssc.start()
    ssc.awaitTermination()    //block阻塞main线程
  }
}

本文仅仅是学习笔记的记录!!!

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐