spark的缓存提升本质以及分区数量和task执行时间的先后

示例代码
import org.apache.spark.storage.StorageLevel

val path="file:///home/hadoop/sogouoneday.txt"
val input=sc.textFile(path).map(x=>x.split("\t")).map(x=>(x(1),1))

// 执行缓存前,先进入spark的web端http://localhost:4040/storage/查看 缓存情况。
input.cache() // 等价于input.persist(StorageLevel.MEMORY_ONLY)
val user=input.reduceByKey((a,b)=>a+b)
user.count

user.count,第一次运行用14s
在这里插入图片描述
缓存界面提示:
因为数据只有145M,因此可以100%在内存里面缓存
在这里插入图片描述

在这里插入图片描述

缓存效果分析
第1次 user.count

可以看到有两个阶段,因为整个代码有一条窄依赖关系链,以及单个宽依赖,stage划分是根据出现宽依赖来划分,可以看到,spark第一次读取数据转为RDD的map阶段耗时最多
在这里插入图片描述
可以看到第一次map这里有5个task工作,因此第一次肯定少不了把数据从其他地方加载到spark的jvm内存!
在这里插入图片描述

第2次 user.count——这里解释了spark缓存提升的本质原因

相当于“前人种树后人乘凉”的效果,这里第二次的user.count已经不需要map这个处理,因为input这个rdd已经缓存了,不需要重新读取再来执行map操作,从下图可以看出“Skipped Stages”,里面是空的,说明没有task线程去跑。对比第一次的同界面,可以看到第一次map这里有5个task工作,因此第一次肯定少不了把数据从其他地方加载到spark的jvm内存!
在这里插入图片描述
在这里插入图片描述

关于分区数量和task数量以及task的执行流程

因为数据是145M,虚拟机有4个核,自动分区为4+1共5个分区,因此executor进程需要开启5个线程,也即5个task
那么这5个task是同时执行代码吗? 显然不是,如下图所示:
因为虚拟机有4个核,因此前面4个分区是并行执行,从47:00开始,到47:11结束,用时11秒
接着第5个task才能开始执行,也即有cpu核数可以用了,因此要等前面4个cpu算完后,才能开始,从47:11秒开始!
在这里插入图片描述
从以下的event timeline可以非常清楚看出,cpu个数决定task并行运行的流程
在这里插入图片描述

有多少个分区就有多少线程task并发执行

例如现在读取时指定10个分区

val inputRDD=sc.textFile(path,10).map(x=>x.split("\t")).map(x=>(x(1),1))

执行后,spark安排了10个tasks,当然每个task负责的数据量从32M降到14.6M,那么这个10个task的运行时序是怎么调度的呢?
在这里插入图片描述
可以看到10个tasks并不是同时执行的,因为只有4个cpu,因此第1组4个task先在33秒进行,第2组task等第1组结束后,从35秒开会执行,最后两个task等第2组完成后,从38秒开始执行。
在这里插入图片描述

不同分区数量对计算效率的提升

显然分区数量越多,task线程虽然固定,但时cpu是交替执行,计算效率有明细提升
145M,5个分区,用时14s,如果指定spark读取数据时,将其逻辑划分10个分区,那么计算时间用时只需要7s

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值