Spark的数据结构——RDD

RDD 的 5 个特征

下面来说一下 RDD 这东西,它是 Resilient Distributed Datasets 的简写。

咱们来看看 RDD 在源码的解释。

  • A list of partitions: 在大数据领域,大数据都是分割成若干个部分,放到多个服务器上,这样就能做到多线程的处理数据,这对处理大数据量是非常重要的。分区意味着,可以使用多个线程了处理。
  • A function for computing each split:作用在每个分区里面的函数,当我们读取数据之后,当然是要对其加工的,加工的定义就是我们编写的函数,这些函数主要包含转化算子、控制算子、行动算子。
  • A list of dependencies on other RDDs。一个 Spark Application 下面可以有多个 Job ,一个 action 算子就可以分出一个 job ,一个 job 里面又可以分出若干个 stage , 一个 stage 中又有多个 RDD ,RDD 之间是用上下游关系的,就像流水线的工序,公休之间也会有先后之分的,例如,手机装壳之后才能上螺丝,这种上下游关系,使用依赖描述的,依赖又分为窄依赖和宽依赖。 那两个 RDD 为例,rdd2 依赖于 rdd1 ,
  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
  • Optionally, a list of preferred locations to compute each split on (e.g. block locations for
  • an HDFS file)

RDD 源代码

RDD 的代码是非常多的,一个 RDD.scala 类就有 2000 多行。我们只捡能说明问题的就行了。

abstract class RDD[T: ClassTag](
    // SparkContext 是代码的运行环境,SparkContext 中有一个 TaskSchedule 和 DAGSchedule ,前者是申请资源,后者是将 job 分割为多个 Stage ,然后提交给相应的 Executor
    @transient private var _sc: SparkContext,
    // deps 代表了上游算子依赖,上游可能有多个依赖,所以这里是一个 Seq .
    // 这个 Seq 就是 RDD 中依赖的具体体现
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {
   
  // compute 函数代表了 RDD 第二个特征,作用在 partition 上面的函数。
  @DeveloperApi
  def compute(split: Partition, context: TaskContext): Iterator[T]
  // 此函数是 RDD 第一个特征的具体表现,各个 RDD 的具体实现,可以根据它获得 RDD 中的分区
  protected def getPartitions: Array[Partition]
  // 还是依赖相关的函数
  protected def getDependencies: Seq[Dependency[_]] = deps
  // 此函数对应了 RDD 的第 5 个特征。各个 RDD 的实现类,在此函数中,实现就近数据的查找。
  protected def getPreferredLocations(split: Partition): Seq[String] = Nil
  // 此函数对应了 RDD 的第四个特征,针对 PairRDDFunction 的分区器。
  @transient val partitioner: Option[Partitioner] = None

  def sparkContext: SparkContext = sc

  val id: Int = sc.newRddId()

  final def dependencies: Seq[Dependency[_]] = {
   
    ... 
  }

  final private def internalDependencies: Option[Seq[Dependency[_]]] = {
   
    ... 
  }

  final def partitions: Array[Partition] = {
   
     ...
  }

  final def preferredLocations(split: Partition): Seq[String] = {
   
    checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
   
      getPreferredLocations(split)
    }
  }

  final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
   
  }
}

object RDD {
   

  private[spark] val CHECKPOINT_ALL_MARKED_ANCESTORS =
    "spark.checkpoint.checkpointAllMarkedAncestors"
  implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
   
    new PairRDDFunctions(rdd)
  }
  // 此方法对应了 RDD 的第四个特征,有了它,只要将 RDD 中的数据转化为 tuple2 的数据格式,就能自动调用 PairRDDFunction 中的函数。
  implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]): AsyncRDDActions[T] = {
   
    new AsyncRDDActions(rdd)
  
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值