

[Spark] Eleven Ways to Implement WordCount

Implement the classic WordCount algorithm in eleven different ways with Spark.

Ten of them use Spark RDD operators; the eleventh uses a custom accumulator.

Trump: nobody knows WordCount better than me! (just kidding)

Why WordCount?

  • It is the most classic algorithm in big data, roughly the "Hello World" of a programming language.
  • In big data processing, complex problems are usually broken down into many small ones, and those small problems are mostly variations on WordCount. That makes WordCount the cornerstone of big data processing; the canonical one-liner is sketched right below.
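
For readers who have not written it before, the canonical Spark WordCount is a single chain of transformations. A minimal sketch, assuming an already created SparkContext named sc and a whitespace-separated text file at the illustrative path input/wc.txt (the same path used in the commented-out code further down):

scala
// Hypothetical input path; any RDD[String] of text lines works the same way
val counts = sc.textFile("input/wc.txt")
  .flatMap(_.split(" "))   // split each line into words
  .map((_, 1))             // pair each word with a count of 1
  .reduceByKey(_ + _)      // sum the counts per word
counts.collect().foreach(println)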

Ten Spark Operator Implementations

scala
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

/**
 * Spark - WordCount implemented with ten different operators
 */
object RDDWordcount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
    val sc = new SparkContext(sparkConf)
    // Reading the words from a file works the same way:
    // val rdd = sc.textFile("input/wc.txt").flatMap(datas => {
    //   datas.split(" ")
    // })
    val rdd = sc.makeRDD(List("hadoop", "hello", "spark", "hello", "scala", "hello", "scala", "spark"))

    println("=================1====================")

    // 1. countByValue: action that counts each distinct element on the driver
    rdd.countByValue().foreach(println)

    println("=================2====================")

    // 2. countByKey: map to (word, 1) pairs, then count the occurrences of each key
    rdd.map((_, 1)).countByKey().foreach(println)

    println("=================3====================")

    // 3. reduceByKey: the classic shuffle-based aggregation
    rdd.map((_, 1)).reduceByKey(_ + _).collect().foreach(println)

    println("=================4====================")

    // 4. groupByKey: group the 1s per word, then take the size of each group
    rdd.map((_, 1)).groupByKey().mapValues(_.size).collect().foreach(println)

    println("=================5====================")

    // 5. aggregateByKey: zero value 0, same function within and across partitions
    rdd.map((_, 1)).aggregateByKey(0)(_ + _, _ + _).collect().foreach(println)

    println("=================6====================")

    // 6. foldByKey: like aggregateByKey, but with a single function
    rdd.map((_, 1)).foldByKey(0)(_ + _).collect().foreach(println)

    println("=================7====================")

    rdd.map((_, 1)).combineByKey(
      (num: Int) => num,          // 7. createCombiner: first count seen for a key
      (x: Int, y: Int) => x + y,  //    mergeValue: add a value within a partition
      (x: Int, y: Int) => x + y   //    mergeCombiners: merge counts across partitions
    ).collect().foreach(println)

    println("=================8====================")

    // 8. groupBy: group the pairs by word, then take the size of each group
    rdd.map((_, 1)).groupBy(_._1).map(kv => {
      (kv._1, kv._2.size)
    }).collect().foreach(println)

    println("=================9====================")

    // 9. aggregate: fold each partition into a mutable Map, then merge the maps
    rdd.aggregate(mutable.Map[String, Int]())(
      (map, word) => {
        map(word) = map.getOrElse(word, 0) + 1
        map
      },
      (map1, map2) => {
        map1.foldLeft(map2)(
          (finalMap, kv) => {
            finalMap(kv._1) = finalMap.getOrElse(kv._1, 0) + kv._2
            finalMap
          }
        )
      }
    ).foreach(println)

    println("=================10====================")

    // 10. fold: map each word to a single-entry Map, then merge all the maps
    rdd.map(s => mutable.Map(s -> 1)).fold(mutable.Map[String, Int]())(
      (map1, map2) => {
        map1.foldLeft(map2)(
          (finalMap, kv) => {
            finalMap(kv._1) = finalMap.getOrElse(kv._1, 0) + kv._2
            finalMap
          }
        )
      }
    ).foreach(println)

    sc.stop()
  }
}

Output:

scala
=================1====================
(hello,3)
(spark,2)
(hadoop,1)
(scala,2)
=================2====================
(hello,3)
(spark,2)
(hadoop,1)
(scala,2)
=================3====================
(hello,3)
(spark,2)
(hadoop,1)
(scala,2)
=================4====================
(hello,3)
(spark,2)
(hadoop,1)
(scala,2)
=================5====================
(hello,3)
(spark,2)
(hadoop,1)
(scala,2)
=================6====================
(hello,3)
(spark,2)
(hadoop,1)
(scala,2)
=================7====================
(hello,3)
(spark,2)
(hadoop,1)
(scala,2)
=================8====================
(hello,3)
(spark,2)
(hadoop,1)
(scala,2)
=================9====================
(hadoop,1)
(spark,2)
(scala,2)
(hello,3)
=================10====================
(hadoop,1)
(spark,2)
(scala,2)
(hello,3)
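
One detail worth noting about the ten approaches: countByValue, countByKey, aggregate and fold (approaches 1, 2, 9 and 10) are actions that return a local Scala collection on the driver, while the others are transformations whose results are fetched with collect(). An illustrative comparison of the result types, reusing the same rdd as above:

scala
// Action: the counts come back as a local scala.collection.Map on the driver
val local: scala.collection.Map[String, Long] = rdd.countByValue()
// Transformation: the result is still a distributed RDD until collect() is called
val distributed: org.apache.spark.rdd.RDD[(String, Int)] = rdd.map((_, 1)).reduceByKey(_ + _)
println(distributed.collect().mkString(", "))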

Custom Accumulator Implementation

scala
import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

object MyAccTest {

  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf().setAppName("acc").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)

    // TODO Spark - custom accumulator - wordcount
    // An accumulator can aggregate the data without a shuffle
    val rdd: RDD[String] = sc.makeRDD(List("hadoop spark", "hello", "spark", "hello", "scala", "hello", "scala", "spark"))

    // TODO 1. Create the accumulator
    val acc = new WordCountAccumulator

    // TODO 2. Register the accumulator with Spark
    sc.register(acc, "wordcount")

    // TODO 3. Use the accumulator
    rdd.foreach(
      words => {
        val ws = words.split(" ")
        ws.foreach(
          word => {
            acc.add(word)
          }
        )
      }
    )

    println(acc.value) // Map(hadoop -> 1, spark -> 3, scala -> 2, hello -> 3)

    sc.stop()

  }

  // Custom accumulator holding Map{(word -> count), (word -> count)}
  // 1. Extend AccumulatorV2 and declare the type parameters:
  //    IN:  the type of the values passed into the accumulator
  //    OUT: the type of the accumulator's result
  // 2. Override the required methods
  class WordCountAccumulator extends AccumulatorV2[String, mutable.Map[String, Int]] {

    var innerMap = mutable.Map[String, Int]()

    // TODO Is the accumulator still in its initial (zero) state?
    override def isZero: Boolean = innerMap.isEmpty

    // TODO Copy the accumulator (a fresh, empty instance)
    override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = {
      new WordCountAccumulator
    }

    // TODO Reset the accumulator
    override def reset(): Unit = {
      innerMap.clear()
    }

    // TODO Add one value
    override def add(word: String): Unit = {
      val cnt = innerMap.getOrElse(word, 0)
      innerMap.update(word, cnt + 1)
    }

    // TODO Merge this accumulator with another one
    override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit = {
      // Merge the two maps
      val map1 = this.innerMap
      val map2 = other.value

      innerMap = map1.foldLeft(map2)(
        (map, kv) => {
          val k = kv._1
          val v = kv._2
          map(k) = map.getOrElse(k, 0) + v
          map
        }
      )
    }

    // TODO Get the accumulator's value, i.e. the result it returns
    override def value: mutable.Map[String, Int] = innerMap
  }

}

Output:

scala
Map(spark -> 3, hadoop -> 1, scala -> 2, hello -> 3)
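
The comment in the listing says that an accumulator aggregates without a shuffle. One illustrative way to see the contrast (not part of the original code) is to print the lineage of the shuffle-based variant: reduceByKey introduces a ShuffledRDD stage, whereas the accumulator version runs inside a single foreach with no shuffle.

scala
// Lineage of the shuffle-based WordCount on the same rdd of lines
val shuffled = rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
println(shuffled.toDebugString) // the printed lineage contains a ShuffledRDD
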
Author: Yang4
Link: https://masteryang4.github.io/2020/04/27/spark-%E5%8D%81%E4%B8%80%E7%A7%8D%E6%96%B9%E5%BC%8F%E5%AE%9E%E7%8E%B0WordCount/
License: Unless otherwise noted, all posts on this blog are licensed under CC BY-NC-SA 4.0. Please credit MasterYangBlog when reposting.