spark学习笔记(十二)——sparkStreaming-RDD队列/自定义数据源/kafka数据源/DStream转换/DStream输出/优雅关闭
继承Receiver,实现onStart、onStop方法来自定义数据源采集。//TODO 创建配置环境//采集数据//开始}/*自定义数据采集器1.继承Receiver,定义泛型,传递参数2.重写方法*///最初启动,读数据val message = "采集的数据为:" + new Random().nextInt(10).toString}}}//停止}}}结果://初始化Spark配置信息/
目录
用到的全部依赖:
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.10.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<!--该插件用于把Scala代码编译成为class文件-->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<!--声明绑定到maven的compile阶段-->
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
RDD队列
可以通过使用ssc.queueStream(queueOfRDDs)来创建DStream,每一个推送到这个队列中的RDD都会作为一个DStream处理。
循环创建几个RDD并放入队列。通过SparkStream创建Dstream计算WordCount。
代码:
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
object RDD {
def main(args: Array[String]): Unit = {
//TODO 创建配置环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val ssc = new StreamingContext(sparkConf,Seconds(5))
//TODO 操作
//创建RDD队列
val Rdd = new mutable.Queue[RDD[Int]]()
//创建queueStream
val imputStream = ssc.queueStream(Rdd, oneAtATime = false)
//处理RDD数据
val mapRDD = imputStream.map((_, 1))
val reduceRDD = mapRDD.reduceByKey(_ + _)
//打印
reduceRDD.print()
//启动任务
ssc.start()
//循环创建RDD
for (i <- 1 to 50) {
Rdd += ssc.sparkContext.makeRDD(1 to 100,5)
Thread.sleep(2000)
}
ssc.awaitTermination()
}
}
结果:
自定义数据源
继承Receiver,实现onStart、onStop方法来自定义数据源采集。
案例1代码:
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.util.Random
object UserDefined_DataSource {
def main(args: Array[String]): Unit = {
//TODO 创建配置环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val ssc = new StreamingContext(sparkConf,Seconds(5))
//采集数据
val messageDS = ssc.receiverStream(new MyReceiver())
messageDS.print()
//开始
ssc.start()
ssc.awaitTermination()
}
/*
自定义数据采集器
1.继承Receiver,定义泛型,传递参数
2.重写方法
*/
class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {
private var fig = true
//最初启动,读数据
override def onStart(): Unit = {
new Thread(new Runnable {
override def run(): Unit = {
while (true) {
val message = "采集的数据为:" + new Random().nextInt(10).toString
store(message)
Thread.sleep(500)
}
}
}).start()
}
//停止
override def onStop(): Unit = {
fig = false
}
}
}
结果:
案列2代码:
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}
object UserDefined_DataSource02 {
def main(args: Array[String]): Unit = {
//初始化Spark配置信息
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//初始化SparkStreamingContext
val ssc = new StreamingContext(sparkConf, Seconds(10))
//创建自定义数据接收器
val lineStream = ssc.receiverStream(new MyReceiver("hadoop01", 9999))
//数据切分
val word = lineStream.flatMap(_.split("\t"))
//映射为(word,1)
val word2 = word.map((_, 1))
//统计
val wordCount = word2.reduceByKey(_ + _)
//7.打印
wordCount.print()
//启动
ssc.start()
ssc.awaitTermination()
}
/*
自定义数据采集器
1.继承Receiver,定义泛型,传递参数
2.重写方法
*/
class MyReceiver(host: String,port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {
//最初启动,调用方法
override def onStart(): Unit = {
new Thread("Socket Receiver") {
override def run() {
receive()
}
}.start()
}
//读数据并发送给Spark
def receive(): Unit = {
//创建Socket
var socket: Socket = new Socket(host, port)
//定义变量,接收数据
var input: String = null
//创建BufferedReader,读取数据
val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
//读数据
input = reader.readLine()
//如果receiver没有关闭且输入数据不为空
//循环发送数据给Spark
while (!isStopped() && input != null) {
store(input)
input = reader.readLine()
}
//跳出循环,关闭资源
reader.close()
socket.close()
//重启
restart("restart")
}
override def onStop(): Unit = {}
}
}
结果:
kafka数据源
ReceiverAPI:需要一个专门的Executor接收数据,然后发送给其他的Executor计算。存在数据的节点内存溢出问题。
DirectAPI:由计算的Executor主动消费Kafka的数据,速度由自身控制。
添加依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.10.1</version>
</dependency>
代码:
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object UserDefined_DataSource03_kafka {
def main(args: Array[String]): Unit = {
//初始化Spark配置信息
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Kafka")
//初始化SparkStreamingContext
val ssc = new StreamingContext(sparkConf, Seconds(15))
//定义kafka参数
val kafkaPara: Map[String, Object] = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop01:9092",
ConsumerConfig.GROUP_ID_CONFIG -> "testkafka",
"key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
)
//读取Kafka数据创建DStream
val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =
KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Set("testkafka"), kafkaPara))
//取出KV对
val value = kafkaDStream.map(record => record.value())
//计算
value.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)
.print()
//启动
ssc.start()
ssc.awaitTermination()
}
}
测试:
1)开启zookeeper
2)开启kafka
3)创建主题
进入kafka目录,运行:
bin/kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 1 --partitions 1 --topic testkafka
4)打开生产者
进入kafka目录,运行:
bin/kafka-console-producer.sh --broker-list hadoop01:9092 --topic testkafka
5)打开消费者
打开另一个窗口,进入kafka目录,运行:
bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic testkafka
6)运行程序
结果
DStream转换
DStream的操作和RDD类似,分为Transformations(转换)和Output Operations(输出)两种,转换操作中还有一些比较特殊的原语:updateStateByKey()、transform()以及各种Window相关的原语。
无状态转化和有状态转化:是否保存一个采集周期的数据,保存就是有状态,不保存就是无状态。
无状态转化操作
无状态转化操作就是把简单的RDD转化操作应用到每个批次上,也就是转化DStream中的每一个 RDD。
注意:1)针对键值对的DStream转化操作要添加import StreamingContext._才能在Scala上使用。
2)每个DStream在内部是由许多RDD组成,且无状态转化操作是分别应用到每个RDD上,例如:reduceByKey()会归约每个时间区间中的数据,但不会归约不同区间之间的数据。
join
两个流之间的join需要两个流的批次大小一致才能做到同时触发计算。计算过程就是对当前批次的两个流中各自的RDD进行join使与两个RDD的join效果相同。
代码:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object Join {
def main(args: Array[String]): Unit = {
//初始化Spark配置信息
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Join")
//初始化SparkStreamingContext
val ssc = new StreamingContext(sparkConf, Seconds(15) )
//从端口获取数据创建流
val line01 = ssc.socketTextStream("hadoop01", 9999)
val line02 = ssc.socketTextStream("hadoop01", 8888)
//转换为KV类型
val word01 = line01.flatMap(_.split(" ")).map((_,1))
val word02 = line02.flatMap(_.split(" "))map((_,"YES"))
//JOIN
val join = word01.join(word02)
//打印
join.print()
//启动任务
ssc.start()
ssc.awaitTermination()
}
}
测试结果:
transform
transform允许DStream上执行任意的RDD-to-RDD函数;该函数并没有在DStream的API中暴露出来方便扩展Spark API;该函数每一批次调度一次;其实也就是对DStream中的RDD应用转换。
代码:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object Transform {
def main(args: Array[String]): Unit = {
//初始化Spark配置信息
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Traneform")
//初始化SparkStreamingContext
val ssc = new StreamingContext(sparkConf, Seconds(15) )
//创建Dstream
val line = ssc.socketTextStream("hadoop01", 9999)
//转化为RDD
val DStreamToRDD = line.transform(
rdd => {
val word = rdd.flatMap(_.split(" "))
val wordmap = word.map((_, 1))
val value = wordmap.reduceByKey(_ + _)
value
}
)
//打印
DStreamToRDD.print()
//启动任务
ssc.start()
ssc.awaitTermination()
}
}
测试结果:
有状态转化操作
UpdateStateByKey
UpdateStateByKey原语用于记录历史记录。
有时我们需要在DStream中跨批次维护状态(如:流计算中累加wordcount);updateStateByKey()提供对一个状态变量的访问,给一个由(键,事件)对构成的DStream,传递一个函数,该函数指定根据新的事件更新每个键对应的状态并构建出一个新的DStream,其内部数据为(键,状态) 对。
updateStateByKey() 的结果是一个新的DStream,其内部的RDD序列是由每个时间区间对应的(键,状态)对组成的。
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object UpdateStateByKey {
def main(args: Array[String]): Unit = {
//初始化Spark配置信息
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Count")
//初始化SparkStreamingContext
val ssc = new StreamingContext(sparkConf, Seconds(3))
//使用有状态操作时需要设置检查点路径
ssc.checkpoint("/ck")
//创建DStream
val lines = ssc.socketTextStream("hadoop01", 9999)
val wordMap = lines.map((_,1))
//updateStateByKey:根据key进行数据状态更新
//第一个值:相同key的value数据
//第二个值:缓冲区相同key的value数据
val wordCount = wordMap.updateStateByKey(
(seq: Seq[Int], buff: Option[Int]) => {
val newCount = buff.getOrElse(0) + seq.sum
Option(newCount)
}
)
wordCount.print()
//启动任务
ssc.start()
ssc.awaitTermination()
}
}
Window Operations可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Steaming的允许状态;所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动周期。
窗口时长:计算内容的时间范围;
滑动周期:隔多久触发一次计算;
注:这两者都必须为采集周期大小的整数倍。
例1:采集周期为3秒,窗口时长为12秒,滑动周期为6秒。
解:3秒采集一次数据,6秒计算一次,计算的时间范围为12秒也就是4次采集的数据,部分数据可能会重复计算
例2:采集周期为3秒,窗口时长为6秒,滑动周期为6秒。
(3秒采集一次数据,6秒计算一次,计算的时间范围为6秒也就是2次采集的数据,数据不会重复计算,因为计算时间和滑动时间一样)
代码:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object WindowOperations {
def main(args: Array[String]): Unit = {
//TODO 采集数据周期为:3秒 窗口时长为:12秒 滑动周期为:6秒
//初始化Spark配置信息
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
//初始化SparkStreamingContext
val ssc = new StreamingContext(sparkConf, Seconds(3) )
//创建Dstream
val line = ssc.socketTextStream("hadoop01", 9999)
//window(窗口时长,滑动周期)
//窗口的范围应该是采集数据周期的整数倍
//默认以一个采集周期进行滑动
//为了避免重复数据可以改变滑动的幅度,窗口时长=滑动周期时避免重复计算
val word = line.map((_, 1))
//窗口时长为:12秒 滑动周期为:6秒
val windowDS = word.window(Seconds(12),Seconds(6))
val Count = windowDS.reduceByKey(_ + _)
Count.print()
//启动任务
ssc.start()
ssc.awaitTermination()
}
}
测试结果:
输入13个a
结果有21个a,数据重复计算了。
Window操作的其它方法:
1)window(windowLength, slideInterval): 基于对源DStream窗化的批次进行计算返回一个新的 Dstream;
2)countByWindow(windowLength, slideInterval): 返回一个滑动窗口计数流中的元素个数;
3)reduceByWindow(func, windowLength, slideInterval): 通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流;
4)reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]): 当在一个(K,V)对的DStream上调用此函数会返回一个新的(K,V)对的DStream,通过对滑动窗口中的批次数据使用reduce函数来整合每个key的value值;
5)reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]): 上述函数的变化版本,每个窗口的reduce值都是通过用前一个窗的reduce值来递增计算;通过reduce进入到滑动窗口数据并反向reduce离开窗口的旧数据来实现这个操作;
一个例子是随着窗口滑动对keys的“加”“减”计数。
当窗口时长大,滑动周期小时,通过增加数据和删除数据避免重复计算;
例1:采集周期为3秒,窗口时长为6秒,滑动周期为3秒。
输入数据(个)(3秒内):0,6,4,3,1,0,0
输出(3秒):
0,6(0+6),10(6-0+4),7(10-6+3),4(7-4+1),1(4-3+0),0(1-1+0)
代码:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object WindowOperations2 {
def main(args: Array[String]): Unit = {
//TODO 采集数据周期为:3秒 窗口时长为:12秒 滑动周期为:6秒
//初始化Spark配置信息
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
//初始化SparkStreamingContext
val ssc = new StreamingContext(sparkConf, Seconds(3) )
ssc.checkpoint("ck")
//创建Dstream
val line = ssc.socketTextStream("hadoop01", 9999)
val word = line.map((_, 1))
//窗口时长为:12秒 滑动周期为:6秒
val windowDS = word.reduceByKeyAndWindow(
(x:Int,y:Int) => {x + y},
(x:Int,y:Int) => {x - y},
Seconds(6),
Seconds(3)
)
windowDS.print()
//启动任务
ssc.start()
ssc.awaitTermination()
}
}
DStream输出
DStream输出操作:流数据经转化操作得到的数据所要执行的操作(保存到外部数据库或输出到屏幕)。
与RDD中的惰性求值类似,一个DStream及其派生DStream都没有被执行输出操作,则这些DStream就不会被求值;如果StreamingContext中没有设定输出操作,整个context就都不会启动。
输出操作:
(1)print():在运行流程序的驱动结点上打印DStream中每一批次数据的最开始10个元素。
(2)foreachRDD(func):这是最通用的输出操作,将函数func用于产生于Stream的每一个RDD;其中参数传入的函数func应该实现将每一个RDD中数据推送到外部系统(如将RDD存入文件或者通过网络将其写入数据库)。
(3)saveAsTextFiles(prefix, [suffix]):以text文件形式存储这个DStream的内容;每一批次的存储文件名基于参数中的prefix和suffix。”prefix-Time_IN_MS[.suffix]”。
(4)saveAsObjectFiles(prefix, [suffix]):以Java对象序列化的方式将Stream中的数据保存为SequenceFiles,每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]".。
(5)saveAsHadoopFiles(prefix, [suffix]):将Stream中的数据保存为Hadoop files,每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]"。
通用的输出操作foreachRDD(),它用来对DStream中的RDD运行任意计算;这和transform()有些类似,都可以让我们访问任意RDD。在foreachRDD()中,可以重用我们在Spark中实现的所有行动操作。
注: 1) 连接不能写在driver层面(序列化);
2) 如果写在foreach则每个RDD中的每一条数据都创建,得不偿失;
3) 增加foreachPartition,在分区创建(获取)。
优雅关闭
优雅关闭:计算节点不再接收新的数据,把已经有的数据处理完毕后关闭。
流式任务需要7*24小时执行,有时升级代码需要主动停止程序,分布式程序没办法做到一个个进程的杀死。 应该使用外部文件系统来控制内部程序关闭。
MonitorStop
代码:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}
object MonitorStop2 {
def main(args: Array[String]): Unit = {
/*
线程关闭
val thread = new Thread()
thread.start()//线程开启
thread.stop()//强制关闭
*/
//初始化Spark配置信息
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
//初始化SparkStreamingContext
val ssc = new StreamingContext(sparkConf, Seconds(3) )
//创建Dstream
val line = ssc.socketTextStream("hadoop01", 9999)
val word = line.map((_, 1))
word.print()
//启动任务
ssc.start()
//创建新的线程用作关闭线程
//而且需要在第三方程序中增加关闭程序
new Thread(
new Runnable {
override def run(): Unit = {
//优雅关闭:计算节点不再接收新的数据,把已经有的数据处理完毕后关闭
//Mysql:Table(stopSpark) => Row => data
//Redis:Data(K-V)
//ZK:/stopSpark
//HDFS:/stopSpark
Thread.sleep(5000)
//获取SparkStreaming状态
val state = ssc.getState()
if (state == StreamingContextState.ACTIVE) {
ssc.stop(true,true)
}
//退出线程
System.exit(0)
}
}
).start()
ssc.awaitTermination() //block阻塞main线程
}
}
HDFS
代码:
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.streaming.{StreamingContext, StreamingContextState}
class MonitorStop(ssc: StreamingContext) extends Runnable {
override def run(): Unit = {
val fs: FileSystem = FileSystem.get(new URI("hdfs://hadoop01:9000"), new
Configuration(), "spark")
while (true) {
try
Thread.sleep(5000)
catch {
case e: InterruptedException =>
e.printStackTrace()
}
val state: StreamingContextState = ssc.getState
val bool: Boolean = fs.exists(new Path("hdfs://hadoop01:9000/stopSpark"))
if (bool) {
if (state == StreamingContextState.ACTIVE) {
ssc.stop(stopSparkContext = true, stopGracefully = true)
System.exit(0)
}
}
}
}
}
SparkTest
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object SparkTest {
def createSSC(): _root_.org.apache.spark.streaming.StreamingContext = {
val update: (Seq[Int], Option[Int]) => Some[Int] = (values: Seq[Int], status:
Option[Int]) => {
//当前批次内容的计算
val sum: Int = values.sum
//取出状态信息中上一次状态
val lastStatu: Int = status.getOrElse(0)
Some(sum + lastStatu)
}
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkTest")
//设置优雅的关闭
sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
val ssc = new StreamingContext(sparkConf, Seconds(5))
ssc.checkpoint("./ck")
val line: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop01", 9999)
val word: DStream[String] = line.flatMap(_.split(" "))
val wordAndOne: DStream[(String, Int)] = word.map((_, 1))
val wordAndCount: DStream[(String, Int)] = wordAndOne.updateStateByKey(update)
wordAndCount.print()
ssc
}
def main(args: Array[String]): Unit = {
val ssc: StreamingContext = StreamingContext.getActiveOrCreate("./ck", () => createSSC())
new Thread(new MonitorStop(ssc)).start()
ssc.start()
ssc.awaitTermination()
}
}
恢复数据
设置checkpoint用于恢复数据。
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object RecoverData {
def main(args: Array[String]): Unit = {
//TODO 恢复数据
val ssc = StreamingContext.getActiveOrCreate("ck", () => {
//初始化Spark配置信息
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
//初始化SparkStreamingContext
val ssc = new StreamingContext(sparkConf, Seconds(3))
//创建Dstream
val line = ssc.socketTextStream("hadoop01", 9999)
val word = line.map((_, 1))
word.print()
ssc
})
ssc.checkpoint("ck")
//启动任务
ssc.start()
ssc.awaitTermination() //block阻塞main线程
}
}
本文仅仅是学习笔记的记录!!!
更多推荐
所有评论(0)