RDD 的 5 个特征
下面来说一下 RDD 这东西,它是 Resilient Distributed Datasets 的简写。
咱们来看看 RDD 在源码的解释。
- A list of partitions: 在大数据领域,大数据都是分割成若干个部分,放到多个服务器上,这样就能做到多线程的处理数据,这对处理大数据量是非常重要的。分区意味着,可以使用多个线程了处理。
- A function for computing each split:作用在每个分区里面的函数,当我们读取数据之后,当然是要对其加工的,加工的定义就是我们编写的函数,这些函数主要包含转化算子、控制算子、行动算子。
- A list of dependencies on other RDDs。一个 Spark Application 下面可以有多个 Job ,一个 action 算子就可以分出一个 job ,一个 job 里面又可以分出若干个 stage , 一个 stage 中又有多个 RDD ,RDD 之间是用上下游关系的,就像流水线的工序,公休之间也会有先后之分的,例如,手机装壳之后才能上螺丝,这种上下游关系,使用依赖描述的,依赖又分为窄依赖和宽依赖。 那两个 RDD 为例,rdd2 依赖于 rdd1 ,
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
- Optionally, a list of preferred locations to compute each split on (e.g. block locations for
- an HDFS file)
RDD 源代码
RDD 的代码是非常多的,一个 RDD.scala 类就有 2000 多行。我们只捡能说明问题的就行了。
abstract class RDD[T: ClassTag](
// SparkContext 是代码的运行环境,SparkContext 中有一个 TaskSchedule 和 DAGSchedule ,前者是申请资源,后者是将 job 分割为多个 Stage ,然后提交给相应的 Executor
@transient private var _sc: SparkContext,
// deps 代表了上游算子依赖,上游可能有多个依赖,所以这里是一个 Seq .
// 这个 Seq 就是 RDD 中依赖的具体体现
@transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging {
// compute 函数代表了 RDD 第二个特征,作用在 partition 上面的函数。
@DeveloperApi
def compute(split: Partition, context: TaskContext): Iterator[T]
// 此函数是 RDD 第一个特征的具体表现,各个 RDD 的具体实现,可以根据它获得 RDD 中的分区
protected def getPartitions: Array[Partition]
// 还是依赖相关的函数
protected def getDependencies: Seq[Dependency[_]] = deps
// 此函数对应了 RDD 的第 5 个特征。各个 RDD 的实现类,在此函数中,实现就近数据的查找。
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
// 此函数对应了 RDD 的第四个特征,针对 PairRDDFunction 的分区器。
@transient val partitioner: Option[Partitioner] = None
def sparkContext: SparkContext = sc
val id: Int = sc.newRddId()
final def dependencies: Seq[Dependency[_]] = {
...
}
final private def internalDependencies: Option[Seq[Dependency[_]]] = {
...
}
final def partitions: Array[Partition] = {
...
}
final def preferredLocations(split: Partition): Seq[String] = {
checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
getPreferredLocations(split)
}
}
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
}
}
object RDD {
private[spark] val CHECKPOINT_ALL_MARKED_ANCESTORS =
"spark.checkpoint.checkpointAllMarkedAncestors"
implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
new PairRDDFunctions(rdd)
}
// 此方法对应了 RDD 的第四个特征,有了它,只要将 RDD 中的数据转化为 tuple2 的数据格式,就能自动调用 PairRDDFunction 中的函数。
implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]): AsyncRDDActions[T] = {
new AsyncRDDActions(rdd)