Spark之combineByKey算子

groupByKey和ReduceByKey均是有combineByKey实现

作用

    对数据集按照 Key 进行聚合

调用

    combineByKey(createCombiner, mergeValue, mergeCombiners, [partitioner], [mapSideCombiner], [serializer])

参数

    createCombiner 将 Value 进行初步转换

    mergeValue 在每个分区把上一步转换的结果聚合

    mergeCombiners 在所有分区上把每个分区的聚合结果聚合

    partitioner 可选, 分区函数

    mapSideCombiner 可选, 是否在 Map 端 Combine

    serializer 序列化器

注意点

    combineByKey 的要点就是三个函数的意义要理解

    groupByKey, reduceByKey 的底层都是 combineByKey
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.junit.Test

class Test01 {
    private val sc = new SparkContext(new SparkConf().setMaster("local[5]").setAppName("RDDTest"))

    @Test
    def combineByKeyTest(){
        val rdd1: RDD[(String, Double)] = sc.parallelize(Seq(
            ("zs", 99.0),
            ("zs", 96.0),
            ("lisi", 78),
            ("lisi", 67),
            ("zs", 90)
        ))
        val rdd2: RDD[(String, (Double, Int))] = rdd1.combineByKey(
            // 只作用第一条数据
            createCombiner = (curr: Double) => (curr, 1),
            mergeValue = (curr: (Double, Int), nextValue: Double) => (curr._1 + nextValue, 1 + curr._2),
            mergeCombiners = (curr: (Double, Int), aggr: (Double, Int)) => (curr._1 + aggr._1, curr._2 + aggr._2)
        )
        val rdd3 = rdd2.map(item => item._1 -> item._2._1/item._2._2)

        rdd3.foreach(println)
    }
}
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值