- 2.1、遇到action产生job
- 2.2、job产生stage
- 2.3、rdd中的cache
- 2.4、Spark-shell中测试rdd缓存 && StorageLevel
- 2.5、Spark-Core中的框架选择(MEMORY_ONLY)
- 2.6、recomputing重算概念
- 2.7、Spark中的宽窄依赖
- 3.1、 Spark on yarn的描述
- 3.2、 Spark on yarn的使用
- 3.3、 Spark-shell上测试client
- 3.4、 spark on yarn上测试cluster模式 && 报错信息排查
一、Spark-Core(二)回顾
- Spark的运行架构,各种关键术语的解释:Driver、Cluster Manager、Executor
RDD中有很多方法,a list of partitions,有一个函数:a function for computing each split,有一系列的依赖: a list of dependencies on other RDD;
五大特性 | 对应的在源码中的方法 | 运行在driver端还是executor端 | Input | Output |
---|---|---|---|---|
A list of partitions | getPartitions | ? | Partition | |
A function for compting each split | compute | ? | Iterable可迭代的 | |
A list of dependencies on other RDD | getDependencies | ? | Dependency |
可选的操作:a Partitioner for key-value RDDs、A list of preferred locations to compute each split
二、Stage剖析:
2.1、遇到action产生job
1、每遇到一个action就会触发一个job,每一个job又会被拆分成更小的task
在Spark-shell中执行如下:
scala> sc.parallelize(List(1,2,3,4,4,4,55,55,2,1)).map((_,1)).reduceByKey(_+_).collect
res1: Array[(Int, Int)] = Array((4,3), (2,2), (55,2), (1,2), (3,1))
scala> sc.parallelize(List(1,2,3,4,4,4,55,55,2,1)).map((_,1)).collect
res2: Array[(Int, Int)] = Array((1,1), (2,1), (3,1), (4,1), (4,1), (4,1), (55,1), (55,1), (2,1), (1,1))
- WebUI界面的结果:
Job的定义:
1、 一种由多个task组成的并行计算,job由Spark中相应算子进行触发(例如save、collect);你会在driver’s log中看到这个术语: - A parallel computation consisting of multiple tasks that gets spawned in response to a Spark action (e.g. save, collect); you’ll see this term used in the driver’s logs
- 每遇到一个action算子如collect就会变成一个job
- 如上图所示:我们两个计算中使用了两个collect算子,所以生成了两个job
- Job id的初始值是0,为什么接下来是1,然后是2;因为每遇到一个action后job id的值都会依次递增1;
2.2、Job如何产生Stage
1、每一个job都会被拆分成更小的task ==>称为Stages;如果Stage之间有依赖,必须前一个Stage执行完后才执行后一个Stage;
- Each job get divided into smaller sets of tasks called stages that depend on each other(similar to the map and reduce stages in MapReduce)
1、下图中的DAG图就是这句语句中出现的:
sc.parallelize(List(1,1,2,2,3,3,3,3,4,4,4,5)).map((,1)).reduceByKey(+_).collect
res2: Array[(Int, Int)] = Array((4,3), (2,2), (1,2), (3,4), (5,1))
2、reduceByKey算子会产生shuffle,一遇到shuffle就会产生Stage;stage2需要等stage1执行完才执行
3、如上语句中分别是如下的3个算子:parallelize、map、reduceByKey;reduceByKey算子会产生shuffle,shuffle会产生stage。举例:原来是一个stage,当我们遇到shuffle后,就会被切一刀,变成2个stage。
遇到collect触发成为1个job,然后job中有带shuffle的reduceByKey又被拆分为2个stages;如下图中的collect at和map at都是以stage中的最后一个算子进行命名的。
2.3、Rdd中的Cache
Rdd中的缓存主要是用于提升速度使用,扩充:JVM:java memory model,计算是通过cpu来处理的,数据是存在内存中的,现在很多地方都很耗费cpu;
Spark中最重要的一个功能是持久化数据到内存中,内存存储在executor中
1、Spark非常重要的一个功能是将rdd持久化在内存中。当对rdd执行持久化操作的时候,每个节点都会将自己操作的RDD的partition持久化到内存中;这样的话,针对于一个rdd反复操作的场景,就只要对rdd进行一次计算即可,后面再使用该rdd不需要反复计算了。
2、 要持久化RDD,只要调用其cache()或者persist()方法即可。在该rdd第一次被计算出来的时候,就会直接缓存在每个节点中。而且持久化机制是容错的,如果持久化的rdd中的任何partition丢失了,那么spark还会通过其源rdd,使用transformation操作重新计算该partition。
3、 cache和persist的区别在于,cache调用的就是persist,而persist调用的是persist(memory_only);如果需要在内存中清除缓存,采用unpersist方法。
2.4、Spark-shell中测试rdd缓存 && StorageLevel
1、读取文件:
scala