“A great talker is a great liar.”
Spark数据分区
在Spark分布式程序中,通信的代价是很大的,因此控制数据分布一伙的最少的网络传输可以极大地提升整体性能。和单节点的程序需要为记录集合选择合适的数据结构一样,Spark程序可以通过控制RDD的分区方式来减少通信开销。分区并不是对所有应用都有好处的——比如,如果给定RDD只需要被扫描一次,我么完全没有必要对其预先进行分区处理。只有当数据集多次在诸如连接这种基于键的操作中使用时,分区才会有帮助。
分区神器:partitionBy
Spark中所有的键值对RDD都可以进行分区。系统会根据一个针对键的函数对元素进行分组。尽管Spark没有给出显式控制每个键具体落在哪一个工作节点上的方法(部分原因是Spark即使在某些节点失败时依然可以工作),但Spark可以确保同一组的键出现在同一个节点上。比如,你可能使用哈希分区将一个RDD分成了100个分区,此时键的哈希值对100取模的结果相同的记录会被放在一个节点上。你也可以使用范围分区法,将键在同一个范围区间内的记录都放在同一个节点上。
举个栗子说明为什么需要分区。现在我们有两个key-value文件,任务要求我们对他们进行一个join操作(需要根据key对数据进行分组)。这两个文件分别是一个user profile(用户画像,大文件)和一个user action(五分钟的用户行为,小文件),其格式为(userID, userInfo)和(userID, userAction),userInfo包含该用户订阅历史,userAction为该用户在一个网站上的操作记录。一个示例应用如下所示:
这段代码虽然可以运行,但是效率不是最高,我们可以对其进行一些优化。低效率的原因是函数processNewLogs包含join()操作,每次调用连接操作会将用到的键的哈希值都求出来,将该哈希值相同的记录通过网络传到同一台机器上,在这台机器上进行连接操作。每次join操作都会计算userData和event的键的哈希值并进行shuffle,虽然这些数据从来都不会变化。解决这个问题很简单:在读入userProfile时,使用partitionBy()转化操作,将这张表转化为哈希分区。
由于在构建userData时调用了partitionBy(),Spark就知道了该RDD根据键的哈希值来分区的,这样Spark就能够利用这一点,寻找到对应的RDD位于哪个分区。具体来说,当调用userData.join(events)时,Spark只会对events进行shuffle操作,把events中特定的userID的记录发送到userData的对应分区所在的那台机器上,程序间通信数据减少,效率大大滴提升了。 在这个问题中有几点注意问题:
- partitionBy()是transform operation
- partitionBy的参数100表示分区数目,这个值至少应该和集群中的总核心数(--total-executor-cores)相等
- partitionBy对RDD进行分区,这个分区方式需要被持久化才能被用于后面的RDD转化操作
获取RDD的分区方式
在Scala中,RDD的partitioner属性可以用来获取RDD的分区方式。它会返回一个scala.Option对象,这是Scala中用来存放可能存在的对象的容器类。如果存在值的话,这个值会是一个spark.Partitioner对象。这本质上是一个告诉我们RDD中各个键属于哪个分区的函数(但是函数细节对我们是透明的)。
在这段spark shell代码中,firstly,创建了一个没有分区方式信息的pairRDD,其partitioner方法返回None的Option对象。之后通过hashPartition的哈希方法给它一个哈希分区方式,使得Spark可以了解到这个RDD是如何存储到不同的节点上的。在实际应用中,通常还要再分区之后增加持久化操作,即persist()。
通过分区节省开销:PageRank
PageRank算法可以参考谷歌链接,这里只是简单介绍一下算法的数据集要求以及实施步骤: 算法会维护两个数据集:一个由(PageID, linkList)的元素组成,包含每个页面的相邻页面的列表;另一个由(pageID, rank)元素组成,包含每个页面的当前排序值。算法步骤如下:
- 将每个页面的排序值初始化为1.0
- 在每次迭代中,对页面p,向其每个相邻页面发送一个值为 rank(p)/numNeighbors(p) 的贡献
- 将每个页面的排序值设为0.15 + 0.85 * contributionReceived
使用Spark实现PageRank:
以上的PageRank算法通过不断更新ranks的值来达到收敛的目的,此处ranks被声明为可变变量。代码通过各种tricks来减小通信开销:
- links每次迭代都会与ranks进行join操作。由于links是一个静态数据集,在这个数据集的基础上进行join操作可以按照links的分区来进行,不需要做shuffle。通常links数据比ranks大很多,因为这个RDD存储的是页面链接的url,而ranks只是存储了一个Double型的页面得分,这个优化操作比不进行分区的实现和mapreduce实现节省了许多通信代价。
- persist方法把links保留在内存中,否则每次join操作用到links的时候都要重新计算一遍。
- 生成ranks的时候使用mapValues可以保留父RDD的分区方式,便于后续join,而且对第一次生成ranks也能有很大帮助。
- 循环体中的reduceByKey可以保留哈希分区的信息,之后接mapValues可以不修改分区信息,提高下一次join的效率。
- 为了减少数据在不同executor之间传输,尽量使用mapValues和flatMapValues,因为他们不改变键值。
引自:Spark快速大数据分析(Holden Karau)_4.5-数据分区