看到了一段非常有趣的关于spark中aggregate算子的代码,需要很细心才能给出正确答案。
在这里和大家分享。
代码示例
scala
1 | import org.apache.spark.{SparkConf, SparkContext} |
前方高能
输出结果1
Code
1 | 043 |
输出结果2
Code
1 | 034 |
惊不惊喜,刺不刺激(手动狗头)。
解析
aggregate:行动算子,意为【聚合】
函数签名
def aggregate
[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
函数说明
分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合
- 第一个括号内的参数为初始值
- 第二个括号中
- 第一个参数为分区内要执行的函数,初始值和分区内元素依次聚合
- 第二个参数为分区间要执行的函数,初始值和分区间元素依次聚合
代码详解:
scala
1 | val rdd = sc.makeRDD(Array("12", "234", "345", "4567"), 2) |
scala
1 | rdd.aggregate("0")((a, b) => Math.max(a.length, b.length).toString, (x, y) => x + y) |
首先注意rdd是两个分区,”12”, “234”一个分区,”345”, “4567”一个分区
执行分区内函数
Math.max(a.length, b.length).toString- 分区一
- “0”,“12”执行函数,输出“2”,【注意:函数后面有个toString】【聚合:上一步输出作为下一步输入】
- “2”,”234”执行函数,最终输出“3”
- 分区二
- “0”,“345” => “3”
- “3”,”4567” => 最终 “4”
- 分区一
执行分区间函数
(x, y) => x + y,其实就是一个字符串拼接,但是因为分区的原因- 不一定哪个分区先执行完,所以会出现两种情况的字符串拼接:“034” or “043”
scala
1 | rdd.aggregate("")((a, b) => Math.min(a.length, b.length).toString, (x, y) => x + y) |
rdd是两个分区,”12”, “234”一个分区,”345”, “4567”一个分区
执行分区内函数
Math.min(a.length, b.length).toString- 分区一
- “”,“12”执行函数,输出“0”,【注意:函数后面有个toString】【聚合:上一步输出作为下一步输入】
- “0”,”234”执行函数,最终输出“1”,【注意:“0”的长度是1】
- 分区二
- “”,“345” => “0”
- “0”,”4567” => 最终 “1”
- 分区一
执行分区间函数
(x, y) => x + y,字符串拼接,“”+“1”+“1” => “11”







