avatar

目录
Spark的WordCount到底有几个RDD

简介

本文转载自 https://blog.csdn.net/zhongqi2513/article/details/81513587

这样的一句标准的sparkcore的wordcount的代码到底能要产生几个RDD呢。相信大家对于一个标准的WordCount的代码一定不陌生:

scala
1
2
3
4
5
sc.textFile("hdfs://myha01/wc/input/words.txt")
.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)
.saveAsTextFile("hdfs://myha01/wc/output/")

这局代码:

1、开始使用了一个textFile用来读取数据的方法

2、中间使用了三个标准的RDD的操作算子:

flatMap(_.split(" ")) 负责把由每一行组成的RDD按照空格切开压平成标准的由单词组成的RDD

map((_,1))负责把每个单词word变成(word,1)每个单词出现一次

reduceByKey(_+_)负责把按照key相同也就是单词相同的key-value划分成一组,然后每一组做count聚合,最终就得出了输入文件中,每个单词出现了多少次。

3、最后,使用了一个saveAsTextFile的方法来存储数据

那到底这句代码中执行过程中,是不是刚好每个算子生成一个RDD呢? 很不幸,不是的。如果需要知晓答案,最好的方式,就是翻阅参与运算的每个算子到底做了什么事情。

解析

接下来是详细分析:

1、首先看sc.textFile(“hdfs://myha01/wc/input/words.txt”):textFile方法在SparkContext类中

接着看textFile中的hadoopFile方法的实现:

通过这个代码可以得知,在hadoopFile的内部产生了第一个RDD:HadoopRDD

接着回到textFile方法:

发现,其实返回的HadoopRDD又调用了map算子,看map算子的实现:

map算子的内部实现中,又创建了一个RDD,这就是第二个RDD: MapPartitionsRDD

那也就是说,textFile算子的最终返回值就是第二个RDD:MapPartitionsRDD

接着看:flatMap(_.split(“ “))算子的操作实现:flatMap算子在RDD中

所以flatMap(_.split(“ “))算子操作产生了第三个RDD:MapPartitionsRDD

接着看map((_,1))算子操作:map算子在RDD类中

map((_,1))算子的具体实现依然是简单的new MapPartitionRDD的方式生成第四个RDD:MapPartitionsRDD

接着看:reduceByKey(+)算子的具体实现:reduceByKey在PairRDDFunctions类中

跳到:

跳到:

到这个地方说明:reduceByKey算子的返回值其实是创建了第五个RDD:ShuffledRDD

接着看:saveAsTextFile(“hdfs://myha01/wc/output/“)算子的具体实现:saveAsTextFile算子在RDD类中

this.mapPartitions这句代码在调用的时候,在mapPartitions的内部,其实又创建了第六个RDD:MapPartitionRDD

接着回到:saveAsTextFile方法的实现,其实返现,最后一句话在调用中,也会生成一个RDD

这就是第七个RDD:MapPartitionRDD

到底为止,其他的地方,是没有再产生RDD的。

所以按照刚才的分析得出的最终结论是:

Code
1
2
3
4
5
6
7
第一个RDD:HadoopRDD
第二个RDD:MapPartitionsRDD
第三个RDD:MapPartitionsRDD
第四个RDD:MapPartitionsRDD
第五个RDD:ShuffledRDD
第六个RDD:MapPartitionRDD
第七个RDD:MapPartitionRDD

其实,在执行saveAsTextFile之前,我们可以通过RDD提供的toDebugString看到这些个算子在调用的时候到底产生了多少个RDD:

望各位仁兄牢记。如果不记得,请翻阅源码。本篇文章是基于最新的Spark-2.3.1的版本

小结

7个RDD,2+1+1+1+2

scala
1
2
3
4
5
sc.textFile("hdfs://myha01/wc/input/words.txt")
.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)
.saveAsTextFile("hdfs://myha01/wc/output/")
算子 产生的RDD
sc.textFile("hdfs://myha01/wc/input/words.txt") 第一个RDD:HadoopRDD
第二个RDD:MapPartitionsRDD
.flatMap(_.split(" ")) 第三个RDD:MapPartitionsRDD
.map((_,1)) 第四个RDD:MapPartitionsRDD
.reduceByKey(_+_) 第五个RDD:ShuffledRDD
.saveAsTextFile("hdfs://myha01/wc/output/") 第六个RDD:MapPartitionRDD
第七个RDD:MapPartitionRDD
文章作者: Yang4
文章链接: https://masteryang4.github.io/2020/06/16/Spark%E7%9A%84WordCount%E5%88%B0%E5%BA%95%E6%9C%89%E5%87%A0%E4%B8%AARDD/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 MasterYangBlog
打赏
  • 微信
    微信
  • 支付宝
    支付宝

评论