Goal: if users A and C are both friends with user B but are not friends with each other, recommend C to A and A to C, and list the mutual friends that A and C share.
For example:
Suppose we have the following friend relationships:
1 2,3,4,5,6,7,8
2 1,3,4,5,7
3 1,2
4 1,2,6
5 1,2
6 1,4
7 1,2
8 1
In each line, the value before the space is the user ID, and the comma-separated values after it are that user's friend IDs.
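A minimal sketch of reading this format with plain Scala collections (no Spark), assuming one user per line, whitespace before the friend list, and comma-separated IDs. The object name `FriendInput` is hypothetical. The `isSymmetric` check is included because the map/reduce approach below implicitly relies on every friendship being listed on both users' lines; otherwise a one-sided entry would be counted as a mutual friend.

```scala
object FriendInput {
  /** Parses lines such as "4 1,2,6" into userId -> set of friend IDs.
    * Blank lines are skipped; a line without a friend list yields an empty set. */
  def parse(lines: Seq[String]): Map[Long, Set[Long]] =
    lines.map(_.trim).filter(_.nonEmpty).map { line =>
      val tokens = line.split("\\s+")
      val user = tokens(0).toLong
      val friends =
        if (tokens.length > 1) tokens(1).split(",").map(_.trim.toLong).toSet
        else Set.empty[Long]
      user -> friends
    }.toMap

  /** True if every friendship u -> f is also listed as f -> u. */
  def isSymmetric(adj: Map[Long, Set[Long]]): Boolean =
    adj.forall { case (u, fs) =>
      fs.forall(f => adj.getOrElse(f, Set.empty[Long]).contains(u))
    }
}
```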
[Figure: the corresponding friendship graph]
Expected output (each entry reads candidate(number of mutual friends:[mutual friends])):
1
2 6(2:[4, 1]),8(1:[1]),
3 4(2:[1, 2]),5(2:[2, 1]),6(1:[1]),7(2:[1, 2]),8(1:[1]),
4 3(2:[2, 1]),5(2:[1, 2]),7(2:[1, 2]),8(1:[1]),
5 3(2:[2, 1]),4(2:[1, 2]),6(1:[1]),7(2:[1, 2]),8(1:[1]),
6 2(2:[1, 4]),3(1:[1]),5(1:[1]),7(1:[1]),8(1:[1]),
7 3(2:[1, 2]),4(2:[2, 1]),5(2:[2, 1]),6(1:[1]),8(1:[1]),
8 2(1:[1]),3(1:[1]),4(1:[1]),5(1:[1]),6(1:[1]),7(1:[1]),
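That is:
User 1 is already friends with 2, 3, 4, 5, 6, 7, and 8, so nobody is recommended to it.
User 2 is recommended 6, because 2 and 6 can get to know each other through 4 or 1, and 8, because 2 and 8 can get to know each other through 1.
User 3 is recommended 4, because 3 and 4 can get to know each other through 1 or 2; 5, through 2 or 1; 6, through 1; 7, through 1 or 2; and 8, through 1.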
...
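To make the rule concrete, here is a brute-force reference sketch that can be used to check the expected output on small inputs. It is not the map/reduce method of this article: it simply intersects friend sets, assuming the friendships are already loaded as a `Map[Long, Set[Long]]`. The names `BruteForceRecommender`, `recommend`, and `formatLine` are hypothetical. Because it sorts candidates and mutual friends, lists can appear in a different order than in the expected output above (for example `[1, 4]` instead of `[4, 1]`).

```scala
object BruteForceRecommender {
  /** For every user u, returns candidate v -> mutual friends, where v is not u,
    * is not already u's friend, and shares at least one friend with u. */
  def recommend(adj: Map[Long, Set[Long]]): Map[Long, Map[Long, Set[Long]]] =
    adj.map { case (u, friendsOfU) =>
      val recs = adj.iterator
        .filter { case (v, _) => v != u && !friendsOfU.contains(v) }
        .map { case (v, friendsOfV) => v -> friendsOfU.intersect(friendsOfV) }
        .filter { case (_, mutual) => mutual.nonEmpty }
        .toMap
      u -> recs
    }

  /** Formats one user's recommendations like "2 6(2:[1, 4]),8(1:[1]),". */
  def formatLine(user: Long, recs: Map[Long, Set[Long]]): String = {
    val items = recs.toSeq.sortBy(_._1).map { case (v, mutual) =>
      s"$v(${mutual.size}:${mutual.toSeq.sorted.mkString("[", ", ", "]")}),"
    }
    if (items.isEmpty) user.toString else s"$user ${items.mkString}"
  }
}
```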
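Approach:
Take each line, for example 4 1,2,6.
Map step:
Emit direct-friend pairs (4,[1,-1]) (4,[2,-1]) (4,[6,-1]), where -1 marks "already friends".
Emit indirect-friend pairs (1,[2,4]) (2,[1,4]) (1,[6,4]) (6,[1,4]) (2,[6,4]) (6,[2,4]). Here (1,[2,4]) means "recommend 2 to 1, because they can get to know each other through 4"; the other pairs read the same way.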
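The map step for a single line can be sketched in plain Scala as follows. `MapStep.emit` is a hypothetical name, and `-1` is the direct-friend marker from the text. Because the loop visits every ordered pair (fi, fj), emitting only `(fi, (fj, person))` per iteration already produces both directions of each pair without duplicates.

```scala
object MapStep {
  // Marker used in the text for "already a direct friend"
  val AlreadyFriend: Long = -1L

  /** For a line "person f1,f2,...", emits (person, (friend, -1)) for every direct
    * friend and (fi, (fj, person)) for every ordered pair of distinct friends. */
  def emit(line: String): List[(Long, (Long, Long))] = {
    val tokens = line.trim.split("\\s+")
    val person = tokens(0).toLong
    val friends = tokens(1).split(",").map(_.toLong).toList
    val direct = friends.map(f => (person, (f, AlreadyFriend)))
    val indirect = for {
      fi <- friends
      fj <- friends
      if fi != fj
    } yield (fi, (fj, person)) // "recommend fj to fi, via person"
    direct ::: indirect
  }
}
```

For `4 1,2,6` this yields the three direct pairs followed by the six indirect pairs listed above.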
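Reduce step:
All direct-friend and indirect-friend pairs for the same user arrive at the same reducer.
For example, for user 4:
key=4
The following values reach the same reducer (printed as FriendPair objects, where user1 is the candidate and user2 is the mutual friend through whom they connect; user2=-1 marks a direct friend):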
t2= FriendPair [user1=7, user2=1]
t2= FriendPair [user1=3, user2=2]
t2= FriendPair [user1=2, user2=-1]
t2= FriendPair [user1=6, user2=-1]
t2= FriendPair [user1=1, user2=2]
t2= FriendPair [user1=8, user2=1]
t2= FriendPair [user1=6, user2=1]
t2= FriendPair [user1=5, user2=1]
t2= FriendPair [user1=3, user2=1]
t2= FriendPair [user1=1, user2=6]
t2= FriendPair [user1=2, user2=1]
t2= FriendPair [user1=1, user2=-1]
t2= FriendPair [user1=7, user2=2]
t2= FriendPair [user1=5, user2=2]
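For user 4, maintain a Map<Long,List<Long>> that stores each candidate to recommend to user 4 together with the list of mutual friends they share.
Direct friends of 4, i.e. records whose user2 is -1, must never be recommended, so store a marker <user1, empty> in the Map (the Scala code below uses an empty list). The marker must win regardless of arrival order: a direct-friend record discards any mutual friends already collected for that candidate, and later indirect records for it are ignored.
For indirect friends of 4, accumulate the mutual friends of all records with the same candidate ID. For example, given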
t2= FriendPair [user1=3, user2=2]
t2= FriendPair [user1=3, user2=1]
the mutual friends of candidate 3 (users 2 and 1) are accumulated, and <3,[2,1]> is put into the Map.
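The reduce rule for a single key can be sketched in plain Scala as below. As an assumption that differs from the text, this version uses `Option` instead of an empty list or null as the marker: `None` means "already a direct friend", and `Some(list)` holds the mutual friends found so far. `ReduceStep` is a hypothetical name. The test feeds it the fourteen values listed above for key 4, in both the original and reversed order.

```scala
object ReduceStep {
  // Marker used in the text for "already a direct friend"
  val AlreadyFriend: Long = -1L

  /** Folds all (candidate, via) values received for one user into
    * candidate -> mutual friends (in order of first appearance). */
  def reduce(values: Iterable[(Long, Long)]): Map[Long, List[Long]] = {
    // None = already a direct friend; Some(list) = mutual friends so far, newest first
    val acc = scala.collection.mutable.HashMap.empty[Long, Option[List[Long]]]
    values.foreach { case (candidate, via) =>
      if (via == AlreadyFriend) {
        acc.put(candidate, None) // direct friend: overrides anything collected earlier
      } else {
        acc.get(candidate) match {
          case Some(None)                             => () // already friends: skip
          case Some(Some(list)) if list.contains(via) => () // duplicate record: skip
          case Some(Some(list))                       => acc.put(candidate, Some(via :: list))
          case None                                   => acc.put(candidate, Some(List(via)))
        }
      }
    }
    // keep only real recommendations
    acc.collect { case (candidate, Some(list)) => candidate -> list.reverse }.toMap
  }
}
```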
```scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object FriendRecommendation {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("FriendRecommendation").setMaster("local")
    val sc = new SparkContext(sparkConf)

    val input = "file:///media/chenjie/0009418200012FF3/ubuntu/friends2.txt"
    val output = "file:///media/chenjie/0009418200012FF3/ubuntu/friends2"

    val records = sc.textFile(input)

    // map: for a line "person f1,f2,...", emit
    //   (person, (friend, -1))  for every direct friend (-1 marks "already friends")
    //   (fi, (fj, person))      for every ordered pair of distinct friends,
    //                           i.e. "recommend fj to fi, via person"
    val pairs = records.flatMap { line =>
      val tokens = line.trim.split("\\s+") // user ID and friend list are separated by whitespace
      if (tokens.length < 2) {
        List.empty[(Long, (Long, Long))] // skip blank lines and users without a friend list
      } else {
        val person = tokens(0).toLong
        val friends = tokens(1).split(",").map(_.toLong).toList
        val mapperOutput = friends.map(directFriend => (person, (directFriend, -1L)))
        // Visiting every ordered pair (fi, fj) already covers both directions,
        // so one record per iteration is enough and no duplicates are produced.
        val indirect = for {
          fi <- friends
          fj <- friends
          if fi != fj
        } yield (fi, (fj, person))
        mapperOutput ::: indirect
      }
    }

    //
    // note that groupByKey() provides an expensive solution
    // [you must have enough memory/RAM to hold all values for
    // a given key -- otherwise you might get OOM error], but
    // combineByKey() and reduceByKey() will give a better
    // scale-out performance
    //
    val grouped = pairs.groupByKey()

    // reduce: for one user, build toUser -> list of mutual friends.
    // An empty list marks "already a direct friend"; once set, it blocks further additions.
    val result = grouped.mapValues { values =>
      val mutualFriends = collection.mutable.HashMap.empty[Long, List[Long]]
      values.foreach { case (toUser, mutualFriend) =>
        if (mutualFriend == -1L) {
          mutualFriends.put(toUser, List.empty) // direct friend: never recommend
        } else {
          mutualFriends.get(toUser) match {
            case Some(existing) =>
              // non-empty: add a new mutual friend once; empty: already friends, skip
              if (existing.nonEmpty && !existing.contains(mutualFriend))
                mutualFriends.put(toUser, mutualFriend :: existing)
            case None =>
              mutualFriends.put(toUser, List(mutualFriend))
          }
        }
      }
      // drop direct friends (empty lists) so only recommendations remain
      mutualFriends.filter(_._2.nonEmpty).toMap
    }

    result.saveAsTextFile(output)

    //
    // formatting and printing it to console for debugging purposes...
    // collect() brings the (small) result to the driver so the lines are printed there,
    // in the same shape as the expected output: candidate(count:[mutual friends]),
    //
    result.sortByKey().collect().foreach { case (user, recommendations) =>
      val items = recommendations.toSeq.sortBy(_._1).map { case (toUser, mutual) =>
        s"$toUser(${mutual.size}:${mutual.mkString("[", ", ", "]")}),"
      }
      println(if (items.isEmpty) s"$user" else s"$user ${items.mkString}")
    }

    // done
    sc.stop()
  }
}
```
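The comment in the code notes that groupByKey() must hold all values for a key in memory and that combineByKey()/reduceByKey() scale better. One way to follow that advice, sketched here as an assumption rather than the author's method, is to make the per-user state mergeable so Spark can combine values before the shuffle. In this hypothetical `MergeableRecommendation`, `None` ("already a direct friend") absorbs everything on merge, and otherwise mutual-friend sets are unioned, so partial results can be merged in any grouping. With Spark these functions could be wired as `pairs.aggregateByKey(zero)(seqOp, combOp).mapValues(finish)`; that wiring is untested here, and the sketch itself uses only the Scala standard library.

```scala
object MergeableRecommendation {
  // candidate -> None (already a direct friend) or Some(mutual friends)
  type State = Map[Long, Option[Set[Long]]]
  val AlreadyFriend: Long = -1L
  val zero: State = Map.empty

  /** Adds one (candidate, via) value to a partial state. */
  def seqOp(state: State, value: (Long, Long)): State = {
    val (candidate, via) = value
    if (via == AlreadyFriend) state.updated(candidate, None)
    else state.get(candidate) match {
      case Some(None)    => state // already friends: ignore
      case Some(Some(s)) => state.updated(candidate, Some(s + via))
      case None          => state.updated(candidate, Some(Set(via)))
    }
  }

  /** Merges two partial states; a direct-friend marker on either side wins. */
  def combOp(a: State, b: State): State =
    (a.keySet ++ b.keySet).iterator.map { k =>
      val merged: Option[Set[Long]] = (a.get(k), b.get(k)) match {
        case (Some(None), _) | (_, Some(None)) => None
        case (x, y) =>
          Some(x.flatten.getOrElse(Set.empty[Long]) ++ y.flatten.getOrElse(Set.empty[Long]))
      }
      k -> merged
    }.toMap

  /** Keeps only real recommendations. */
  def finish(state: State): Map[Long, Set[Long]] =
    state.collect { case (k, Some(s)) => k -> s }
}
```

Splitting the values for user 4 across two "partitions" and merging them gives the same result as folding them in one pass.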