Spark程序中的shuffle操作非常耗时,在spark程序优化过程中会专门针对shuffle问题进行优化,从而减少不必要的shuffle操作,提高运行效率;但程序中有些逻辑操作必须有shuffle操作才能完成,常见的如groupByKey、reduceByKey操作等。上述两个算子是Spark处理(key,value)类型数据最常用到的函数,那么这两个算子有什么区别,在使用时该如何选择?下面从python源码的角度分析两者的区别及使用原则,具体如下所述:
在Spark程序中,两个算子均是实现(key,value)类型数据的重排及聚合操作,聚合过程包含两个阶段:分区内相同key值的数据聚合和分区间相同key值的数据聚合。
一、python源码中,两个算子的定义如下:

1、groupByKey
定义如下:

def groupByKey(self, numPartitions=None, partitionFunc=portable_hash):
“””
Group the values for each key in the RDD into a single sequence.
Hash-partitions the resulting RDD with numPartitions partitions.

Note: If you are grouping in order to perform an aggregation (such as a
sum or average) over each key, using reduceByKey or aggregateByKey will
provide much better performance.

>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.groupByKey().mapValues(len).collect())
[('a', 2), ('b', 1)]
>>> sorted(rdd.groupByKey().mapValues(list).collect())
[('a', [1, 1]), ('b', [1])]
"""

 

根据定义可知:groupByKey()函数的功能是将RDD中相同key值的数据合并成一个序列,同时也特别提示,如果是进行求和或求平均值的操作,建议选择reduceByKey 或者 aggregateByKey函数(后面解释原因)。

2、reduceByKey
定义如下:

def reduceByKey(self, func, numPartitions=None, partitionFunc=portable_hash):
“””
Merge the values for each key using an associative reduce function.

This will also perform the merging locally on each mapper before
sending results to a reducer, similarly to a "combiner" in MapReduce.

Output will be partitioned with C{numPartitions} partitions, or
the default parallelism level if C{numPartitions} is not specified.
Default partitioner is hash-partition.

>>> from operator import add
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.reduceByKey(add).collect())
[('a', 2), ('b', 1)]
"""
return self.combineByKey(lambda x: x, func, func, numPartitions, partitionFunc)

 

可以看到,reduceByKey函数通过输入参数(函数)指定了的数据合并方式,且数据合并操作在分区内也可进行。函数内部调用了combineByKey函数,那么就先了解下combineByKey函数的主要功能。

3、combineByKey
定义如下:

def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
numPartitions=None, partitionFunc=portable_hash):
“””
Generic function to combine the elements for each key using a custom
set of aggregation functions.

Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined
type" C. Note that V and C can be different -- for example, one might
group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]).

Users provide three functions:

- C{createCombiner}, which turns a V into a C (e.g., creates
a one-element list)
- C{mergeValue}, to merge a V into a C (e.g., adds it to the end of
a list)
- C{mergeCombiners}, to combine two C's into a single one.

In addition, users can control the partitioning of the output RDD.

>>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> def add(a, b): return a + str(b)
>>> sorted(x.combineByKey(str, add, add).collect())
[('a', '11'), ('b', '1')]
"""

 

可以看到,函数调用时需要传入三个参数,因此理解combineByKey函数功能的核心是参数列表中的三个函数(createCombiner, mergeValue和mergeCombiners)的含义,具体如下所述:
(1)createCombiner :定义(key,value)聚合后,返回value的数据结构
如:createCombiner = (lambda x : [x])/ (lambda x : str(x))
(2)mergeValue : 定义分区内相同key值数据的聚合方法
如:mergeValue = (lambda xs,x: xs+str(s))
(3) mergeCombiners :定义分区间相同key值数据的聚合方法
(注:在某些操作条件下,mergeValue和mergeCombiners函数可以一致)
因此,combineByKey函数非常灵活,功能很强大, reduceByKey函数限定了其中两个参数,在使用时会存在一些功能限制,具体表现如下:
(1)(key,value)数据聚合后,value的数据类型不能变
(2)mergeValue 和mergeCombiners函数必须相同

二、groupByKey与reduceByKey的主要区别:
1、groupByKey算子的功能固定,只能输出相同key值的序列,reduceByKey适用于分组排序过程中有数据聚合操作(sum)的情形,在其他场景下可能不适用。
2、reduceByKey算子在分区内会进行数据聚合操作,因此针对有sum的数据聚合操作,效率会更高一些。(groupByKey算子也能实现类似sum的数据聚合操作,相当于进行groupByKey操作后还需进行map类算子的sum操作)