Repartition与Coalesce区别及源码解析

Repartition与Coalesce区别及源码解析

一、核心区别概述

维度 Coalesce Repartition
主要用途 减少分区数(可避免Shuffle) 增加或减少分区数(强制Shuffle)
Shuffle触发 默认不触发(可选参数shuffle=true触发) 始终触发Shuffle
数据分布 可能不均衡(合并相邻分区) 均匀分布(通过HashPartitioner)
性能开销 低(无Shuffle时) 高(需全量数据重分布)
分区数变化 只能减少或保持 可增加或减少

二、源码级实现解析

1. Repartition源码路径

repartition本质调用coalesce并强制开启Shuffle:

// RDD.scala
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
   
  coalesce(numPartitions, shuffle = true) // 强制Shuffle
}

2. Coalesce源码逻辑

根据shuffle参数选择不同实现方式:

// RDD.scala
def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T] = {
   
  if (shuffle) {
    
    // 启用Shuffle时,使用ShuffledRDD均匀分配数据
    val distributedRDD = new ShuffledRDD(...)
    new CoalescedRDD(distributedRDD, numPartitions).values
  } else {
   
    // 默认无Shuffle,直接合并相邻分区
    new CoalescedRDD(this, numPartitions)
  }
}
无Shuffle时的CoalescedRDD
  • 分区合并策略:按分区索引顺序合并
    // CoalescedRDD.getPartitions
    override def getPartitions: Array[Partition] = {
         
      val partitions = parent.partitions
      val maxPartitions = math.min(partitions.length, numPartitions)
      val groupSize = partitions.length / maxPartitions
      (0 until maxPartitions).map {
          i =>
        val start = i * groupSize
        val end = if (i < maxPartitions - 1) start + groupSize else partitions.length
        new CoalescedPartition
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值