创建spark上下文环境

def main(args: Array[String]): Unit = {


        val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("wc"))

        val rdd: RDD[String] = sc.makeRDD(List(
            "spark hello", "hive", "hadoop hbase", "spark hadoop", "hbase"
        ))
       sc.stop()

}

第一种:reduceBykey

    // reduceBykey
    def wordCount1(rdd: RDD[String]): Unit = {

        // 扁平化操作,拆分出数据
        val value: RDD[String] = rdd.flatMap(_.split(" "))

        // map转换为(key,1)
        val mapRDD: RDD[(String, Int)] = value.map((_, 1))

        // reduceByKey根据key进行聚合
        val result: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)

        result.collect().foreach(println)
    }

第二种:groupByKey

// groupByKey
def wordCount2(rdd: RDD[String]): Unit = {

        val value: RDD[String] = rdd.flatMap(_.split(" "))

        val mapRDD: RDD[(String, Int)] = value.map((_, 1))

        // 根据key进行分组(key,(1,1,......))
        val groupRDD: RDD[(String, Iterable[Int])] = mapRDD.groupByKey()

        // 取得集合的value值(这里是个迭代器,所以直接取迭代器的长度)
        val result: RDD[(String, Int)] = groupRDD.mapValues(
            iter => iter.size
        )

        result.collect().foreach(println)
    }

第三种:aggregateBykey

    // aggregateBykey
    def wordCount3(rdd: RDD[String]): Unit = {

        val value: RDD[String] = rdd.flatMap(_.split(" "))

        val mapRDD: RDD[(String, Int)] = value.map((_, 1))

        // zeroValue为指定的初始化值,所有的数据都会和这个进行对比操作,操作的逻辑分为分区内和分区间(wordcount就是求和的逻辑)
        val result: RDD[(String, Int)] = mapRDD.aggregateByKey(0)(_ + _, _ + _)

        result.collect().foreach(println)
    }

 第四种:foldByKey

    // foldByKey
    def wordCount4(rdd: RDD[String]): Unit = {

        val value: RDD[String] = rdd.flatMap(_.split(" "))

        val mapRDD: RDD[(String, Int)] = value.map((_, 1))

        // 当aggregateByKey的分区内和分区间的逻辑一致时就可以简写为foldByKey
        val result: RDD[(String, Int)] = mapRDD.foldByKey(0)(_ + _)

        result.collect().foreach(println)
    }

第五种:combineByKey

    // combineByKey
    def wordCount5(rdd: RDD[String]): Unit = {

        val value: RDD[String] = rdd.flatMap(_.split(" "))

        val mapRDD: RDD[(String, Int)] = value.map((_, 1))

        // combineByKey()的功能类似aggregateByKey的功能(分三个参数,第一个是对分区内第一个值进行操作,第二个是分区内操作逻辑,第三个是分区间操作逻辑),但是combineByKey()允许用户返回值的类型与输入不一致
        val result: RDD[(String, Int)] = mapRDD.combineByKey(
            v => v,
            (x, y) => x + y,
            (x, y) => x + y
        )

        result.collect().foreach(println)
    }

第六种:groupBy

    // groupBy
    def wordCount6(rdd: RDD[String]): Unit = {

        val value: RDD[String] = rdd.flatMap(_.split(" "))

        // 和groupByKey类似,只是它不需要必须是key-value的集合形式数据,它可以的单条数据进行聚合,(spark,CompactBuffer(spark, spark))
        val groupRDD: RDD[(String, Iterable[String])] = value.groupBy(v => v)

        // 获取value
        val result: RDD[(String, Int)] = groupRDD.mapValues(
            iter => iter.size
        )

        result.collect().foreach(println)
    }

第七种:countByKey

    // countByKey(行动算子)
    def wordCount7(rdd: RDD[String]): Unit = {

        val value: RDD[String] = rdd.flatMap(_.split(" "))

        val mapRDD: RDD[(String, Int)] = value.map((_, 1))

        // countByKey直接根据key(要去数据为key-value形式)进行统计数量(这是个行动算子,所以不需要collect来进行runjob)
        val result: collection.Map[String, Long] = mapRDD.countByKey()

        result.foreach(println)
    }

第八种:countByValue

    // countByValue(行动算子)
    def wordCount8(rdd: RDD[String]): Unit = {

        val value: RDD[String] = rdd.flatMap(_.split(" "))

        // 直接对数据进行统计
        val result: collection.Map[String, Long] = value.countByValue()

        result.foreach(println)
    }

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐