Spark Source Code Reading (6): The Shuffle Process


Shuffle is one of the more complex parts of Spark Core. This article analyzes Spark's Shuffle process starting from the simplest possible WordCount-style example:

1. Overview

sc.parallelize(1 to 1000).map(i=>(i%5,1)).reduceByKey(_+_).collect()

The computation is split into two Stages, as shown in the figure below:


Each Stage consists of multiple Tasks. Tasks within the same Stage run in parallel and do not affect one another, but the later Stage (Stage 1) must wait for the earlier one (Stage 0) to finish before it can start. A more detailed view of the execution is shown in the figure below.

Data is exchanged between Stage 0 and Stage 1. A Task in Stage 0 cannot know in advance which Task in Stage 1 will ultimately consume its output, so the data has to be redistributed according to a fixed rule (the Partitioner). This redistribution is called Shuffle.
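In our example the rule is the one implemented by HashPartitioner: a record goes to the reduce-side partition given by its key's hashCode modulo the number of partitions. Below is a minimal sketch of that routing rule (the helper partitionFor is purely illustrative):

def partitionFor(key: Any, numPartitions: Int): Int = {
  // mirrors HashPartitioner.getPartition: non-negative hashCode modulo
  val raw = key.hashCode % numPartitions
  if (raw < 0) raw + numPartitions else raw
}

// With the example's 5 distinct keys (i % 5) and, say, 4 reduce partitions:
(0 to 4).foreach(k => println(s"key $k -> partition ${partitionFor(k, 4)}"))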


The number of Tasks in a Stage is determined by the number of Partitions. For a ParallelCollectionRDD that number is the default parallelism: if spark.default.parallelism is set, that value is used; otherwise it is the total number of cores available to the Application (with a floor of 2):

  // SparkContext.parallelize(): numSlices defaults to defaultParallelism
  def parallelize[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
    assertNotStopped()
    new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
  }

  // CoarseGrainedSchedulerBackend.defaultParallelism()
  override def defaultParallelism(): Int = {
    conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
  }
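For example, assuming a driver running with 4 local cores and spark.default.parallelism left unset (the master URL and app name below are arbitrary), parallelize() produces 4 partitions, i.e. 4 Tasks in Stage 0:

import org.apache.spark.{SparkConf, SparkContext}

// Hedged illustration: 4 cores available, spark.default.parallelism unset.
val conf = new SparkConf().setMaster("local[4]").setAppName("parallelism-demo")
val sc   = new SparkContext(conf)

println(sc.defaultParallelism)                        // 4
println(sc.parallelize(1 to 1000).partitions.length)  // 4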

If reduceByKey() is called without an explicit partition count, it uses defaultPartitioner().

If a parent RDD already has a Partitioner, that Partitioner is reused. Here the parent is the MapPartitionsRDD produced by map(), whose Partitioner is None, so the default HashPartitioner is used.

If spark.default.parallelism is set, the number of partitions is taken from that parameter; otherwise it comes from the upstream RDD with the most partitions, which in this example ultimately means the Partition count of the ParallelCollectionRDD.

So, tracing through these transformations, Stage 1 ends up with the same number of Partitions as Stage 0:

  // PairRDDFunctions.reduceByKey(): falls back to defaultPartitioner()
  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    reduceByKey(defaultPartitioner(self), func)
  }

  // Partitioner.defaultPartitioner(): reuse a parent's Partitioner if one
  // exists, otherwise build a HashPartitioner
  def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
    val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
    for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) {
      return r.partitioner.get
    }
    if (rdd.context.conf.contains("spark.default.parallelism")) {
      new HashPartitioner(rdd.context.defaultParallelism)
    } else {
      new HashPartitioner(bySize.head.partitions.size)
    }
  }
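Putting this together for the example job (a hedged check, assuming an existing SparkContext sc and spark.default.parallelism unset): the MapPartitionsRDD has no Partitioner, so reduceByKey() falls back to a HashPartitioner whose numPartitions equals the parent's partition count:

val mapped  = sc.parallelize(1 to 1000).map(i => (i % 5, 1))
val reduced = mapped.reduceByKey(_ + _)

println(mapped.partitioner)   // None
println(reduced.partitioner)  // Some(org.apache.spark.HashPartitioner@...)
// Stage 1 therefore has exactly as many partitions as Stage 0:
println(reduced.partitions.length == mapped.partitions.length)  // true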

2. A Detailed Look at the Shuffle Process

1) Before analyzing the Shuffle itself, let's first walk through how a Job executes:

First, an Action triggers the Job submission: SparkContext.runJob();

This in turn calls DAGScheduler.runJob(), which performs the conversion from RDDs to TaskSets:

a) DAGScheduler first divides the graph into Stages based on the RDDs' Dependencies: every ShuffleDependency it encounters introduces a new Stage boundary, and any missing parent Stages are submitted recursively (a lineage check for our example follows the code below):

  /** Submits stage, but first recursively submits any missing parents. */
  private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      logDebug("submitStage(" + stage + ")")
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        val missing = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        if (missing.isEmpty) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          submitMissingTasks(stage, jobId.get)
        } else {
          for (parent <- missing) {
            submitStage(parent)
          }
          waitingStages += stage
        }
      }
    } else {
      abortStage(stage, "No active job for stage " + stage.id, None)
    }
  }
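For our example job the cut happens exactly at the ShuffleDependency introduced by reduceByKey(). A hedged way to see this (assuming an existing SparkContext sc; the RDD ids and partition counts in the comments will vary):

val rdd = sc.parallelize(1 to 1000).map(i => (i % 5, 1)).reduceByKey(_ + _)

println(rdd.toDebugString)
// (4) ShuffledRDD[2] at reduceByKey ...
//  +-(4) MapPartitionsRDD[1] at map ...
//     |  ParallelCollectionRDD[0] at parallelize ...

// The ShuffleDependency between MapPartitionsRDD and ShuffledRDD is where the
// DAGScheduler cuts the graph into Stage 0 and Stage 1:
println(rdd.dependencies.head) // org.apache.spark.ShuffleDependency@...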

b) Next, the locality preference of each Task in the Stage is computed, and the result is passed to the Stage (via makeNewStageAttempt(), shown at the end of the listing below):

  /**
   * Recursive implementation for getPreferredLocs.
   *
   * This method is thread-safe because it only accesses DAGScheduler state through thread-safe
   * methods (getCacheLocs()); please be careful when modifying this method, because any new
   * DAGScheduler state accessed by it may require additional synchronization.
   */
  private def getPreferredLocsInternal(
      rdd: RDD[_],
      partition: Int,
      visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
    // If the partition has already been visited, no need to re-visit.
    // This avoids exponential path exploration.  SPARK-695
    if (!visited.add((rdd, partition))) {
      // Nil has already been returned for previously visited partitions.
      return Nil
    }
    // If the partition is cached, return the cache locations
    val cached = getCacheLocs(rdd)(partition)
    if (cached.nonEmpty) {
      return cached
    }
    // If the RDD has some placement preferences (as is the case for input RDDs), get those
    val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
    if (rddPrefs.nonEmpty) {
      return rddPrefs.map(TaskLocation(_))
    }

    // If the RDD has narrow dependencies, pick the first partition of the first narrow dependency
    // that has any placement preferences. Ideally we would choose based on transfer sizes,
    // but this will do for now.
    rdd.dependencies.foreach {
      case n: NarrowDependency[_] =>
        for (inPart <- n.getParents(partition)) {
          val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
          if (locs != Nil) {
            return locs
          }
        }

      case _ =>
    }

    Nil
  }
    stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
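For an RDD backed by a distributed file system these preferences come from the input block locations; for the ParallelCollectionRDD in our example the list is empty, so the Tasks can run anywhere. A hedged illustration (the HDFS path is purely illustrative):

val hdfsRdd = sc.textFile("hdfs:///path/to/input")
println(hdfsRdd.preferredLocations(hdfsRdd.partitions(0))) // hosts holding block 0

val memRdd = sc.parallelize(1 to 1000)
println(memRdd.preferredLocations(memRdd.partitions(0)))   // empty: no preference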
c) The Stage's RDD (together with its ShuffleDependency or result function) is then serialized and broadcast:

      // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
      // For ResultTask, serialize and broadcast (rdd, func).
      val taskBinaryBytes: Array[Byte] = stage match {
        case stage: ShuffleMapStage =>
          closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()
        case stage: ResultStage =>
          closureSerializer.serialize((stage.rdd, stage.func): AnyRef).array()
      }

      taskBinary = sc.broadcast(taskBinaryBytes)
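Serializing the (rdd, dep) pair once and broadcasting it means the bytes reach each Executor at most once, instead of being shipped with every Task. A minimal sketch of the same pattern (the payload and names below are illustrative, not the DAGScheduler's actual fields):

// Serialize once, broadcast once, read per task.
val payload: Array[Byte] = "shared task payload".getBytes("UTF-8")
val taskBinary = sc.broadcast(payload)

sc.parallelize(1 to 4, 4).foreach { _ =>
  // each Executor fetches the broadcast at most once and caches it locally
  println(new String(taskBinary.value, "UTF-8"))
}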
d) Tasks are then generated, one per partition to compute: ShuffleMapTasks for a ShuffleMapStage (Stage 0 in our example) and ResultTasks for the ResultStage (Stage 1), with the broadcast taskBinary handed to each Task:

    val tasks: Seq[Task[_]] = try {
      stage match {
        case stage: ShuffleMapStage =>
          partitionsToCompute.map { id =>
            val locs = taskIdToLocations(id)
            val part = stage.rdd.partitions(id)
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, stage.internalAccumulators)
          }

        case stage: ResultStage =>
          val job = stage.activeJob.get
          partitionsToCompute.map { id =>
            val p: Int = stage.partitions(id)
            val part = stage.rdd.partitions(p)
            val locs = taskIdToLocations(id)
            new ResultTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, id, stage.internalAccumulators)
          }
      }
    } catch {
      case NonFatal(e) =>
        abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e))
        runningStages -= stage
        return
    }
e) Finally, the Tasks are submitted:

      taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
After DAGScheduler has called TaskScheduler.submitTasks(), the Tasks are handed over to the TaskScheduler for scheduling and launching.

Once the TaskScheduler has added the TaskSet to its queue, it triggers the CoarseGrainedSchedulerBackend to perform resource scheduling and launch the Tasks:

Adding the TaskSet to the queue:

      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
Triggering the scheduling:
    backend.reviveOffers()
CoarseGrainedSchedulerBackend then completes the resource scheduling and launches the Tasks:

    // Make fake resource offers on all executors
    private def makeOffers() {
      // Filter out executors under killing
      val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
      val workOffers = activeExecutors.map { case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
      }.toSeq
      launchTasks(scheduler.resourceOffers(workOffers))
    }
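scheduler.resourceOffers() then matches the sorted TaskSet queue against these offers. A much-simplified, conceptual sketch of that matching (ignoring locality levels, blacklisting and task serialization; all names below are illustrative, the real logic lives in TaskSchedulerImpl.resourceOffers()):

case class Offer(executorId: String, host: String, var freeCores: Int)
case class PendingTask(taskSetId: String, index: Int)

def assign(offers: Seq[Offer], sortedTaskSets: Seq[Seq[PendingTask]]): Seq[(String, PendingTask)] = {
  val launched = scala.collection.mutable.ArrayBuffer.empty[(String, PendingTask)]
  val pending  = scala.collection.mutable.Queue(sortedTaskSets.flatten: _*)
  for (offer <- offers) {
    // hand each free core at most one pending task (spark.task.cpus = 1 assumed)
    while (offer.freeCores > 0 && pending.nonEmpty) {
      launched += ((offer.executorId, pending.dequeue()))
      offer.freeCores -= 1
    }
  }
  launched
}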
Two key points are involved here: 1) the order in which TaskSets are scheduled; 2) how resources are allocated (sketched above).

The sorted queue of TaskSets is obtained via getSortedTaskSetQueue():

  override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
    var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
    val sortedSchedulableQueue =
      schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
    for (schedulable <- sortedSchedulableQueue) {
      sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
    }
    sortedTaskSetQueue
  }
The ordering of TaskSets is determined by taskSetSchedulingAlgorithm.comparator, which has two implementations: FIFOSchedulingAlgorithm and FairSchedulingAlgorithm:

/**
 * An interface for sort algorithm
 * FIFO: FIFO algorithm between TaskSetManagers
 * FS: FS algorithm between Pools, and FIFO or FS within Pools
 */
private[spark] trait SchedulingAlgorithm {
  def comparator(s1: Schedulable, s2: Schedulable): Boolean
}

private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val priority1 = s1.priority
    val priority2 = s2.priority
    var res = math.signum(priority1 - priority2)
    if (res == 0) {
      val stageId1 = s1.stageId
      val stageId2 = s2.stageId
      res = math.signum(stageId1 - stageId2)
    }
    if (res < 0) {
      true
    } else {
      false
    }
  }
}
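FIFOSchedulingAlgorithm orders TaskSets by priority (which is the Job id) and then by Stage id, so earlier Jobs and earlier Stages run first; FairSchedulingAlgorithm instead compares pools by their minShare and weight. Which algorithm is used is controlled by spark.scheduler.mode (a hedged usage note; the app name below is arbitrary):

import org.apache.spark.SparkConf

// FIFO is the default; FAIR schedules between pools with FairSchedulingAlgorithm.
val conf = new SparkConf()
  .setAppName("scheduler-mode-demo")
  .set("spark.scheduler.mode", "FAIR")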