avatar

目录
spark系列之spark-core

Spark内核概述

Spark内核泛指Spark的核心运行机制,包括Spark核心组件的运行机制、Spark任务调度机制、Spark内存管理机制、Spark核心功能的运行原理等,熟练掌握Spark内核原理,能够帮助我们更好地完成Spark代码设计,并能够帮助我们准确锁定项目运行过程中出现的问题的症结所在。

Spark核心组件

Driver

Spark驱动器节点,用于执行Spark任务中的main方法,负责实际代码的执行工作。Driver在Spark作业执行时主要负责:

1) 将用户程序转化为作业(Job);

2) 在Executor之间调度任务(Task);

3) 跟踪Executor的执行情况;

4) 通过UI展示查询运行情况;

Executor

Spark Executor节点是负责在Spark作业中运行具体任务,任务彼此之间相互独立。Spark 应用启动时,Executor节点被同时启动,并且始终伴随着整个Spark应用的生命周期而存在。如果有Executor节点发生了故障或崩溃,Spark应用也可以继续执行,会将出错节点上的任务调度到其他Executor节点上继续运行。

Executor有两个核心功能:

1) 负责运行组成Spark应用的任务,并将结果返回给驱动器(Driver);

2) 它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存式存储。RDD是直接缓存在Executor进程内的,因此任务可以在运行时充分利用缓存数据加速运算。

Spark通用运行流程

上图为Spark通用运行流程图,体现了基本的Spark应用程序在部署中的基本提交流程。

这个流程是按照如下的核心步骤进行工作的:

1) 任务提交后,都会先启动Driver程序;

2) 随后Driver向集群管理器注册应用程序;

3) 之后集群管理器根据此任务的配置文件分配Executor并启动;

4) Driver开始执行main函数,Spark查询为懒执行,当执行到Action算子时开始反向推算,根据宽依赖进行Stage的划分,随后每一个Stage对应一个Taskset,Taskset中有多个Task,查找可用资源Executor进行调度;

5) 根据本地化原则,Task会被分发到指定的Executor去执行,在任务执行的过程中,Executor也会不断与Driver进行通信,报告任务运行情况。

Spark部署模式

Spark支持多种集群管理器(Cluster Manager),分别为:

1) Standalone:独立模式,Spark原生的简单集群管理器,自带完整的服务,可单独部署到一个集群中,无需依赖任何其他资源管理系统,使用Standalone可以很方便地搭建一个集群;

2) Hadoop YARN:统一的资源管理机制,在上面可以运行多套计算框架,如MR、Storm等。根据Driver在集群中的位置不同,分为yarn client和yarn cluster;

3) Apache Mesos:一个强大的分布式资源管理框架,它允许多种不同的框架部署在其上,包括Yarn。

4) K8S : 容器式部署环境。

实际上,除了上述这些通用的集群管理器外,Spark内部也提供了方便用户测试和学习的本地集群部署模式和Windows环境。由于在实际工厂环境下使用的绝大多数的集群管理器是Hadoop YARN,因此我们关注的重点是Hadoop YARN模式下的Spark集群部署。

Yarn模式运行机制

YARN Cluster模式

1) 执行脚本提交任务,实际是启动一个SparkSubmit的JVM进程;

2) SparkSubmit类中的main方法反射调用YarnClusterApplication的main方法;

3) YarnClusterApplication创建Yarn客户端,然后向Yarn发送执行指令:bin/java ApplicationMaster;

4) Yarn框架收到指令后会在指定的NM中启动ApplicationMaster;

5) ApplicationMaster启动Driver线程,执行用户的作业;

6) AM向RM注册,申请资源;

7) 获取资源后AM向NM发送指令:bin/java CoarseGrainedExecutorBackend;

8) CoarseGrainedExecutorBackend进程会接收消息,跟Driver通信,注册已经启动的Executor;然后启动计算对象Executor等待接收任务

Driver分配任务并监控任务的执行。

注意:SparkSubmit、ApplicationMaster和CoarseGrainedExecutorBackend是独立的进程;Driver是独立的线程;Executor和YarnClusterApplication是对象。

【9步】

YARN Client模式

1) 执行脚本提交任务,实际是启动一个SparkSubmit的JVM进程;

2) SparkSubmit类中的main方法反射调用用户代码的main方法;

3) 启动Driver线程,执行用户的作业,并创建ScheduleBackend;

4) YarnClientSchedulerBackend向RM发送指令:bin/java ExecutorLauncher;

5) Yarn框架收到指令后会在指定的NM中启动ExecutorLauncher(实际上还是调用ApplicationMaster的main方法);

scala
1
2
3
4
5
6
7
object ExecutorLauncher {

def main(args: Array[String]): Unit = {
ApplicationMaster.main(args)
}

}

6) AM向RM注册,申请资源;

7) 获取资源后AM向NM发送指令:bin/java CoarseGrainedExecutorBackend;

8) CoarseGrainedExecutorBackend进程会接收消息,跟Driver通信,注册已经启动的Executor;然后启动计算对象Executor等待接收任务

9) Driver分配任务并监控任务的执行。

注意:SparkSubmit、ApplicationMaster和CoarseGrainedExecutorBackend是独立的进程;Executor和Driver是对象。

Standalone模式运行机制

Standalone集群有2个重要组成部分,分别是:

1) Master(RM):是一个进程,主要负责资源的调度和分配,并进行集群的监控等职责;

2) Worker(NM):是一个进程,一个Worker运行在集群中的一台服务器上,主要负责两个职责,一个是用自己的内存存储RDD的某个或某些partition;另一个是启动其他进程和线程(Executor),对RDD上的partition进行并行的处理和计算。

Standalone Cluster模式

在Standalone Cluster模式下,任务提交后,Master会找到一个Worker启动Driver。Driver启动后向Master注册应用程序,Master根据submit脚本的资源需求找到内部资源至少可以启动一个Executor的所有Worker,然后在这些Worker之间分配Executor,Worker上的Executor启动后会向Driver反向注册,所有的Executor注册完成后,Driver开始执行main函数,之后执行到Action算子时,开始划分Stage,每个Stage生成对应的taskSet,之后将Task分发到各个Executor上执行。

Standalone Client模式

在Standalone Client模式下,Driver在任务提交的本地机器上运行。Driver启动后向Master注册应用程序,Master根据submit脚本的资源需求找到内部资源至少可以启动一个Executor的所有Worker,然后在这些Worker之间分配Executor,Worker上的Executor启动后会向Driver反向注册,所有的Executor注册完成后,Driver开始执行main函数,之后执行到Action算子时,开始划分Stage,每个Stage生成对应的TaskSet,之后将Task分发到各个Executor上执行。

Spark通讯架构

概述

Spark中通信框架的发展:

  • Spark早期版本中采用Akka作为内部通信部件。

  • Spark1.3中引入Netty通信框架,为了解决Shuffle的大数据传输问题使用

  • Spark1.6中Akka和Netty可以配置使用。Netty完全实现了Akka在Spark中的功能。

  • Spark2系列中,Spark抛弃Akka,使用Netty。

RPC通信协议原理图:

RPC1.png

Spark通信终端

Driver:

class DriverEndpoint extends ThreadSafeRpcEndpoint

Executor

class CoarseGrainedExecutorBackend extends ThreadSafeRpcEndpoint

spark通讯架构解析

  • RpcEndpoint:RPC通信终端。Spark针对每个节点(Client/Master/Worker)都称之为一个RPC终端,且都实现RpcEndpoint接口,内部根据不同端点的需求,设计不同的消息和不同的业务处理,如果需要发送(询问)则调用Dispatcher。在Spark中,所有的终端都存在生命周期:

    • Constructor
    • onStart
    • receive*
    • onStop
  • RpcEnv:RPC上下文环境,每个RPC终端运行时依赖的上下文环境称为RpcEnv;在把当前Spark版本中使用的NettyRpcEnv

  • Dispatcher:消息调度(分发)器,针对于RPC终端需要发送远程消息或者从远程RPC接收到的消息,分发至对应的指令收件箱(发件箱)。如果指令接收方是自己则存入收件箱,如果指令接收方不是自己,则放入发件箱;

  • Inbox:指令消息收件箱。一个本地RpcEndpoint对应一个收件箱,Dispatcher在每次向Inbox存入消息时,都将对应EndpointData加入内部ReceiverQueue中,另外Dispatcher创建时会启动一个单独线程进行轮询ReceiverQueue,进行收件箱消息消费;

  • RpcEndpointRef:RpcEndpointRef是对远程RpcEndpoint的一个引用。当我们需要向一个具体的RpcEndpoint发送消息时,一般我们需要获取到该RpcEndpoint的引用,然后通过该应用发送消息。

  • OutBox:指令消息发件箱。对于当前RpcEndpoint来说,一个目标RpcEndpoint对应一个发件箱,如果向多个目标RpcEndpoint发送信息,则有多个OutBox。当消息放入Outbox后,紧接着通过TransportClient将消息发送出去。消息放入发件箱以及发送过程是在同一个线程中进行;

  • RpcAddress:表示远程的RpcEndpointRef的地址,Host + Port。

  • TransportClient:Netty通信客户端,一个OutBox对应一个TransportClient,TransportClient不断轮询OutBox,根据OutBox消息的receiver信息,请求对应的远程TransportServer;

  • TransportServer:Netty通信服务端,一个RpcEndpoint对应一个TransportServer,接受远程消息后调用Dispatcher分发消息至对应收发件箱;

Spark任务调度机制

在生产环境下,Spark集群的部署方式一般为YARN-Cluster模式,之后的内核分析内容中我们默认集群的部署方式为YARN-Cluster模式。在上一章中我们讲解了Spark YARN-Cluster模式下的任务提交流程,但是我们并没有具体说明Driver的工作流程, Driver线程主要是初始化SparkContext对象,准备运行所需的上下文,然后一方面保持与ApplicationMaster的RPC连接,通过ApplicationMaster申请资源,另一方面根据用户业务逻辑开始调度任务,将任务下发到已有的空闲Executor上。

当ResourceManager向ApplicationMaster返回Container资源时,ApplicationMaster就尝试在对应的Container上启动Executor进程,Executor进程起来后,会向Driver反向注册,注册成功后保持与Driver的心跳,同时等待Driver分发任务,当分发的任务执行完毕后,将任务状态上报给Driver。

概述

当Driver起来后,Driver则会根据用户程序逻辑准备任务,并根据Executor资源情况逐步分发任务。在详细阐述任务调度前,首先说明下Spark里的几个概念。一个Spark应用程序包括Job、Stage以及Task三个概念:

1) Job是以Action方法为界,遇到一个Action方法则触发一个Job;

2) Stage是Job的子集,以RDD宽依赖(即Shuffle)为界,遇到Shuffle做一次划分;

3) Task是Stage的子集,以并行度(分区数)来衡量,分区数是多少,则有多少个task。

Spark的任务调度总体来说分两路进行,一路是Stage级的调度,一路是Task级的调度。

【重点】总体调度流程如下图所示

Spark RDD通过其Transactions操作,形成了RDD血缘(依赖)关系图,即DAG,最后通过Action的调用,触发Job并调度执行,执行过程中会创建两个调度器:DAGScheduler和TaskScheduler。

  • DAGScheduler负责Stage级的调度,主要是将job切分成若干Stages,并将每个Stage打包成TaskSet交给TaskScheduler调度。

  • TaskScheduler负责Task级的调度,将DAGScheduler给过来的TaskSet按照指定的调度策略分发到Executor上执行,调度过程中SchedulerBackend负责提供可用资源,其中SchedulerBackend有多种实现,分别对接不同的资源管理系统。

【扩展】EventQueue:双端(阻塞)队列;

​ BlockingQueue:阻塞队列

Driver初始化SparkContext过程中,会分别初始化DAGScheduler、TaskScheduler、SchedulerBackend以及HeartbeatReceiver,并启动SchedulerBackend以及HeartbeatReceiver。

SchedulerBackend通过ApplicationMaster申请资源,并不断从TaskScheduler中拿到合适的Task分发到Executor执行。HeartbeatReceiver负责接收Executor的心跳信息,监控Executor的存活状况,并通知到TaskScheduler。

Spark Stage级调度

Spark的任务调度是从DAG切割开始,主要是由DAGScheduler来完成。当遇到一个Action操作后就会触发一个Job的计算,并交给DAGScheduler来提交,下图是涉及到Job提交的相关方法调用流程图。

1) Job由最终的RDD和Action方法封装而成;

2) SparkContext将Job交给DAGScheduler提交,它会根据RDD的血缘关系构成的DAG进行切分,将一个Job划分为若干Stages,具体划分策略是,由最终的RDD不断通过依赖回溯判断父依赖是否是宽依赖,即以Shuffle为界,划分Stage,窄依赖的RDD之间被划分到同一个Stage中,可以进行pipeline式的计算。划分的Stages分两类,一类叫做ResultStage,为DAG最下游的Stage,由Action方法决定,另一类叫做ShuffleMapStage,为下游Stage准备数据.,下面看一个简单的例子WordCount。

Job由saveAsTextFile触发,该Job由RDD-3和saveAsTextFile方法组成,根据RDD之间的依赖关系从RDD-3开始回溯搜索,直到没有依赖的RDD-0,在回溯搜索过程中,RDD-3依赖RDD-2,并且是宽依赖,所以在RDD-2和RDD-3之间划分Stage,RDD-3被划到最后一个Stage,即ResultStage中,RDD-2依赖RDD-1,RDD-1依赖RDD-0,这些依赖都是窄依赖,所以将RDD-0、RDD-1和RDD-2划分到同一个Stage,形成pipeline操作。即ShuffleMapStage中,实际执行的时候,数据记录会一气呵成地执行RDD-0到RDD-2的转化。不难看出,其本质上是一个深度优先搜索(Depth First Search)算法。

一个Stage是否被提交,需要判断它的父Stage是否执行,只有在父Stage执行完毕才能提交当前Stage,如果一个Stage没有父Stage,那么从该Stage开始提交。Stage提交时会将Task信息(分区信息以及方法等)序列化并被打包成TaskSet交给TaskScheduler,一个Partition对应一个Task,另一方面TaskScheduler会监控Stage的运行状态,只有Executor丢失或者Task由于Fetch失败才需要重新提交失败的Stage以调度运行失败的任务,其他类型的Task失败会在TaskScheduler的调度过程中重试。

相对来说DAGScheduler做的事情较为简单,仅仅是在Stage层面上划分DAG,提交Stage并监控相关状态信息。TaskScheduler则相对较为复杂,下面详细阐述其细节。

Spark Task级调度

Spark Task的调度是由TaskScheduler来完成,由前文可知,DAGScheduler将Stage打包到TaskSet交给TaskScheduler,TaskScheduler会将TaskSet封装为TaskSetManager加入到调度队列中,TaskSetManager结构如下图所示。

TaskSetManager负责监控管理同一个Stage中的Tasks,TaskScheduler就是以TaskSetManager为单元来调度任务。

前面也提到,TaskScheduler初始化后会启动SchedulerBackend,它负责跟外界打交道,接收Executor的注册信息,并维护Executor的状态,所以说SchedulerBackend是管“粮食”的,同时它在启动后会定期地去“询问”TaskScheduler有没有任务要运行,也就是说,它会定期地“问”TaskScheduler“我有这么余粮,你要不要啊”,TaskScheduler在SchedulerBackend“问”它的时候,会从调度队列中按照指定的调度策略选择TaskSetManager去调度运行,大致方法调用流程如下图所示:

TaskSetManager加入rootPool调度池中之后,调用SchedulerBackend的riviveOffers方法给driverEndpoint发送ReviveOffer消息;driverEndpoint收到ReviveOffer消息后调用makeOffers方法,过滤出活跃状态的Executor(这些Executor都是任务启动时反向注册到Driver的Executor),然后将Executor封装成WorkerOffer对象;准备好计算资源(WorkerOffer)后,taskScheduler基于这些资源调用resourceOffer在Executor上分配task。

调度策略

TaskScheduler支持两种调度策略,一种是FIFO,也是默认的调度策略,另一种是FAIR。在TaskScheduler初始化过程中会实例化rootPool,表示树的根节点,是Pool类型。

1) FIFO调度策略

如果是采用FIFO调度策略,则直接简单地将TaskSetManager按照先来先到的方式入队,出队时直接拿出最先进队的TaskSetManager,其树结构如下图所示,TaskSetManager保存在一个FIFO队列中。

2) FAIR调度策略

FAIR调度策略的树结构如下图所示:

FAIR模式中有一个rootPool和多个子Pool,各个子Pool中存储着所有待分配的TaskSetMagager。

在FAIR模式中,需要先对子Pool进行排序,再对子Pool里面的TaskSetManager进行排序,因为Pool和TaskSetMagager都继承了Schedulable特质,因此使用相同的排序算法。

排序过程的比较是基于Fair-share来比较的,每个要排序的对象包含三个属性: runningTasks值(正在运行的Task数)、minShare值(资源合理利用的值,比如核的利用率)、weight值,比较时会综合考量runningTasks值,minShare值以及weight值。

注意,minShare、weight的值均在公平调度配置文件fairscheduler.xml中被指定,调度池在构建阶段会读取此文件的相关配置。

  • 如果A对象的runningTasks大于它的minShare,B对象的runningTasks小于它的minShare,那么B排在A前面;(runningTasks比minShare小的先执行)

  • 如果A、B对象的runningTasks都小于它们的minShare,那么就比较runningTasks与minShare的比值(minShare使用率),谁小谁排前面;(minShare使用率低的先执行)

  • 如果A、B对象的runningTasks都大于它们的minShare,那么就比较runningTasks与weight的比值(权重使用率),谁小谁排前面。(权重使用率低的先执行)

  • 如果上述比较均相等,则比较名字。

整体上来说就是通过minShare和weight这两个参数控制比较过程,可以做到让minShare使用率和权重使用率少(实际运行task比例较少)的先运行。

FAIR模式排序完成后,所有的TaskSetManager被放入一个ArrayBuffer里,之后依次被取出并发送给Executor执行。

从调度队列中拿到TaskSetManager后,由于TaskSetManager封装了一个Stage的所有Task,并负责管理调度这些Task,那么接下来的工作就是TaskSetManager按照一定的规则一个个取出Task给TaskScheduler,TaskScheduler再交给SchedulerBackend去发到Executor上执行。

本地化调度

WHY?【解决任务发给谁的问题】【其实就是计算和数据的相对位置】

【移动数据不如移动计算】

DAGScheduler切割Job,划分Stage, 通过调用submitStage来提交一个Stage对应的tasks,submitStage会调用submitMissingTasks,submitMissingTasks 确定每个需要计算的 task 的preferredLocations,通过调用getPreferrdeLocations()得到partition 的优先位置,由于一个partition对应一个Task,此partition的优先位置就是task的优先位置,对于要提交到TaskScheduler的TaskSet中的每一个Task,该task优先位置与其对应的partition对应的优先位置一致。

从调度队列中拿到TaskSetManager后,那么接下来的工作就是TaskSetManager按照一定的规则一个个取出task给TaskScheduler,TaskScheduler再交给SchedulerBackend去发到Executor上执行。前面也提到,TaskSetManager封装了一个Stage的所有Task,并负责管理调度这些Task。

根据每个Task的优先位置,确定Task的Locality级别,Locality一共有五种,优先级由高到低顺序:

名称 解析
PROCESS_LOCAL 进程本地化,task和数据在同一个Executor中,性能最好。
NODE_LOCAL 节点本地化,task和数据在同一个节点中,但是task和数据不在同一个Executor中,数据需要在进程间进行传输。
RACK_LOCAL 机架本地化,task和数据在同一个机架的两个节点上,数据需要通过网络在节点之间进行传输。
NO_PREF 对于task来说,从哪里获取都一样,没有好坏之分。
ANY task和数据可以在集群的任何地方,而且不在一个机架中,性能最差。

在调度执行时,Spark调度总是会尽量让每个task以最高的本地性级别来启动,当一个task以X本地性级别启动,但是该本地性级别对应的所有节点都没有空闲资源而启动失败,此时并不会马上降低本地性级别启动而是在某个时间长度内再次以X本地性级别来启动该task,若超过限时时间则降级启动,去尝试下一个本地性级别,依次类推。

可以通过调大每个类别的最大容忍延迟时间,在等待阶段对应的Executor可能就会有相应的资源去执行此task,这就在在一定程度上提到了运行性能。

失败重试与黑名单机制

除了选择合适的Task调度运行外,还需要监控Task的执行状态,前面也提到,与外部打交道的是SchedulerBackend,Task被提交到Executor启动执行后,Executor会将执行状态上报给SchedulerBackend,SchedulerBackend则告诉TaskScheduler,TaskScheduler找到该Task对应的TaskSetManager,并通知到该TaskSetManager,这样TaskSetManager就知道Task的失败与成功状态,对于失败的Task,会记录它失败的次数,如果失败次数还没有超过最大重试次数,那么就把它放回待调度的Task池子中,否则整个Application失败。

在记录Task失败次数过程中,会记录它上一次失败所在的Executor Id和Host,这样下次再调度这个Task时,会使用黑名单机制,避免它被调度到上一次失败的节点上,起到一定的容错作用。黑名单记录Task上一次失败所在的Executor Id和Host,以及其对应的“拉黑”时间,“拉黑”时间是指这段时间内不要再往这个节点上调度这个Task了。

Spark shuffle解析

shuffle的本质就是落盘,关键是如何落盘,是排序后落盘还是不排序落盘

核心要点

ShuffleMapStage与ResultStage

在划分stage时,最后一个stage称为finalStage,它本质上是一个ResultStage对象,前面的所有stage被称为ShuffleMapStage。

ShuffleMapStage的结束伴随着shuffle文件的写磁盘。

ResultStage基本上对应代码中的action算子,即将一个函数应用在RDD的各个partition的数据集上,意味着一个job的运行结束。

HashShuffle解析

未优化的HashShuffle

这里我们先明确一个假设前提:每个Executor只有1个CPU core,也就是说,无论这个Executor上分配多少个task线程,同一时间都只能执行一个task线程。

如下图中有3个 Reducer,从Task 开始那边各自把自己进行 Hash 计算(分区器:hash/numreduce取模),分类出3个不同的类别,每个 Task 都分成3种类别的数据,想把不同的数据汇聚然后计算出最终的结果,所以Reducer 会在每个 Task 中把属于自己类别的数据收集过来,汇聚成一个同类别的大集合,每1个 Task 输出3份本地文件,这里有4个 Mapper Tasks,所以总共输出了4个 Tasks x 3个分类文件 = 12个本地小文件。

优化的HashShuffle

优化的HashShuffle过程就是启用合并机制,合并机制就是复用buffer,开启合并机制的配置是spark.shuffle.consolidateFiles。该参数默认值为false,将其设置为true即可开启优化机制。通常来说,如果我们使用HashShuffleManager,那么都建议开启这个选项。

这里还是有4个Tasks,数据类别还是分成3种类型,因为Hash算法会根据你的 Key 进行分类,在同一个进程中,无论是有多少过Task,都会把同样的Key放在同一个Buffer里,然后把Buffer中的数据写入以Core数量为单位的本地文件中,(一个Core只有一种类型的Key的数据),每1个Task所在的进程中,分别写入共同进程中的3份本地文件,这里有4个Mapper Tasks,所以总共输出是 2个Cores x 3个分类文件 = 6个本地小文件。

SortShuffle解析

普通SortShuffle

在该模式下,数据会先写入一个数据结构,reduceByKey写入Map,一边通过Map局部聚合,一遍写入内存。Join算子写入ArrayList直接写入内存中。然后需要判断是否达到阈值,如果达到就会将内存数据结构的数据写入到磁盘,清空内存数据结构。

在溢写磁盘前,先根据key进行排序,排序过后的数据,会分批写入到磁盘文件中。默认批次为10000条,数据会以每批一万条写入到磁盘文件。写入磁盘文件通过缓冲区溢写的方式,每次溢写都会产生一个磁盘文件,也就是说一个Task过程会产生多个临时文件。

最后在每个Task中,将所有的临时文件合并,这就是merge过程,此过程将所有临时文件读取出来,一次写入到最终文件。意味着一个Task的所有数据都在这一个文件中。同时单独写一份索引文件,标识下游各个Task的数据在文件中的索引,start offset和end offset。

bypass SortShuffle

bypass运行机制的触发条件如下:

1)不是聚合类的shuffle算子,比如reduceByKey。

2) shuffle reduce task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值,默认为200。

此时task会为每个reduce端的task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。

该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,shuffle read的性能会更好。

而该机制与普通SortShuffleManager运行机制的不同在于:不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

Spark内存管理

堆内和堆外内存规划

作为一个JVM 进程,Executor 的内存管理建立在JVM的内存管理之上,Spark对 JVM的堆内(On-heap)空间进行了更为详细的分配,以充分利用内存。同时,Spark引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,进一步优化了内存的使用。堆内内存受到JVM统一管理,堆外内存是直接向操作系统进行内存的申请和释放。

堆内内存

堆内内存的大小,由Spark应用程序启动时的 –executor-memory 或 spark.executor.memory 参数配置。Executor 内运行的并发任务共享 JVM 堆内内存,这些任务在缓存 RDD 数据和广播(Broadcast)数据时占用的内存被规划为存储(Storage)内存,而这些任务在执行 Shuffle 时占用的内存被规划为执行(Execution)内存,剩余的部分不做特殊规划,那些Spark内部的对象实例,或者用户定义的 Spark 应用程序中的对象实例,均占用剩余的空间。不同的管理模式下,这三部分占用的空间大小各不相同。
Spark对堆内内存的管理是一种逻辑上的”规划式”的管理,因为对象实例占用内存的申请和释放都由JVM完成,Spark只能在申请后和释放前记录这些内存,我们来看其具体流程:
申请内存流程如下:
Spark 在代码中 new 一个对象实例;
JVM 从堆内内存分配空间,创建对象并返回对象引用;
Spark 保存该对象的引用,记录该对象占用的内存。
释放内存流程如下:

  1. Spark记录该对象释放的内存,删除该对象的引用;
  2. 等待JVM的垃圾回收机制释放该对象占用的堆内内存。
    我们知道,JVM 的对象可以以序列化的方式存储,序列化的过程是将对象转换为二进制字节流,本质上可以理解为将非连续空间的链式存储转化为连续空间或块存储,在访问时则需要进行序列化的逆过程——反序列化,将字节流转化为对象,序列化的方式可以节省存储空间,但增加了存储和读取时候的计算开销。
    对于Spark中序列化的对象,由于是字节流的形式,其占用的内存大小可直接计算,而对于非序列化的对象,其占用的内存是通过周期性地采样近似估算而得,即并不是每次新增的数据项都会计算一次占用的内存大小,这种方法降低了时间开销但是有可能误差较大,导致某一时刻的实际内存有可能远远超出预期。此外,在被Spark标记为释放的对象实例,很有可能在实际上并没有被JVM回收,导致实际可用的内存小于Spark记录的可用内存。所以 Spark并不能准确记录实际可用的堆内内存,从而也就无法完全避免内存溢出(OOM, Out of Memory)的异常。
    虽然不能精准控制堆内内存的申请和释放,但 Spark 通过对存储内存和执行内存各自独立的规划管理,可以决定是否要在存储内存里缓存新的 RDD,以及是否为新的任务分配执行内存,在一定程度上可以提升内存的利用率,减少异常的出现。

堆外内存

为了进一步优化内存的使用以及提高Shuffle时排序的效率,Spark引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,存储经过序列化的二进制数据。
堆外内存意味着把内存对象分配在Java虚拟机的堆以外的内存,这些内存直接受操作系统管理(而不是虚拟机)。这样做的结果就是能保持一个较小的堆,以减少垃圾收集对应用的影响。
利用JDK Unsafe API(从Spark 2.0开始,在管理堆外的存储内存时不再基于Tachyon,而是与堆外的执行内存一样,基于 JDK Unsafe API 实现),Spark 可以直接操作系统堆外内存,减少了不必要的内存开销,以及频繁的 GC 扫描和回收,提升了处理性能。堆外内存可以被精确地申请和释放(堆外内存之所以能够被精确的申请和释放,是由于内存的申请和释放不再通过JVM机制,而是直接向操作系统申请,JVM对于内存的清理是无法准确指定时间点的,因此无法实现精确的释放),而且序列化的数据占用的空间可以被精确计算,所以相比堆内内存来说降低了管理的难度,也降低了误差。
在默认情况下堆外内存并不启用,可通过配置spark.memory.offHeap.enabled 参数启用,并由 spark.memory.offHeap.size参数设定堆外空间的大小。除了没有 other 空间,堆外内存与堆内内存的划分方式相同,所有运行中的并发任务共享存储内存和执行内存。

内存空间分配

静态内存管理

在Spark最初采用的静态内存管理机制下,存储内存、执行内存和其他内存的大小在Spark应用程序运行期间均为固定的,但用户可以应用程序启动前进行配置,堆内内存的分配如图所示:

可以看到,可用的堆内内存的大小需要按照下列方式计算:

Code
1
2
3
可用的存储内存 = systemMaxMemory * spark.storage.memoryFraction * spark.storage.safety Fraction

可用的执行内存 = systemMaxMemory * spark.shuffle.memoryFraction * spark.shuffle.safety Fraction

其中systemMaxMemory取决于当前JVM堆内内存的大小,最后可用的执行内存或者存储内存要在此基础上与各自的memoryFraction 参数和safetyFraction 参数相乘得出。上述计算公式中的两个 safetyFraction 参数,其意义在于在逻辑上预留出 1-safetyFraction 这么一块保险区域,降低因实际内存超出当前预设范围而导致 OOM 的风险(上文提到,对于非序列化对象的内存采样估算会产生误差)。值得注意的是,这个预留的保险区域仅仅是一种逻辑上的规划,在具体使用时 Spark 并没有区别对待,和”其它内存”一样交给了 JVM 去管理。

Storage内存和Execution内存都有预留空间,目的是防止OOM,因为Spark堆内内存大小的记录是不准确的,需要留出保险区域。

堆外的空间分配较为简单,只有存储内存和执行内存,如下图所示。可用的执行内存和存储内存占用的空间大小直接由参数spark.memory.storageFraction 决定,由于堆外内存占用的空间可以被精确计算,所以无需再设定保险区域。

静态内存管理机制实现起来较为简单,但如果用户不熟悉Spark的存储机制,或没有根据具体的数据规模和计算任务或做相应的配置,很容易造成”一半海水,一半火焰”的局面,即存储内存和执行内存中的一方剩余大量的空间,而另一方却早早被占满,不得不淘汰或移出旧的内容以存储新的内容。由于新的内存管理机制的出现,这种方式目前已经很少有开发者使用,出于兼容旧版本的应用程序的目的,Spark 仍然保留了它的实现。

统一内存管理

Spark1.6 之后引入的统一内存管理机制,与静态内存管理的区别在于存储内存和执行内存共享同一块空间,可以动态占用对方的空闲区域,统一内存管理的堆内内存结构如图所示:

统一内存管理的堆外内存结构如下图所示:

其中最重要的优化在于动态占用机制,其规则如下:

1) 设定基本的存储内存和执行内存区域(spark.storage.storageFraction参数),该设定确定了双方各自拥有的空间的范围;

2) 双方的空间都不足时,则存储到硬盘;若己方空间不足而对方空余时,可借用对方的空间;(存储空间不足是指不足以放下一个完整的Block)

3) 执行内存的空间被对方占用后,可让对方将占用的部分转存到硬盘,然后”归还”借用的空间;

4) 存储内存的空间被对方占用后,无法让对方”归还”,因为需要考虑 Shuffle过程中的很多因素,实现起来较为复杂。【为了让execution保证计算准确】

统一内存管理的动态占用机制如图所示:

【重点】

凭借统一内存管理机制,Spark在一定程度上提高了堆内和堆外内存资源的利用率,降低了开发者维护Spark内存的难度,但并不意味着开发者可以高枕无忧。如果存储内存的空间太大或者说缓存的数据过多,反而会导致频繁的全量垃圾回收,降低任务执行时的性能,因为缓存的RDD数据通常都是长期驻留内存的。所以要想充分发挥Spark的性能,需要开发者进一步了解存储内存和执行内存各自的管理方式和实现原理。

存储内存管理

RDD的持久化机制

RDD的缓存过程

淘汰与落盘

执行内存管理

Shuffle Write

Shuffle Read

小结

Code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
任务的划分:
ShuffleMapStage(1) => ShuffleMapTask(N) => shuffle write
=> shuffle read
ResultStage(1) => ResultTask(N) => shuffle read

任务的封装:
Task => TaskSet => TaskSetManager => TaskPool

任务调度器(默认使用FIFO)
FIFO : 先进先出
FAIR : 公平 (runningTasks, minShare, weight)sortWith

任务本地化级别:
PROCESS_LOCAL : 内存数据的数据处理
NODE_LOCAL : yarn集群的方式访问HDFS文件
RACK_LOCAL
Any

任务的执行
Driver => encode(Task) => RPC => ExecutorBackend => decode(Task) => Executor
Executor => ThreadPool => TaskRunner => run => Task.run => XXXTask.runTask

Shuffle管理器 :
SortShuffleManager

Shuffle Writer:
1.UnsafeShuffleWriter => SerializedShuffleHandle
2.BypassMergeSortShuffleWriter => BypassMergeSortShuffleHandle

没有预聚合功能 & reduce阶段的分区数量 <= 阈值(200)

有预聚合功能的算子 : reduceByKey combineByKey, aggregateByKey, foldByKey
没有预聚合功能的算子 : groupByKey sortByKey

实现方式类似于HashShuffle

3.SortShuffleWriter => BaseShuffleHandle

写磁盘文件时,首席会按照分区进行排序,然后默认按照key.hashCode排序
排序时,如果超过内存阈值 :5m

预聚合的原理: 在shuffle落盘之前的聚合功能
PartitionedAppendOnlyMap => Hashtable => ( (分区ID,Key), value )
不支持预聚合
PartitionedPairBuffer => ( (分区ID,Key), value )

Spark内存
静态内存管理:
存储内存:
val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
(systemMaxMemory * memoryFraction * safetyFraction).toLong
执行内存:
val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)
val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
(systemMaxMemory * memoryFraction * safetyFraction).toLong
统一内存管理
存储内存:
val usableMemory = systemMemory - reservedMemory
val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)
maxMemory = (usableMemory * memoryFraction).toLong
onHeapStorageRegionSize =
(maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong,
执行内存:

Spark配置:
spark.scheduler.mode : 任务调度器,默认为FIFO,可以改为FAIR
spark.locality.wait: 本地化等待时间,默认3s
spark.shuffle.sort.bypassMergeThreshold : 忽略排序的阈值
spark.local.dir : 本地文件存储路径
spark.shuffle.spill.batchSize : 溢写磁盘的数据量 10000
spark.memory.useLegacyMode : 内存管理兼容模式
文章作者: Yang4
文章链接: https://masteryang4.github.io/2020/06/19/spark%E7%B3%BB%E5%88%97%E4%B9%8Bspark-core/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 MasterYangBlog
打赏
  • 微信
    微信
  • 支付宝
    支付宝

评论