Spark-Core(三) - Stage剖析&&Spark on yarn的两种模式

本文深入探讨Spark的Stage剖析,包括action触发job、Stage划分、RDD缓存与StorageLevel、宽窄依赖等。同时,回顾了Hadoop Yarn,详细讲解Spark on Yarn的Client和Cluster模式,包括测试过程与常见报错排查。

摘要生成于 C知道 ,由 DeepSeek-R1 满血版支持, 前往体验 >

一、Spark-Core(二)回顾

二、Stage剖析

三、回顾Hadoop中的Yarn

一、Spark-Core(二)回顾

  • Spark的运行架构,各种关键术语的解释:Driver、Cluster Manager、Executor

RDD中有很多方法,a list of partitions,有一个函数:a function for computing each split,有一系列的依赖: a list of dependencies on other RDD;

五大特性 对应的在源码中的方法 运行在driver端还是executor端 Input Output
A list of partitions getPartitions Partition
A function for compting each split compute Iterable可迭代的
A list of dependencies on other RDD getDependencies Dependency

可选的操作:a Partitioner for key-value RDDs、A list of preferred locations to compute each split

二、Stage剖析:

2.1、遇到action产生job

1、每遇到一个action就会触发一个job,每一个job又会被拆分成更小的task

在Spark-shell中执行如下:

scala> sc.parallelize(List(1,2,3,4,4,4,55,55,2,1)).map((_,1)).reduceByKey(_+_).collect
res1: Array[(Int, Int)] = Array((4,3), (2,2), (55,2), (1,2), (3,1))             

scala> sc.parallelize(List(1,2,3,4,4,4,55,55,2,1)).map((_,1)).collect
res2: Array[(Int, Int)] = Array((1,1), (2,1), (3,1), (4,1), (4,1), (4,1), (55,1), (55,1), (2,1), (1,1))
  • WebUI界面的结果:
    在这里插入图片描述
    Job的定义:
    1、 一种由多个task组成的并行计算,job由Spark中相应算子进行触发(例如save、collect);你会在driver’s log中看到这个术语:
  • A parallel computation consisting of multiple tasks that gets spawned in response to a Spark action (e.g. save, collect); you’ll see this term used in the driver’s logs
  1. 每遇到一个action算子如collect就会变成一个job
  2. 如上图所示:我们两个计算中使用了两个collect算子,所以生成了两个job
  3. Job id的初始值是0,为什么接下来是1,然后是2;因为每遇到一个action后job id的值都会依次递增1;

2.2、Job如何产生Stage

1、每一个job都会被拆分成更小的task ==>称为Stages;如果Stage之间有依赖,必须前一个Stage执行完后才执行后一个Stage;

  • Each job get divided into smaller sets of tasks called stages that depend on each other(similar to the map and reduce stages in MapReduce)
1、下图中的DAG图就是这句语句中出现的:
sc.parallelize(List(1,1,2,2,3,3,3,3,4,4,4,5)).map((,1)).reduceByKey(+_).collect
res2: Array[(Int, Int)] = Array((4,3), (2,2), (1,2), (3,4), (5,1))

2、reduceByKey算子会产生shuffle,一遇到shuffle就会产生Stage;stage2需要等stage1执行完才执行

3、如上语句中分别是如下的3个算子:parallelize、map、reduceByKey;reduceByKey算子会产生shuffle,shuffle会产生stage。举例:原来是一个stage,当我们遇到shuffle后,就会被切一刀,变成2个stage。

遇到collect触发成为1个job,然后job中有带shuffle的reduceByKey又被拆分为2个stages;如下图中的collect at和map at都是以stage中的最后一个算子进行命名的。
在这里插入图片描述

2.3、Rdd中的Cache

Rdd中的缓存主要是用于提升速度使用,扩充:JVM:java memory model,计算是通过cpu来处理的,数据是存在内存中的,现在很多地方都很耗费cpu;

Spark中最重要的一个功能是持久化数据到内存中,内存存储在executor中

1、Spark非常重要的一个功能是将rdd持久化在内存中。当对rdd执行持久化操作的时候,每个节点都会将自己操作的RDD的partition持久化到内存中;这样的话,针对于一个rdd反复操作的场景,就只要对rdd进行一次计算即可,后面再使用该rdd不需要反复计算了。

2、 要持久化RDD,只要调用其cache()或者persist()方法即可。在该rdd第一次被计算出来的时候,就会直接缓存在每个节点中。而且持久化机制是容错的,如果持久化的rdd中的任何partition丢失了,那么spark还会通过其源rdd,使用transformation操作重新计算该partition。

3、 cache和persist的区别在于,cache调用的就是persist,而persist调用的是persist(memory_only);如果需要在内存中清除缓存,采用unpersist方法。

2.4、Spark-shell中测试rdd缓存 && StorageLevel

1、读取文件:
scala
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值