创建spark上下文环境
def main(args: Array[String]): Unit = {
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("wc"))
val rdd: RDD[String] = sc.makeRDD(List(
"spark hello", "hive", "hadoop hbase", "spark hadoop", "hbase"
))
sc.stop()
}
第一种:reduceBykey
// reduceBykey
def wordCount1(rdd: RDD[String]): Unit = {
// 扁平化操作,拆分出数据
val value: RDD[String] = rdd.flatMap(_.split(" "))
// map转换为(key,1)
val mapRDD: RDD[(String, Int)] = value.map((_, 1))
// reduceByKey根据key进行聚合
val result: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)
result.collect().foreach(println)
}
第二种:groupByKey
// groupByKey
def wordCount2(rdd: RDD[String]): Unit = {
val value: RDD[String] = rdd.flatMap(_.split(" "))
val mapRDD: RDD[(String, Int)] = value.map((_, 1))
// 根据key进行分组(key,(1,1,......))
val groupRDD: RDD[(String, Iterable[Int])] = mapRDD.groupByKey()
// 取得集合的value值(这里是个迭代器,所以直接取迭代器的长度)
val result: RDD[(String, Int)] = groupRDD.mapValues(
iter => iter.size
)
result.collect().foreach(println)
}
第三种:aggregateBykey
// aggregateBykey
def wordCount3(rdd: RDD[String]): Unit = {
val value: RDD[String] = rdd.flatMap(_.split(" "))
val mapRDD: RDD[(String, Int)] = value.map((_, 1))
// zeroValue为指定的初始化值,所有的数据都会和这个进行对比操作,操作的逻辑分为分区内和分区间(wordcount就是求和的逻辑)
val result: RDD[(String, Int)] = mapRDD.aggregateByKey(0)(_ + _, _ + _)
result.collect().foreach(println)
}
第四种:foldByKey
// foldByKey
def wordCount4(rdd: RDD[String]): Unit = {
val value: RDD[String] = rdd.flatMap(_.split(" "))
val mapRDD: RDD[(String, Int)] = value.map((_, 1))
// 当aggregateByKey的分区内和分区间的逻辑一致时就可以简写为foldByKey
val result: RDD[(String, Int)] = mapRDD.foldByKey(0)(_ + _)
result.collect().foreach(println)
}
第五种:combineByKey
// combineByKey
def wordCount5(rdd: RDD[String]): Unit = {
val value: RDD[String] = rdd.flatMap(_.split(" "))
val mapRDD: RDD[(String, Int)] = value.map((_, 1))
// combineByKey()的功能类似aggregateByKey的功能(分三个参数,第一个是对分区内第一个值进行操作,第二个是分区内操作逻辑,第三个是分区间操作逻辑),但是combineByKey()允许用户返回值的类型与输入不一致
val result: RDD[(String, Int)] = mapRDD.combineByKey(
v => v,
(x, y) => x + y,
(x, y) => x + y
)
result.collect().foreach(println)
}
第六种:groupBy
// groupBy
def wordCount6(rdd: RDD[String]): Unit = {
val value: RDD[String] = rdd.flatMap(_.split(" "))
// 和groupByKey类似,只是它不需要必须是key-value的集合形式数据,它可以的单条数据进行聚合,(spark,CompactBuffer(spark, spark))
val groupRDD: RDD[(String, Iterable[String])] = value.groupBy(v => v)
// 获取value
val result: RDD[(String, Int)] = groupRDD.mapValues(
iter => iter.size
)
result.collect().foreach(println)
}
第七种:countByKey
// countByKey(行动算子)
def wordCount7(rdd: RDD[String]): Unit = {
val value: RDD[String] = rdd.flatMap(_.split(" "))
val mapRDD: RDD[(String, Int)] = value.map((_, 1))
// countByKey直接根据key(要去数据为key-value形式)进行统计数量(这是个行动算子,所以不需要collect来进行runjob)
val result: collection.Map[String, Long] = mapRDD.countByKey()
result.foreach(println)
}
第八种:countByValue
// countByValue(行动算子)
def wordCount8(rdd: RDD[String]): Unit = {
val value: RDD[String] = rdd.flatMap(_.split(" "))
// 直接对数据进行统计
val result: collection.Map[String, Long] = value.countByValue()
result.foreach(println)
}
更多推荐