First, take a look at the following code:
var value = 0
sc.parallelize(1 to 10).foreach(i=>value += i)
value
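Here, value is still 0.
This one is fairly easy to explain:
Before it runs, the function i => value += i goes through a whole series of steps: serialization --> broadcast --> fetched by the Executor --> deserialization --> invocation. The value += i that finally executes operates on a value variable inside the Executor process. That variable originated from the Driver's value, but after serialization and network transfer the two are completely separate variables living in different processes. So adding to value on the Executor does not change the value held by the Driver.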
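To see the effect without a cluster, here is a minimal plain-Scala sketch. It uses Java serialization to stand in for shipping the closure to an Executor. IntCell, AddTo, ship and simulateForeach are hypothetical names. IntCell models the mutable box the Scala compiler creates when a closure captures a local var (scala.runtime.IntRef), and AddTo models the compiled closure. This illustrates the copy semantics under those assumptions; it is not Spark's actual task path.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object ClosureCopyDemo {
  // Hypothetical model of the box the compiler uses for a captured local `var`
  // (in real compiled code this is a scala.runtime.IntRef).
  final class IntCell(var value: Int) extends Serializable

  // Hypothetical model of the compiled closure `i => value += i`.
  final class AddTo(val cell: IntCell) extends (Int => Unit) with Serializable {
    def apply(i: Int): Unit = cell.value += i
  }

  // Serialize and deserialize an object; stands in for shipping a task
  // from the Driver process to an Executor process.
  def ship[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T]
    finally in.close()
  }

  // Returns (value seen by the Driver, value accumulated on the "Executor").
  def simulateForeach(): (Int, Int) = {
    val driverCell = new IntCell(0)     // the Driver's `var value = 0`
    val closure = new AddTo(driverCell) // `i => value += i`
    val onExecutor = ship(closure)      // the Executor gets a deserialized copy
    (1 to 10).foreach(onExecutor)       // only the copy's cell is mutated
    (driverCell.value, onExecutor.cell.value)
  }
}
```

ClosureCopyDemo.simulateForeach() returns (0, 55). The deserialized copy reaches 55, while the Driver's cell never leaves 0.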
Next, let's look at how an accumulator is used:
val accum = sc.accumulator(0)
sc.parallelize(1 to 10).foreach(i=>accum += i)
accum.value
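This time accum.value is 55 (1 + 2 + ... + 10). Accumulators puzzled me at first, too:
Why does the accumulation work just because Spark's Accumulator API is used? How can adding to accum on the Executors change the value of accum on the Driver?
Only after digging through the source did I realize how much work happens behind the scenes.
First, when exactly does the Driver-side accumulator get updated? On the Driver, DAGScheduler merges the updates from each finished task in updateAccumulators: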
/** Merge updates from a task to our local accumulator values */
private def updateAccumulators(event: CompletionEvent): Unit = {
  val task = event.task
  val stage = stageIdToStage(task.stageId)
  if (event.accumUpdates != null) {
    try {
      Accumulators.add(event.accumUpdates)
      event.accumUpdates.foreach { case (id, partialValue) =>
        // In this instance, although the reference in Accumulators.originals is a WeakRef,
        // it's guaranteed to exist since the event.accumUpdates Map exists
        val acc = Accumulators.originals(id).get match {
          case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]]
          case None => throw new NullPointerException("Non-existent reference to Accumulator")
        }
        // To avoid UI cruft, ignore cases where value wasn't updated
        if (acc.name.isDefined && partialValue != acc.zero) {
          val name = acc.name.get
          val value = s"${acc.value}"
          stage.latestInfo.accumulables(id) =
            new AccumulableInfo(id, name, None, value, acc.isInternal)
          event.taskInfo.accumulables +=
            new AccumulableInfo(id, name, Some(s"$partialValue"), value, acc.isInternal)
        }
      }
    } catch {
      // If we see an exception during accumulator update, just log the
      // error and move on.
      case e: Exception =>
        logError(s"Failed to update accumulators for $task", e)
    }
  }
}
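The order of operations inside updateAccumulators can be modeled in a few lines. This is a sketch only. Acc and Info are hypothetical simplifications of Accumulable and AccumulableInfo, values are fixed to Long, and the stage bookkeeping and exception logging are left out. The model first merges every partial value, which is the job of Accumulators.add. It then reports UI info only for named accumulators whose partial value differs from zero.

```scala
object UpdateAccumulatorsDemo {
  // Hypothetical, simplified Driver-side accumulator (not Spark's Accumulable).
  final class Acc(val name: Option[String], val zero: Long) {
    var value: Long = zero
    def ++=(partial: Long): Unit = value += partial
  }

  // Hypothetical stand-in for AccumulableInfo: id, name, this task's update, running total.
  final case class Info(id: Long, name: String, update: String, value: String)

  // Same shape as updateAccumulators: merge all partial values first,
  // then emit info only for named accumulators whose update is non-zero.
  def update(originals: Map[Long, Acc], accumUpdates: Map[Long, Long]): Seq[Info] = {
    accumUpdates.foreach { case (id, partial) => originals(id) ++= partial }
    accumUpdates.toSeq.sortBy(_._1).collect {
      case (id, partial) if originals(id).name.isDefined && partial != originals(id).zero =>
        val acc = originals(id)
        Info(id, acc.name.get, s"$partial", s"${acc.value}")
    }
  }
}
```

An unnamed accumulator is still merged but never shows up in the returned info. This matches the "avoid UI cruft" comment in the original code.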
The key line here is:
Accumulators.add(event.accumUpdates)
which is defined in the Accumulators object as follows:
// Add values to the original accumulators with some given IDs
def add(values: Map[Long, Any]): Unit = synchronized {
  for ((id, value) <- values) {
    if (originals.contains(id)) {
      // Since we are now storing weak references, we must check whether the underlying data
      // is valid.
      originals(id).get match {
        case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= value
        case None =>
          throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
      }
    } else {
      logWarning(s"Ignoring accumulator update for unknown accumulator id $id")
    }
  }
}
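The registry behavior of Accumulators.add can be mimicked in plain Scala. In this sketch, every name is hypothetical except scala.ref.WeakReference, which is a real standard-library class. originals maps an id to a weak reference. add merges updates for known ids and throws if the original has already been garbage collected. For unknown ids, add returns them to the caller instead of logging a warning.

```scala
import scala.collection.mutable
import scala.ref.WeakReference

object AccumulatorsRegistryDemo {
  // Hypothetical, simplified accumulator; Spark's Accumulable is generic.
  final class Acc(val id: Long) {
    var value: Long = 0L
    def ++=(partial: Long): Unit = value += partial
  }

  // id -> weak reference to the Driver-side original (mirrors Accumulators.originals).
  private val originals = mutable.Map[Long, WeakReference[Acc]]()

  def register(a: Acc): Unit = synchronized {
    originals(a.id) = new WeakReference(a)
  }

  // Mirrors Accumulators.add; returns the ids that were skipped as unknown.
  def add(values: Map[Long, Long]): Seq[Long] = synchronized {
    val skipped = mutable.ArrayBuffer[Long]()
    for ((id, partial) <- values) {
      originals.get(id) match {
        case Some(ref) =>
          ref.get match {
            case Some(acc) => acc ++= partial
            case None =>
              throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
          }
        case None =>
          skipped += id // Spark logs a warning here instead
      }
    }
    skipped.toSeq
  }
}
```

The weak reference means the registry alone does not keep an accumulator alive. Once user code drops every strong reference to an accumulator, a later update for its id hits the IllegalAccessError branch.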
Each accumulator, in turn, registers itself with Accumulators when it is constructed:
Accumulators.register(this)
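Putting the pieces together, here is a plain-Scala simulation of the whole round trip. The Driver creates an accumulator, which registers itself in its constructor. Each "task" receives a deserialized copy, adds to it, and reports (id -> partial value), like the accumUpdates carried by a CompletionEvent. The Driver then merges those partials into the registered original. The simulation rests on one assumption, modeled on Spark 1.x's Accumulable: a copy deserialized on the task side starts from its zero value. LongAcc, ship, runTask, mergeOnDriver and demo are hypothetical names.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable

object AccumulatorFlowDemo {
  // Driver-side registry: id -> original accumulator (strong refs, for simplicity).
  val originals = mutable.Map[Long, LongAcc]()

  // Hypothetical simplified accumulator.
  final class LongAcc(val id: Long) extends Serializable {
    var value: Long = 0L
    // Runs only when constructed on the Driver: deserialization skips the
    // constructor, so task-side copies never replace the registered original.
    originals(id) = this

    def +=(x: Long): Unit = value += x

    // Assumption (modeled on Spark 1.x): a deserialized copy starts from zero.
    private def readObject(in: ObjectInputStream): Unit = {
      in.defaultReadObject()
      value = 0L
    }
  }

  // Stand-in for shipping a task to an Executor: serialize, then deserialize.
  def ship[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T]
    finally in.close()
  }

  // One "task": add to its own copy, then report this task's partial value.
  def runTask(acc: LongAcc, partition: Seq[Int]): Map[Long, Long] = {
    val local = ship(acc)
    partition.foreach(i => local += i)
    Map(local.id -> local.value)
  }

  // Driver side: merge each task's partial values into the registered originals.
  def mergeOnDriver(updates: Map[Long, Long]): Unit =
    updates.foreach { case (id, partial) => originals(id) += partial }

  // Two "partitions", 1..5 and 6..10, processed one after another.
  def demo(): Long = {
    val accum = new LongAcc(1L)
    (1 to 10).grouped(5).toList.foreach(p => mergeOnDriver(runTask(accum, p)))
    accum.value
  }
}
```

AccumulatorFlowDemo.demo() returns 55. The zero reset is what makes this work. Without it, the second task's copy would start from the Driver's running total of 15 and report 55, and the Driver would end up with 70.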
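Now it was clear where the Driver's Accumulator gets updated. Tracing through the source, I found the following call chain:
The call chain above can be simplified into the following flow: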