Spark源码解读(8)——累加器

本文深入探讨Spark累加器(Accumulator)的工作原理,通过源码分析解释为什么Executor中的累加不会影响Driver的值,以及累加器在执行过程中涉及的关键步骤。文章揭示了在Executor端如何进行累加以及Driver端累加的时机。

摘要生成于 C知道 ,由 DeepSeek-R1 满血版支持, 前往体验 >

首先,看下以下代码:

var value = 0
sc.parallelize(1 to 10).foreach(i=>value += i)
value
这里value的结果仍然为0

这个比较好解释:

因为 i => value += i 这个Function在运算之前需要经历:序列化 --> broadcase --> 被Executor获取 --> 反序列化 --> 调用   等一系列过程,最终执行 value += i操作的是Executor进程中的value变量,虽然这个变量来自于Driver的value,但经历了序列化、网络传输等一系列操作之后已经是运行在不同进程中的两个完全不同的变量了,所以这里在Executor对value进行累加,并不会影响Driver中value的值


下面,再看看累加器的使用

val accum = sc.accumulator(0)
sc.parallelize(1 to 10).foreach(i=>accum += i)
accum.value

对于累加器我开始也比较疑惑:

为什么使用了Spark提供了Accumulator API就可以实现累加了呢?accum 在Executor中进行累加是如何改变Driver中accu的值的?


翻看了源码之后才发现,背后做了很多工作:

首先,看看Driver端的accumulator究竟是何时进行累加的?

  /** Merge updates from a task to our local accumulator values */
  private def updateAccumulators(event: CompletionEvent): Unit = {
    val task = event.task
    val stage = stageIdToStage(task.stageId)
    if (event.accumUpdates != null) {
      try {
        Accumulators.add(event.accumUpdates)

        event.accumUpdates.foreach { case (id, partialValue) =>
          // In this instance, although the reference in Accumulators.originals is a WeakRef,
          // it's guaranteed to exist since the event.accumUpdates Map exists

          val acc = Accumulators.originals(id).get match {
            case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]]
            case None => throw new NullPointerException("Non-existent reference to Accumulator")
          }

          // To avoid UI cruft, ignore cases where value wasn't updated
          if (acc.name.isDefined && partialValue != acc.zero) {
            val name = acc.name.get
            val value = s"${acc.value}"
            stage.latestInfo.accumulables(id) =
              new AccumulableInfo(id, name, None, value, acc.isInternal)
            event.taskInfo.accumulables +=
              new AccumulableInfo(id, name, Some(s"$partialValue"), value, acc.isInternal)
          }
        }
      } catch {
        // If we see an exception during accumulator update, just log the
        // error and move on.
        case e: Exception =>
          logError(s"Failed to update accumulators for $task", e)
      }
    }
  }
这里的关键代码是:

        Accumulators.add(event.accumUpdates)
  // Add values to the original accumulators with some given IDs
  def add(values: Map[Long, Any]): Unit = synchronized {
    for ((id, value) <- values) {
      if (originals.contains(id)) {
        // Since we are now storing weak references, we must check whether the underlying data
        // is valid.
        originals(id).get match {
          case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= value
          case None =>
            throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
        }
      } else {
        logWarning(s"Ignoring accumulator update for unknown accumulator id $id")
      }
    }
  }
而Accumulators在初始化的时候对自己进行了注册:

  Accumulators.register(this)
这次,明白了Driver中的Accumulator在哪里被累加了,从源码中找到了如下调用链:


以上的调用链可以简化为如下流程:


评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值