Both groupByKey and reduceByKey are implemented on top of combineByKey
Purpose
Aggregate a dataset by Key
Call
combineByKey(createCombiner, mergeValue, mergeCombiners, [partitioner], [mapSideCombine], [serializer])
Parameters
createCombiner: turns the first Value seen for a Key into the initial combined value
mergeValue: within each partition, merges each subsequent Value into the combined value produced so far
mergeCombiners: merges the per-partition combined values across all partitions
partitioner: optional, the partitioner for the result (passing it explicitly is sketched below)
mapSideCombine: optional, whether to combine on the Map side before the shuffle
serializer: optional, the serializer to use
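For illustration, a minimal sketch of supplying the optional arguments explicitly; the HashPartitioner(4) and the (sum, count) combiner are just example choices here, and mapSideCombine defaults to true:

import org.apache.spark.HashPartitioner

// assumes rdd: RDD[(String, Double)]
val combined = rdd.combineByKey(
  (v: Double) => (v, 1),                                  // createCombiner
  (c: (Double, Int), v: Double) => (c._1 + v, c._2 + 1),  // mergeValue
  (c1: (Double, Int), c2: (Double, Int)) =>
    (c1._1 + c2._1, c1._2 + c2._2),                       // mergeCombiners
  new HashPartitioner(4),                                 // partitioner
  mapSideCombine = true                                   // combine before the shuffle
)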
Notes
The key to combineByKey is understanding what each of the three functions does
Both groupByKey and reduceByKey are built on combineByKey under the hood, as sketched below
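A rough sketch of how the two map onto combineByKey (simplified; Spark's real implementations also deal with map-side combine, serializers, and compact buffers):

import org.apache.spark.rdd.RDD

// reduceByKey(f): the combiner is just a value of the same type as the Value
def myReduceByKey(rdd: RDD[(String, Double)])(f: (Double, Double) => Double): RDD[(String, Double)] =
  rdd.combineByKey(
    (v: Double) => v, // createCombiner: the first value is the initial result
    f,                // mergeValue: fold values inside a partition
    f                 // mergeCombiners: merge partial results across partitions
  )

// groupByKey: the combiner is a buffer holding all values of the key
def myGroupByKey(rdd: RDD[(String, Double)]): RDD[(String, List[Double])] =
  rdd.combineByKey(
    (v: Double) => List(v),                            // createCombiner: start a buffer
    (buf: List[Double], v: Double) => v :: buf,        // mergeValue: add to the buffer
    (b1: List[Double], b2: List[Double]) => b1 ::: b2  // mergeCombiners: concatenate buffers
  )

Note that the real groupByKey disables map-side combine, because grouping on the map side does not reduce the amount of data shuffled.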
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.junit.Test

class Test01 {
  private val sc = new SparkContext(new SparkConf().setMaster("local[5]").setAppName("RDDTest"))

  @Test
  def combineByKeyTest(): Unit = {
    // (name, score) pairs; the goal is the average score per name
    val rdd1: RDD[(String, Double)] = sc.parallelize(Seq(
      ("zs", 99.0),
      ("zs", 96.0),
      ("lisi", 78.0),
      ("lisi", 67.0),
      ("zs", 90.0)
    ))

    // combine each key's scores into (sum, count)
    val rdd2: RDD[(String, (Double, Int))] = rdd1.combineByKey(
      // createCombiner: the first score of a key inside a partition becomes (score, 1)
      createCombiner = (curr: Double) => (curr, 1),
      // mergeValue: fold each further score of that key into (sum, count) within the partition
      mergeValue = (curr: (Double, Int), nextValue: Double) => (curr._1 + nextValue, curr._2 + 1),
      // mergeCombiners: merge the (sum, count) pairs produced by different partitions
      mergeCombiners = (curr: (Double, Int), aggr: (Double, Int)) => (curr._1 + aggr._1, curr._2 + aggr._2)
    )

    // average = sum / count
    val rdd3 = rdd2.map(item => item._1 -> item._2._1 / item._2._2)
    rdd3.foreach(println)
  }
}
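Running the test prints each key paired with its average score, here (zs,95.0) and (lisi,72.5); the order of the lines is not deterministic, since foreach runs on the partitions rather than on the driver.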