云计算 - 3 - 使用MapReduce处理数据
1.使用 MapReduce 实现对多个文本文件单词总数的统计(WordCount)。2.使用 MapReduce 实现社交网站好友的推荐。
·
云计算 - 3 - 使用MapReduce处理数据
目标
1.使用 MapReduce 实现对多个文本文件单词总数的统计(WordCount)。
2.使用 MapReduce 实现社交网站好友的推荐。
1、使用Mapreduce实现对多个文本文件单词总数的统计(WordCount)。
1.1 启动Hadoop
使用start-dfs.sh
指令启动 hadoop。
1.2 在 HDFS 文件系统 创建文件夹来作为单词统计的输入
1.3 将用来统计的文件上传到刚建立的文件夹中。
这里选取 hadoop 本身的两个txt文件。
使用 hadoop fs -put 文件名 -input
将两个文件都上传。
1.4 使用 hadoop jar
命令,调用 jar 包,对 /input 文件夹进行单词统计
hadoop jar /home/lucky/hadoop/hadoop-2.6.0/share/hadoop/mapreduce/hadoop-mapreduce -examples-2.6.0.jar wordcount /input /output
1.5 查看输出结果,实现了对多文件的字词统计
查看指令如图中所示。
运行结果有多页,这里展示一页:
2、使用 MapReduce 实现社交网站好友的推荐。
2.1 问题分析
好友推荐功能简单的说是这样一个需求:
预测某两个人是否认识,并推荐为好友,并且某两个非好友的用户,他们的共同好友越多,那么他们越可能认识。
以 QQ 好友举例,顶点 A、B、C 到 G 分别是 QQ 用户,两顶点之间的边表示两顶点代表的用户之间相互关注。比如,B、G 有共同好友 A,应该推荐 B、G 认识,而 D、F 有两个共同好友 C、E,那么更加应该推荐 D、F 认识。
因此也可得到输入如下:
2.2 编写推荐代码
使用命令vi FriendRecommenda.scala
开始编写好友推荐程序代码如下:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object FriendRecommendation {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("FriendRecommendation").setMaster("local")
val sc = new SparkContext(sparkConf)
val input = "hdfs://NameNode-1:9000//LiveJournal.txt"
val output = "hdfs://NameNode-1:9000//friendsRecommend"
val records = sc.textFile(input)
val pairs = records.flatMap(line => {
val tokens = line.split(" ")
val person = tokens(0).toLong
val friends = tokens(1).split(",").map(_.toLong).toList
val mapperOutput = friends.map(directFriend => (person, (directFriend, -1.toLong)))
val result = for {
fi <- friends
fj <- friends
possibleFriend1 = (fj, person)
possibleFriend2 = (fi, person)
if (fi != fj)
} yield {
(fi, possibleFriend1) :: (fj, possibleFriend2) :: List()
}
mapperOutput ::: result.flatten
})
val grouped = pairs.groupByKey()
val result = grouped.mapValues(values => {
val mutualFriends = new collection.mutable.HashMap[Long, List[Long]].empty
values.foreach(t2 => {
val toUser = t2._1
val mutualFriend = t2._2
val alreadyFriend = (mutualFriend == -1)
if (mutualFriends.contains(toUser)) {
if (alreadyFriend) {
mutualFriends.put(toUser, List.empty)
} else if (mutualFriends.get(toUser).isDefined && mutualFriends.get(toUser).get.size > 0 && !mutualFriends.get(toUser).get.contains(mutualFriend)) {
val existingList = mutualFriends.get(toUser).get
mutualFriends.put(toUser, (mutualFriend :: existingList))
}
} else {
if (alreadyFriend) {
mutualFriends.put(toUser, List.empty)
} else {
mutualFriends.put(toUser, List(mutualFriend))
}
}
})
mutualFriends.filter(!_._2.isEmpty).toMap
})
result.saveAsTextFile(output)
result.foreach(f => {
val friends = if (f._2.isEmpty) "" else {
val items = f._2.map(tuple => (tuple._1, ":" + tuple._2.size)).toSeq.sortBy(_._2).reverse.map(g => "" + g._1 + " " + g._2)
items.toList.mkString(", ")
}
println(s"${f._1}: ${friends}")
})
// done
sc.stop();
}
}
2.3 程序运行
2.3.1 首先将上面写的 .scala 文件编译
scalac -cp /home/spark/spark-1.6/lib/spark-assembly-1.6.1-hadoop2.6.0. jar ../FriendRecommendation.scala
2.3.2 打包 jar 包
echo Main-class: FriendRecommendation > manifest.txt
jar cvfm FriendRecommendation.jar manifest.txt *
2.3.3 将 jar 包提交运行
/home/spark/spark-1.6/bin/spark-submit --class "FriendRecommendation" --master local[4] FriendRecommendation.jar
得到好友推荐结果如下:
遇到的问题
实验中存在的问题在于,当我使用一个较短的文本进行好友推荐时,可以正常输出结果,而当我使用一个 LiveJournal.txt 文件作为输入时,
出现如下所示的错误:
即文本太长导致了数组越界的问题。
更多推荐
已为社区贡献3条内容
所有评论(0)