Spark程序中的shuffle操作非常耗时,在spark程序优化过程中会专门针对shuffle问题进行优化,从而减少不必要的shuffle操作,提高运行效率;但程序中有些逻辑操作必须有shuffle操作才能完成,常见的如groupByKey、reduceByKey操作等。上述两个算子是Spark处理(key,value)类型数据最常用到的函数,那么这两个算子有什么区别,在使用时该如何选择?下面从python源码的角度分析两者的区别及使用原则,具体如下所述:
在Spark程序中,两个算子均是实现(key,value)类型数据的重排及聚合操作,聚合过程包含两个阶段:分区内相同key值的数据聚合和分区间相同key值的数据聚合。
一、python源码中,两个算子的定义如下:
1、groupByKey
定义如下:
def groupByKey(self, numPartitions=None, partitionFunc=portable_hash): “”” Group the values for each key in the RDD into a single sequence. Hash-partitions the resulting RDD with numPartitions partitions. Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will provide much better performance. >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) >>> sorted(rdd.groupByKey().mapValues(len).collect()) [('a', 2), ('b', 1)] >>> sorted(rdd.groupByKey().mapValues(list).collect()) [('a', [1, 1]), ('b', [1])] """
根据定义可知:groupByKey()函数的功能是将RDD中相同key值的数据合并成一个序列,同时也特别提示,如果是进行求和或求平均值的操作,建议选择reduceByKey 或者 aggregateByKey函数(后面解释原因)。
2、reduceByKey
定义如下:
def reduceByKey(self, func, numPartitions=None, partitionFunc=portable_hash): “”” Merge the values for each key using an associative reduce function. This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a "combiner" in MapReduce. Output will be partitioned with C{numPartitions} partitions, or the default parallelism level if C{numPartitions} is not specified. Default partitioner is hash-partition. >>> from operator import add >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) >>> sorted(rdd.reduceByKey(add).collect()) [('a', 2), ('b', 1)] """ return self.combineByKey(lambda x: x, func, func, numPartitions, partitionFunc)
可以看到,reduceByKey函数通过输入参数(函数)指定了的数据合并方式,且数据合并操作在分区内也可进行。函数内部调用了combineByKey函数,那么就先了解下combineByKey函数的主要功能。
3、combineByKey
定义如下:
def combineByKey(self, createCombiner, mergeValue, mergeCombiners, numPartitions=None, partitionFunc=portable_hash): “”” Generic function to combine the elements for each key using a custom set of aggregation functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C. Note that V and C can be different -- for example, one might group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]). Users provide three functions: - C{createCombiner}, which turns a V into a C (e.g., creates a one-element list) - C{mergeValue}, to merge a V into a C (e.g., adds it to the end of a list) - C{mergeCombiners}, to combine two C's into a single one. In addition, users can control the partitioning of the output RDD. >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) >>> def add(a, b): return a + str(b) >>> sorted(x.combineByKey(str, add, add).collect()) [('a', '11'), ('b', '1')] """
可以看到,函数调用时需要传入三个参数,因此理解combineByKey函数功能的核心是参数列表中的三个函数(createCombiner, mergeValue和mergeCombiners)的含义,具体如下所述:
(1)createCombiner :定义(key,value)聚合后,返回value的数据结构
如:createCombiner = (lambda x : [x])/ (lambda x : str(x))
(2)mergeValue : 定义分区内相同key值数据的聚合方法
如:mergeValue = (lambda xs,x: xs+str(s))
(3) mergeCombiners :定义分区间相同key值数据的聚合方法
(注:在某些操作条件下,mergeValue和mergeCombiners函数可以一致)
因此,combineByKey函数非常灵活,功能很强大, reduceByKey函数限定了其中两个参数,在使用时会存在一些功能限制,具体表现如下:
(1)(key,value)数据聚合后,value的数据类型不能变
(2)mergeValue 和mergeCombiners函数必须相同
二、groupByKey与reduceByKey的主要区别:
1、groupByKey算子的功能固定,只能输出相同key值的序列,reduceByKey适用于分组排序过程中有数据聚合操作(sum)的情形,在其他场景下可能不适用。
2、reduceByKey算子在分区内会进行数据聚合操作,因此针对有sum的数据聚合操作,效率会更高一些。(groupByKey算子也能实现类似sum的数据聚合操作,相当于进行groupByKey操作后还需进行map类算子的sum操作)