Repartition与Coalesce区别及源码解析
一、核心区别概述
维度 | Coalesce | Repartition |
---|---|---|
主要用途 | 减少分区数(可避免Shuffle) | 增加或减少分区数(强制Shuffle) |
Shuffle触发 | 默认不触发(可选参数shuffle=true 触发) |
始终触发Shuffle |
数据分布 | 可能不均衡(合并相邻分区) | 均匀分布(通过HashPartitioner) |
性能开销 | 低(无Shuffle时) | 高(需全量数据重分布) |
分区数变化 | 只能减少或保持 | 可增加或减少 |
二、源码级实现解析
1. Repartition源码路径
repartition
本质调用coalesce
并强制开启Shuffle:
// RDD.scala
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true) // 强制Shuffle
}
2. Coalesce源码逻辑
根据shuffle
参数选择不同实现方式:
// RDD.scala
def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T] = {
if (shuffle) {
// 启用Shuffle时,使用ShuffledRDD均匀分配数据
val distributedRDD = new ShuffledRDD(...)
new CoalescedRDD(distributedRDD, numPartitions).values
} else {
// 默认无Shuffle,直接合并相邻分区
new CoalescedRDD(this, numPartitions)
}
}
无Shuffle时的CoalescedRDD
- 分区合并策略:按分区索引顺序合并
// CoalescedRDD.getPartitions override def getPartitions: Array[Partition] = { val partitions = parent.partitions val maxPartitions = math.min(partitions.length, numPartitions) val groupSize = partitions.length / maxPartitions (0 until maxPartitions).map { i => val start = i * groupSize val end = if (i < maxPartitions - 1) start + groupSize else partitions.length new CoalescedPartition