概述
- Spark Core
Spark Core中提供了Spark最基础与最核心的功能,Spark其他的功能如:Spark SQL,Spark Streaming,GraphX, MLlib都是在Spark Core的基础上进行扩展的
- Spark SQL
Spark SQL是Spark用来操作结构化数据的组件。通过Spark SQL,用户可以使用SQL或者Apache Hive版本的SQL方言(HQL)来查询数据。
- Spark Streaming
Spark Streaming是Spark平台上针对实时数据进行流式计算的组件,提供了丰富的处理数据流的API。
- Spark MLlib
MLlib是Spark提供的一个机器学习算法库。MLlib不仅提供了模型评估、数据导入等额外的功能,还提供了一些更底层的机器学习原语。
- Spark GraphX
GraphX是Spark面向图计算提供的框架与算法库。
入门
1 | <dependencies> |
WordCount
1 | // 创建Spark运行配置对象 |
spark运行环境
local
所谓的Local模式,就是不需要其他任何节点资源就可以在本地执行Spark代码的环境,一般用于教学,调试,演示等,
standalone
local本地模式毕竟只是用来进行练习演示的,真实工作中还是要将应用提交到对应的集群中去执行,这里我们来看看只使用Spark自身节点运行的集群模式,也就是我们所谓的独立部署(Standalone)模式。Spark的Standalone模式体现了经典的master-slave模式。
集群规划:
| Linux1 | Linux2 | Linux3 | |
|---|---|---|---|
| Spark | Worker Master | Worker | Worker |
yarn
独立部署(Standalone)模式由Spark自身提供计算资源,无需其他框架提供资源。这种方式降低了和其他第三方资源框架的耦合性,独立性非常强。但是你也要记住,Spark主要是计算框架,而不是资源调度框架,所以本身提供的资源调度并不是它的强项,所以还是和其他专业的资源调度框架集成会更靠谱一些。所以接下来我们来学习在强大的Yarn环境下Spark是如何工作的(其实是因为在国内工作中,Yarn使用的非常多)。
k8s & Mesos
Mesos是Apache下的开源分布式资源管理框架,它被称为是分布式系统的内核,在Twitter得到广泛使用,管理着Twitter超过30,0000台服务器上的应用部署,但是在国内,依然使用着传统的Hadoop大数据框架,所以国内使用Mesos框架的并不多,但是原理其实都差不多。
windows
一般教学演示使用。
对比
| 模式 | Spark安装机器数 | 需启动的进程 | 所属者 | 应用场景 |
|---|---|---|---|---|
| Local | 1 | 无 | Spark | 测试 |
| Standalone | 3 | Master及Worker | Spark | 单独部署 |
| Yarn | 1 | Yarn及HDFS | Hadoop | 混合部署 |
端口号
Spark查看当前Spark-shell运行任务情况端口号:4040(计算)
Spark Master内部通信服务端口号:7077
Standalone模式下,Spark Master Web端口号:8080(资源)
Spark历史服务器端口号:18080
Hadoop YARN任务运行情况查看端口号:8088
spark运行架构
核心组件
Spark框架有两个核心组件:
1、Driver
Spark驱动器节点,用于执行Spark任务中的main方法,负责实际代码的执行工作。Driver在Spark作业执行时主要负责:
将用户程序转化为作业(job)
在Executor之间调度任务(task)
跟踪Executor的执行情况
通过UI展示查询运行情况
实际上,我们无法准确地描述Driver的定义,因为在整个的编程过程中没有看到任何有关Driver的字眼。所以简单理解,所谓的Driver就是驱使整个应用运行起来的程序,也称之为Driver类。
2、Executor
Spark Executor是集群中工作节点(Worker)中的一个JVM进程,负责在 Spark 作业中运行具体任务(Task),任务彼此之间相互独立。Spark 应用启动时,Executor节点被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在。如果有Executor节点发生了故障或崩溃,Spark 应用也可以继续执行,会将出错节点上的任务调度到其他Executor节点上继续运行。
Executor有两个核心功能:
负责运行组成Spark应用的任务,并将结果返回给驱动器进程
它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存式存储。RDD 是直接缓存在Executor进程内的,因此任务可以在运行时充分利用缓存数据加速运算。
3、ApplicationMaster
Hadoop用户向YARN集群提交应用程序时,提交程序中应该包含ApplicationMaster,用于向资源调度器申请执行任务的资源容器Container,运行用户自己的程序任务job,监控整个任务的执行,跟踪整个任务的状态,处理任务失败等异常情况。
说的简单点就是,RM(资源)和Driver(计算)之间的解耦合靠的就是ApplicationMaster。
提交流程
Spark应用程序提交到Yarn环境中执行的时候,一般会有两种部署执行的方式:Client和Cluster。
两种模式,主要区别在于:Driver程序的运行节点。
Yarn Client模式
Client模式将用于监控和调度的Driver模块在客户端执行,而不是Yarn中,所以一般用于测试。
Driver在任务提交的本地机器上运行
Driver启动后会和ResourceManager通讯申请启动ApplicationMaster
ResourceManager分配container,在合适的NodeManager上启动ApplicationMaster,负责向ResourceManager申请Executor内存
ResourceManager接到ApplicationMaster的资源申请后会分配container,然后ApplicationMaster在资源分配指定的NodeManager上启动Executor进程
Executor进程启动后会向Driver反向注册,Executor全部注册完成后Driver开始执行main函数
之后执行到Action算子时,触发一个Job,并根据宽依赖开始划分stage,每个stage生成对应的TaskSet,之后将task分发到各个Executor上执行。
Yarn Cluster模式
Cluster模式将用于监控和调度的Driver模块启动在Yarn集群资源中执行。一般应用于实际生产环境。
在YARN Cluster模式下,任务提交后会和ResourceManager通讯申请启动ApplicationMaster,
随后ResourceManager分配container,在合适的NodeManager上启动ApplicationMaster,此时的ApplicationMaster就是Driver。
Driver启动后向ResourceManager申请Executor内存,ResourceManager接到ApplicationMaster的资源申请后会分配container,然后在合适的NodeManager上启动Executor进程
Executor进程启动后会向Driver反向注册,Executor全部注册完成后Driver开始执行main函数,
之后执行到Action算子时,触发一个Job,并根据宽依赖开始划分stage,每个stage生成对应的TaskSet,之后将task分发到各个Executor上执行。
spark核心编程
Spark计算框架为了能够对数据进行高并发和高吞吐的处理,封装了三大数据结构,用于处理不同的应用场景。三大数据结构分别是:
RDD : 弹性分布式数据集
累加器:分布式共享只写变量
广播变量:分布式共享只读变量
RDD
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。
数据集:RDD封装了计算逻辑,并不保存数据
RDD并行度与分区
默认情况下,Spark可以切分任务,并将任务发送给Executor节点并行计算,而这个并行计算的任务数量我们称之为并行度。这个数量可以在构建RDD时指定。
1 | val sparkConf = |
读取内存数据时,数据可以按照并行度的设定进行数据的分区操作,数据分区规则的Spark源码如下
1 | def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = { |
读取文件数据时,数据是按照Hadoop文件读取的规则进行切片分区,而切片规则和数据读取的规则有些差异,具体Spark源码如下
1 | public InputSplit[] getSplits(JobConf job, int numSplits) |
RDD创建
1、从集合(内存)中创建RDD
parallelize和makeRDD
1 | val sparkConf = |
makeRDD方法其实就是parallelize方法
1 | def makeRDD[T: ClassTag]( |
2、从外部存储(文件)创建RDD
由外部存储系统的数据集创建RDD包括:本地的文件系统,所有Hadoop支持的数据集,比如HDFS、HBase等。
1 | val sparkConf = |
3、从其他RDD创建
主要是通过一个RDD运算完后,再产生新的RDD。
4、直接创建RDD(new)
使用new的方式直接构造RDD
转换算子
RDD整体上分为Value类型、双Value类型和Key-Value类型
value类型
map
mapPartitions
mapPartitionsWithIndex
flatMap
glom
- 将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变
groupBy
filter
- 将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃
sample
- 根据指定的规则从数据集中抽取数据
distinct
coalesce
- 根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率
repartition
小问题:coalesce和repartition区别?
repartition算子其实底层调用的就是coalesce算子,只不过固定使用了shuffle的操作,可以让数据更均衡一下,可以有效防止数据倾斜问题。
如果缩减分区,一般就采用coalesce,如果想扩大分区,就采用repartition
sortBy
pipe
- 管道,针对每个分区,都调用一次shell脚本,返回输出的RDD。
双Value类型
intersection
- 对源RDD和参数RDD求交集后返回一个新的RDD
union
- 对源RDD和参数RDD求并集后返回一个新的RDD
subtract
- 以一个RDD元素为主,去除两个RDD中重复元素,将其他元素保留下来。求差集
zip
- 将两个RDD中的元素,以键值对的形式进行合并。其中,键值对中的Key为第1个RDD中的元素,Value为第2个RDD中的元素。
Key - Value类型
partitionBy
- 将数据按照指定Partitioner重新进行分区。Spark默认的分区器是HashPartitioner
reduceByKey
groupByKey
reduceByKey和groupByKey的区别?
两个算子没有使用上的区别。所以使用的时候需要根据应用场景来选择。
从性能上考虑,reduceByKey存在预聚合功能,这样,在shuffle的过程中,落盘的数据量会变少,所以读写磁盘的速度会变快。性能更高
aggregateByKey
- 将数据根据不同的规则进行分区内计算和分区间计算
dataRDD1.aggregateByKey(0)(_+_,_+_)
foldByKey
- 当分区内计算规则和分区间计算规则相同时,aggregateByKey就可以简化为foldByKey
dataRDD1.foldByKey(0)(_+_)
combineByKey
- 最通用的对key-value型rdd进行聚集操作的聚集函数(aggregation function)。类似于aggregate(),combineByKey()允许用户返回值的类型与输入不一致。
reduceByKey、foldByKey、aggregateByKey、combineByKey的区别?
从源码的角度来讲,四个算子的底层逻辑是相同的。
aggregateByKey的算子会将初始值和第一个value使用分区内的计算规则进行计算
foldByKey的算子的分区内和分区间的计算规则相同,并且初始值和第一个value使用的规则相同
combineByKey第一个参数就是对第一个value进行处理,所以无需初始值。
reduceByKey不会对第一个value进行处理,分区内和分区间计算规则相同
上面的四个算子都支持预聚合功能。所以shuffle性能比较高
上面的四个算子都可以实现WordCount
sortByKey
join
leftOuterJoin
cogroup
- 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable
,Iterable ))类型的RDD
行动算子
reduce
- 聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据
collect
- 在驱动程序中,以数组Array的形式返回数据集的所有元素
count
first
take
takeOrdered
aggregate
- 分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合
fold
- 折叠操作,aggregate的简化版操作
countByKey
save相关算子
1 | // 保存成Text文件 |
foreach
- 分布式遍历RDD中的每一个元素,调用指定函数
RDD序列化
1) 闭包检查
从计算的角度, 算子以外的代码都是在Driver端执行, 算子里面的代码都是在Executor端执行。那么在scala的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给Executor端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列化,这个操作我们称之为闭包检测。
2) Kryo序列化框架
Java的序列化能够序列化任何的类。但是比较重,序列化后,对象的提交也比较大。
Spark出于性能的考虑,Spark2.0开始支持另外一种Kryo序列化机制。Kryo速度是Serializable的10倍。当RDD在Shuffle数据的时候,简单数据类型、数组和字符串类型已经在Spark内部使用Kryo来序列化。
RDD依赖关系
1、RDD血缘关系
RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。
1 | val fileRDD: RDD[String] = sc.textFile("input/1.txt") |
2、RDD依赖关系
这里所谓的依赖关系,其实就是RDD之间的关系
1 | val sc: SparkContext = new SparkContext(conf) |
3、RDD窄依赖
窄依赖表示每一个父RDD的Partition最多被子RDD的一个Partition使用,窄依赖我们形象的比喻为独生子女。
4、RDD宽依赖
宽依赖表示同一个父RDD的Partition被多个子RDD的Partition依赖,会引起Shuffle,总结:宽依赖我们形象的比喻为超生。
5、RDD任务划分
RDD任务切分中间分为:Application、Job、Stage和Task
Application:初始化一个SparkContext即生成一个Application;
Job:一个Action算子就会生成一个Job;
Stage:Stage等于宽依赖(ShuffleDependency)的个数加1;
Task:一个Stage阶段中,最后一个RDD的分区个数就是Task的个数。
注意:Application->Job->Stage->Task每一层都是1对n的关系。
RDD持久化
1、RDD Cache缓存
RDD通过Cache或者Persist方法将前面的计算结果缓存,默认情况下会把数据以序列化的形式缓存在JVM的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的action算子时,该RDD将会被缓存在计算节点的内存中,并供后面重用。
1 | // cache操作会增加血缘关系,不改变原有的血缘关系 |
存储级别
1 | object StorageLevel { |
缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。
Spark会自动对一些Shuffle操作的中间数据做持久化操作(比如:reduceByKey)。这样做的目的是为了当一个节点Shuffle失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用persist或cache。
2、RDD CheckPoint检查点
所谓的检查点其实就是通过将RDD中间结果写入磁盘
由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。
对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发。
1 | // 设置检查点路径 |
缓存和检查点区别
1 | 1)Cache缓存只是将数据保存起来,不切断血缘依赖。Checkpoint检查点切断血缘依赖。 |
1 | package com.ysss.bigdata.spark.core.cache |
RDD分区器
Spark目前支持Hash分区和Range分区,和用户自定义分区。
Hash分区为当前的默认分区。
分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle后进入哪个分区,进而决定了Reduce的个数。
只有Key-Value类型的RDD才有分区器,非Key-Value类型的RDD分区的值是None
每个RDD的分区ID范围:0 ~ (numPartitions - 1),决定这个值是属于那个分区的。
1) Hash分区:对于给定的key,计算其hashCode,并除以分区个数取余
2) Range分区:将一定范围内的数据映射到一个分区中,尽量保证每个分区数据均匀,而且分区间有序
文件读取与保存
Spark的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统。
文件格式分为:text文件、json文件、csv文件、sequence文件以及Object文件;
文件系统分为:本地文件系统、HDFS、HBASE以及数据库。
累加器
累加器用来把Executor端变量信息聚合到Driver端。
在Driver程序中定义的变量,在Executor端的每个Task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行merge。
系统累加器
1 | val rdd = sc.makeRDD(List(1,2,3,4,5)) |
自定义累加器
1 | // 自定义累加器 |
广播变量
广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个Spark操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,广播变量用起来都很顺手。在多个并行操作中使用同一个变量,但是 Spark会为每个任务分别发送。
1 | val rdd1 = sc.makeRDD(List( ("a",1), ("b", 2), ("c", 3), ("d", 4) ),4) |




