avatar

目录
spark系列之spark基础

概述

  • Spark Core

Spark Core中提供了Spark最基础与最核心的功能,Spark其他的功能如:Spark SQL,Spark Streaming,GraphX, MLlib都是在Spark Core的基础上进行扩展的

  • Spark SQL

Spark SQL是Spark用来操作结构化数据的组件。通过Spark SQL,用户可以使用SQL或者Apache Hive版本的SQL方言(HQL)来查询数据。

  • Spark Streaming

Spark Streaming是Spark平台上针对实时数据进行流式计算的组件,提供了丰富的处理数据流的API。

  • Spark MLlib

MLlib是Spark提供的一个机器学习算法库。MLlib不仅提供了模型评估、数据导入等额外的功能,还提供了一些更底层的机器学习原语。

  • Spark GraphX

GraphX是Spark面向图计算提供的框架与算法库。

入门

xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.5</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 该插件用于将Scala代码编译成class文件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<!-- 声明绑定到maven的compile阶段 -->
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

WordCount

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 创建Spark运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")

// 创建Spark上下文环境对象(连接对象)
val sc : SparkContext = new SparkContext(sparkConf)

// 读取文件数据
val fileRDD: RDD[String] = sc.textFile("input/word.txt")

// 将文件中的数据进行分词
val wordRDD: RDD[String] = fileRDD.flatMap( _.split(" ") )

// 转换数据结构 word => (word, 1)
val word2OneRDD: RDD[(String, Int)] = wordRDD.map((_,1))

// 将转换结构后的数据按照相同的单词进行分组聚合
val word2CountRDD: RDD[(String, Int)] = word2OneRDD.reduceByKey(_+_)

// 将数据聚合结果采集到内存中
val word2Count: Array[(String, Int)] = word2CountRDD.collect()

// 打印结果
word2Count.foreach(println)

//关闭Spark连接
sc.stop()

spark运行环境

local

所谓的Local模式,就是不需要其他任何节点资源就可以在本地执行Spark代码的环境,一般用于教学,调试,演示等,

standalone

local本地模式毕竟只是用来进行练习演示的,真实工作中还是要将应用提交到对应的集群中去执行,这里我们来看看只使用Spark自身节点运行的集群模式,也就是我们所谓的独立部署(Standalone)模式。Spark的Standalone模式体现了经典的master-slave模式。

集群规划:

Linux1 Linux2 Linux3
Spark Worker Master Worker Worker

yarn

独立部署(Standalone)模式由Spark自身提供计算资源,无需其他框架提供资源。这种方式降低了和其他第三方资源框架的耦合性,独立性非常强。但是你也要记住,Spark主要是计算框架,而不是资源调度框架,所以本身提供的资源调度并不是它的强项,所以还是和其他专业的资源调度框架集成会更靠谱一些。所以接下来我们来学习在强大的Yarn环境下Spark是如何工作的(其实是因为在国内工作中,Yarn使用的非常多)。

k8s & Mesos

Mesos是Apache下的开源分布式资源管理框架,它被称为是分布式系统的内核,在Twitter得到广泛使用,管理着Twitter超过30,0000台服务器上的应用部署,但是在国内,依然使用着传统的Hadoop大数据框架,所以国内使用Mesos框架的并不多,但是原理其实都差不多。

windows

一般教学演示使用。

对比

模式 Spark安装机器数 需启动的进程 所属者 应用场景
Local 1 Spark 测试
Standalone 3 Master及Worker Spark 单独部署
Yarn 1 Yarn及HDFS Hadoop 混合部署

端口号

  • Spark查看当前Spark-shell运行任务情况端口号:4040(计算)

  • Spark Master内部通信服务端口号:7077

  • Standalone模式下,Spark Master Web端口号:8080(资源)

  • Spark历史服务器端口号:18080

  • Hadoop YARN任务运行情况查看端口号:8088

spark运行架构

核心组件

Spark框架有两个核心组件:

1、Driver

Spark驱动器节点,用于执行Spark任务中的main方法,负责实际代码的执行工作。Driver在Spark作业执行时主要负责:

  • 将用户程序转化为作业(job)

  • 在Executor之间调度任务(task)

  • 跟踪Executor的执行情况

  • 通过UI展示查询运行情况

实际上,我们无法准确地描述Driver的定义,因为在整个的编程过程中没有看到任何有关Driver的字眼。所以简单理解,所谓的Driver就是驱使整个应用运行起来的程序,也称之为Driver类。

2、Executor

Spark Executor是集群中工作节点(Worker)中的一个JVM进程,负责在 Spark 作业中运行具体任务(Task),任务彼此之间相互独立。Spark 应用启动时,Executor节点被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在。如果有Executor节点发生了故障或崩溃,Spark 应用也可以继续执行,会将出错节点上的任务调度到其他Executor节点上继续运行。

Executor有两个核心功能:

  • 负责运行组成Spark应用的任务,并将结果返回给驱动器进程

  • 它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存式存储。RDD 是直接缓存在Executor进程内的,因此任务可以在运行时充分利用缓存数据加速运算。

3、ApplicationMaster

Hadoop用户向YARN集群提交应用程序时,提交程序中应该包含ApplicationMaster,用于向资源调度器申请执行任务的资源容器Container,运行用户自己的程序任务job,监控整个任务的执行,跟踪整个任务的状态,处理任务失败等异常情况。

说的简单点就是,RM(资源)和Driver(计算)之间的解耦合靠的就是ApplicationMaster。

提交流程

Spark应用程序提交到Yarn环境中执行的时候,一般会有两种部署执行的方式:Client和Cluster。

两种模式,主要区别在于:Driver程序的运行节点。

Yarn Client模式

Client模式将用于监控和调度的Driver模块在客户端执行,而不是Yarn中,所以一般用于测试。

  • Driver在任务提交的本地机器上运行

  • Driver启动后会和ResourceManager通讯申请启动ApplicationMaster

  • ResourceManager分配container,在合适的NodeManager上启动ApplicationMaster,负责向ResourceManager申请Executor内存

  • ResourceManager接到ApplicationMaster的资源申请后会分配container,然后ApplicationMaster在资源分配指定的NodeManager上启动Executor进程

  • Executor进程启动后会向Driver反向注册,Executor全部注册完成后Driver开始执行main函数

  • 之后执行到Action算子时,触发一个Job,并根据宽依赖开始划分stage,每个stage生成对应的TaskSet,之后将task分发到各个Executor上执行。

Yarn Cluster模式

Cluster模式将用于监控和调度的Driver模块启动在Yarn集群资源中执行。一般应用于实际生产环境。

  • 在YARN Cluster模式下,任务提交后会和ResourceManager通讯申请启动ApplicationMaster,

  • 随后ResourceManager分配container,在合适的NodeManager上启动ApplicationMaster,此时的ApplicationMaster就是Driver。

  • Driver启动后向ResourceManager申请Executor内存,ResourceManager接到ApplicationMaster的资源申请后会分配container,然后在合适的NodeManager上启动Executor进程

  • Executor进程启动后会向Driver反向注册,Executor全部注册完成后Driver开始执行main函数,

  • 之后执行到Action算子时,触发一个Job,并根据宽依赖开始划分stage,每个stage生成对应的TaskSet,之后将task分发到各个Executor上执行。

spark核心编程

Spark计算框架为了能够对数据进行高并发和高吞吐的处理,封装了三大数据结构,用于处理不同的应用场景。三大数据结构分别是:

  • RDD : 弹性分布式数据集

  • 累加器:分布式共享只写变量

  • 广播变量:分布式共享只读变量

RDD

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。

数据集:RDD封装了计算逻辑,并不保存数据

RDD并行度与分区

默认情况下,Spark可以切分任务,并将任务发送给Executor节点并行计算,而这个并行计算的任务数量我们称之为并行度。这个数量可以在构建RDD时指定。

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
val sparkConf =
new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val dataRDD: RDD[Int] =
sparkContext.makeRDD(
List(1,2,3,4),
4)
val fileRDD: RDD[String] =
sparkContext.textFile(
"input",
2)
fileRDD.collect().foreach(println)
sparkContext.stop()

读取内存数据时,数据可以按照并行度的设定进行数据的分区操作,数据分区规则的Spark源码如下

scala
1
2
3
4
5
6
7
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
(0 until numSlices).iterator.map { i =>
val start = ((i * length) / numSlices).toInt
val end = (((i + 1) * length) / numSlices).toInt
(start, end)
}
}

读取文件数据时,数据是按照Hadoop文件读取的规则进行切片分区,而切片规则和数据读取的规则有些差异,具体Spark源码如下

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {

long totalSize = 0; // compute total size
for (FileStatus file: files) { // check we have valid files
if (file.isDirectory()) {
throw new IOException("Not a file: "+ file.getPath());
}
totalSize += file.getLen();
}

long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

...

for (FileStatus file: files) {

...

if (isSplitable(fs, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(goalSize, minSize, blockSize);

...

}
protected long computeSplitSize(long goalSize, long minSize,
long blockSize) {
return Math.max(minSize, Math.min(goalSize, blockSize));
}

RDD创建

1、从集合(内存)中创建RDD

parallelize和makeRDD

scala
1
2
3
4
5
6
7
8
9
10
11
12
val sparkConf =
new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val rdd1 = sparkContext.parallelize(
List(1,2,3,4)
)
val rdd2 = sparkContext.makeRDD(
List(1,2,3,4)
)
rdd1.collect().foreach(println)
rdd2.collect().foreach(println)
sparkContext.stop()

makeRDD方法其实就是parallelize方法

scala
1
2
3
4
5
def makeRDD[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
parallelize(seq, numSlices)
}

2、从外部存储(文件)创建RDD

由外部存储系统的数据集创建RDD包括:本地的文件系统,所有Hadoop支持的数据集,比如HDFS、HBase等。

scala
1
2
3
4
5
6
val sparkConf =
new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val fileRDD: RDD[String] = sparkContext.textFile("input")
fileRDD.collect().foreach(println)
sparkContext.stop()

3、从其他RDD创建

主要是通过一个RDD运算完后,再产生新的RDD。

4、直接创建RDD(new)

使用new的方式直接构造RDD

转换算子

RDD整体上分为Value类型、双Value类型和Key-Value类型

value类型

map

mapPartitions

mapPartitionsWithIndex

flatMap

glom

  • 将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变

groupBy

filter

  • 将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃

sample

  • 根据指定的规则从数据集中抽取数据

distinct

coalesce

  • 根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率

repartition

小问题:coalesce和repartition区别?

repartition算子其实底层调用的就是coalesce算子,只不过固定使用了shuffle的操作,可以让数据更均衡一下,可以有效防止数据倾斜问题。

如果缩减分区,一般就采用coalesce,如果想扩大分区,就采用repartition

sortBy

pipe

  • 管道,针对每个分区,都调用一次shell脚本,返回输出的RDD。

双Value类型

intersection

  • 对源RDD和参数RDD求交集后返回一个新的RDD

union

  • 对源RDD和参数RDD求并集后返回一个新的RDD

subtract

  • 以一个RDD元素为主,去除两个RDD中重复元素,将其他元素保留下来。求差集

zip

  • 将两个RDD中的元素,以键值对的形式进行合并。其中,键值对中的Key为第1个RDD中的元素,Value为第2个RDD中的元素。

Key - Value类型

partitionBy

  • 将数据按照指定Partitioner重新进行分区。Spark默认的分区器是HashPartitioner

reduceByKey

groupByKey

reduceByKey和groupByKey的区别?

两个算子没有使用上的区别。所以使用的时候需要根据应用场景来选择。

从性能上考虑,reduceByKey存在预聚合功能,这样,在shuffle的过程中,落盘的数据量会变少,所以读写磁盘的速度会变快。性能更高

aggregateByKey

  • 将数据根据不同的规则进行分区内计算和分区间计算
  • dataRDD1.aggregateByKey(0)(_+_,_+_)

foldByKey

  • 当分区内计算规则和分区间计算规则相同时,aggregateByKey就可以简化为foldByKey
  • dataRDD1.foldByKey(0)(_+_)

combineByKey

  • 最通用的对key-value型rdd进行聚集操作的聚集函数(aggregation function)。类似于aggregate(),combineByKey()允许用户返回值的类型与输入不一致。

reduceByKey、foldByKey、aggregateByKey、combineByKey的区别?

从源码的角度来讲,四个算子的底层逻辑是相同的。

aggregateByKey的算子会将初始值和第一个value使用分区内的计算规则进行计算

foldByKey的算子的分区内和分区间的计算规则相同,并且初始值和第一个value使用的规则相同

combineByKey第一个参数就是对第一个value进行处理,所以无需初始值。

reduceByKey不会对第一个value进行处理,分区内和分区间计算规则相同

上面的四个算子都支持预聚合功能。所以shuffle性能比较高

上面的四个算子都可以实现WordCount

sortByKey

join

leftOuterJoin

cogroup

  • 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD

行动算子

reduce

  • 聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据

collect

  • 在驱动程序中,以数组Array的形式返回数据集的所有元素

count

first

take

takeOrdered

aggregate

  • 分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合

fold

  • 折叠操作,aggregate的简化版操作

countByKey

save相关算子

scala
1
2
3
4
5
6
7
8
// 保存成Text文件
rdd.saveAsTextFile("output")

// 序列化成对象保存到文件
rdd.saveAsObjectFile("output1")

// 保存成Sequencefile文件
rdd.map((_,1)).saveAsSequenceFile("output2")

foreach

  • 分布式遍历RDD中的每一个元素,调用指定函数

RDD序列化

1) 闭包检查

从计算的角度, 算子以外的代码都是在Driver端执行, 算子里面的代码都是在Executor端执行。那么在scala的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给Executor端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列化,这个操作我们称之为闭包检测。

2) Kryo序列化框架

参考地址: https://github.com/EsotericSoftware/kryo

Java的序列化能够序列化任何的类。但是比较重,序列化后,对象的提交也比较大。

Spark出于性能的考虑,Spark2.0开始支持另外一种Kryo序列化机制。Kryo速度是Serializable的10倍。当RDD在Shuffle数据的时候,简单数据类型、数组和字符串类型已经在Spark内部使用Kryo来序列化。

RDD依赖关系

1、RDD血缘关系

RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
val fileRDD: RDD[String] = sc.textFile("input/1.txt")
println(fileRDD.toDebugString)
println("----------------------")

val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" "))
println(wordRDD.toDebugString)
println("----------------------")

val mapRDD: RDD[(String, Int)] = wordRDD.map((_,1))
println(mapRDD.toDebugString)
println("----------------------")

val resultRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_+_)
println(resultRDD.toDebugString)

resultRDD.collect()

2、RDD依赖关系

这里所谓的依赖关系,其实就是RDD之间的关系

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
val sc: SparkContext = new SparkContext(conf)

val fileRDD: RDD[String] = sc.textFile("input/1.txt")
println(fileRDD.dependencies)
println("----------------------")

val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" "))
println(wordRDD.dependencies)
println("----------------------")

val mapRDD: RDD[(String, Int)] = wordRDD.map((_,1))
println(mapRDD.dependencies)
println("----------------------")

val resultRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_+_)
println(resultRDD.dependencies)

resultRDD.collect()

3、RDD窄依赖

窄依赖表示每一个父RDD的Partition最多被子RDD的一个Partition使用,窄依赖我们形象的比喻为独生子女。

4、RDD宽依赖

宽依赖表示同一个父RDD的Partition被多个子RDD的Partition依赖,会引起Shuffle,总结:宽依赖我们形象的比喻为超生。

5、RDD任务划分

RDD任务切分中间分为:Application、Job、Stage和Task

  • Application:初始化一个SparkContext即生成一个Application;

  • Job:一个Action算子就会生成一个Job;

  • Stage:Stage等于宽依赖(ShuffleDependency)的个数加1;

  • Task:一个Stage阶段中,最后一个RDD的分区个数就是Task的个数。

注意:Application->Job->Stage->Task每一层都是1对n的关系。

RDD持久化

1、RDD Cache缓存

RDD通过Cache或者Persist方法将前面的计算结果缓存,默认情况下会把数据以序列化的形式缓存在JVM的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的action算子时,该RDD将会被缓存在计算节点的内存中,并供后面重用。

scala
1
2
3
4
5
6
7
8
// cache操作会增加血缘关系,不改变原有的血缘关系
println(wordToOneRdd.toDebugString)

// 数据缓存。
wordToOneRdd.cache()

// 可以更改存储级别
//mapRdd.persist(StorageLevel.MEMORY_AND_DISK_2)

存储级别

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
object StorageLevel {
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。

Spark会自动对一些Shuffle操作的中间数据做持久化操作(比如:reduceByKey)。这样做的目的是为了当一个节点Shuffle失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用persist或cache。

2、RDD CheckPoint检查点

所谓的检查点其实就是通过将RDD中间结果写入磁盘

由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。

对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发。

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 设置检查点路径
sc.setCheckpointDir("./checkpoint1")

// 创建一个RDD,读取指定位置文件:hello ys ys
val lineRdd: RDD[String] = sc.textFile("input/1.txt")

// 业务逻辑
val wordRdd: RDD[String] = lineRdd.flatMap(line => line.split(" "))

val wordToOneRdd: RDD[(String, Long)] = wordRdd.map {
word => {
(word, System.currentTimeMillis())
}
}

// 增加缓存,避免再重新跑一个job做checkpoint
wordToOneRdd.cache()
// 数据检查点:针对wordToOneRdd做检查点计算
wordToOneRdd.checkpoint()

// 触发执行逻辑
wordToOneRdd.collect().foreach(println)

缓存和检查点区别

Code
1
2
3
4
5
1)Cache缓存只是将数据保存起来,不切断血缘依赖。Checkpoint检查点切断血缘依赖。

2)Cache缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint的数据通常存储在HDFS等容错、高可用的文件系统,可靠性高。

3)建议对checkpoint()的RDD使用Cache缓存,这样checkpoint的job只需从Cache缓存中读取数据即可,否则需要再从头计算一次RDD。
scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.ysss.bigdata.spark.core.cache

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark02_Checkpoint {

def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
// 设置检查点路径, 一般路径应该为分布式存储路径,HDFS
sc.setCheckpointDir("cp")

val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))

// TODO 检查点

// RDD的持久化可能会导致数据丢失,如果数据丢失,那么需要重新再次计算,性能不高
// 所以如果能够保证数据不丢,那么是一个好的选择
// 可以将数据保存到检查点中,这样是分布式存储,所以比较安全。
// 所以将数据保存到检查点前,需要设定检查点路径
val rdd1 = rdd.map(
num => {
//println("num.....")
num
}
)

// 检查点
// 检查点为了准确,需要重头再执行一遍,就等同于开启一个新的作业
// 为了提高效率,一般情况下,是先使用cache后在使用检查点

// 检查点会切断RDD的血缘关系。将当前检查点当成数据计算的起点。
// 持久化操作是不能切断血缘关系,因为一旦内存中数据丢失,无法恢复数据
val rdd2: RDD[Int] = rdd1.cache()
rdd2.checkpoint()
println(rdd2.toDebugString)
println(rdd2.collect().mkString(","))
println(rdd2.toDebugString)
println("**********************")
println(rdd2.collect().mkString(","))

sc.stop()
}
}

RDD分区器

Spark目前支持Hash分区和Range分区,和用户自定义分区。

Hash分区为当前的默认分区。

分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle后进入哪个分区,进而决定了Reduce的个数。

  • 只有Key-Value类型的RDD才有分区器,非Key-Value类型的RDD分区的值是None

  • 每个RDD的分区ID范围:0 ~ (numPartitions - 1),决定这个值是属于那个分区的。

1) Hash分区:对于给定的key,计算其hashCode,并除以分区个数取余

2) Range分区:将一定范围内的数据映射到一个分区中,尽量保证每个分区数据均匀,而且分区间有序

文件读取与保存

Spark的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统。

文件格式分为:text文件、json文件、csv文件、sequence文件以及Object文件;

文件系统分为:本地文件系统、HDFS、HBASE以及数据库。

累加器

累加器用来把Executor端变量信息聚合到Driver端。

在Driver程序中定义的变量,在Executor端的每个Task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行merge。

系统累加器

scala
1
2
3
4
5
6
7
8
9
10
11
val rdd = sc.makeRDD(List(1,2,3,4,5))
// 声明累加器
var sum = sc.longAccumulator("sum");
rdd.foreach(
num => {
// 使用累加器
sum.add(num)
}
)
// 获取累加器的值
println("sum = " + sum.value)

自定义累加器

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// 自定义累加器
// 1. 继承AccumulatorV2,并设定泛型
// 2. 重写累加器的抽象方法
class WordCountAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]]{

var map : mutable.Map[String, Long] = mutable.Map()

// 累加器是否为初始状态
override def isZero: Boolean = {
map.isEmpty
}

// 复制累加器
override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
new WordCountAccumulator
}

// 重置累加器
override def reset(): Unit = {
map.clear()
}

// 向累加器中增加数据 (In)
override def add(word: String): Unit = {
// 查询map中是否存在相同的单词
// 如果有相同的单词,那么单词的数量加1
// 如果没有相同的单词,那么在map中增加这个单词
map(word) = map.getOrElse(word, 0L) + 1L
}

// 合并累加器
override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {

val map1 = map
val map2 = other.value

// 两个Map的合并
map = map1.foldLeft(map2)(
( innerMap, kv ) => {
innerMap(kv._1) = innerMap.getOrElse(kv._1, 0L) + kv._2
innerMap
}
)
}

// 返回累加器的结果 (Out)
override def value: mutable.Map[String, Long] = map
}

广播变量

广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个Spark操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,广播变量用起来都很顺手。在多个并行操作中使用同一个变量,但是 Spark会为每个任务分别发送。

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
val rdd1 = sc.makeRDD(List( ("a",1), ("b", 2), ("c", 3), ("d", 4) ),4)
val list = List( ("a",4), ("b", 5), ("c", 6), ("d", 7) )
// 声明广播变量
val broadcast: Broadcast[List[(String, Int)]] = sc.broadcast(list)

val resultRDD: RDD[(String, (Int, Int))] = rdd1.map {
case (key, num) => {
var num2 = 0
// 使用广播变量
for ((k, v) <- broadcast.value) {
if (k == key) {
num2 = v
}
}
(key, (num, num2))
}
}
文章作者: Yang4
文章链接: https://masteryang4.github.io/2020/06/17/spark%E7%B3%BB%E5%88%97%E4%B9%8Bspark%E5%9F%BA%E7%A1%80/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 MasterYangBlog
打赏
  • 微信
    微信
  • 支付宝
    支付宝

评论