Spark RDD算子(二) filter、map、flatMap

本文深入探讨了Spark中三个重要的RDD算子:filter用于过滤数据,map进行一对一元素转换,flatMap则用于一对多的元素映射。通过Scala和Java版本的示例代码,详细解释了这三个操作的使用方法及其在数据处理中的作用。

摘要生成于 C知道 ,由 DeepSeek-R1 满血版支持, 前往体验 >

一、filter

过滤算子

scala版本

val conf = new SparkConf().setMaster("local[2]").setAppName("filterscala")
val sc=new SparkContext(conf)

val rdd = sc.textFile("in/sample.txt")
rdd.filter(_.contains("zks")).collect.foreach(println)

java版本

public static void main(String[] args) {
    SparkConf conf=new SparkConf().setMaster("local[*]").setAppName("filterJava");
    JavaSparkContext sc=new JavaSparkContext(conf);

    JavaRDD<String> lines=sc.textFile("in/sample.txt");
    JavaRDD<String> filterRdd=lines.filter(new Function<String,Boolean>(){
        @override
        public Boolean call(String v) throws Exception{
            return v1.contains("zks");
        }
    });
    
    List<String> collect=filterRdd.collect();
    for(String str:collect){
        System.out.println(str);
    }
}

二、map

map() 接收一个函数,把这个函数用于 RDD 中的每个元素,将函数的返回结果作为结果RDD编程
RDD 中对应元素的值 map是一对一的关系

示例:将每个元素变成元组
scala版本

def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setMaster("local[2]").setAppName("mapScala")
    val sc=new SparkContext(conf)

    val lines =sc.textFile("in/sample.txt")  //装载文件
    val mapRdd =lines.map(x=>(x.split(" ")(0),1))
    mapRdd.collect.foreach(println)
}

java版本

SparkConf conf=new SparkConf().setMaster("local[2]").setAppName("mapJava");
JavaSparkContext sc=new JavaSparkContext(conf);

JavaRDD<String> lines=sc.textFile("in/sample.txt");
JavaRDD<Iterable> mapRdd=lines.map(new Function<String, Iterable>() {
    @Override
    public Iterable call(String v2) throws Exception {
        String[] split=v2.split(" ");
        return Arrays.asList(split);
    }
});

List<Iterable> collect= mapRdd.collect();
for (Iterable it : collect){
    Iterator iterator=it.iterator();
    while (iterator.hasNext()){
        System.out.println(iterator.next());
    }
}

三、flatMap

对某个元素生成多个元素,实现该功能的操作叫作 flatMap()
faltMap的函数应用于每一个元素,对于每一个元素返回的是多个元素组成的迭代器

示例:将数据切分成词
scala版本

def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setMaster("local[2]").setAppName("mapdemo")
    val sc=new SparkContext(conf)

    val lines=sc.textFile("in/sample.txt")
    val flatMapRdd=lines.flatMap(_.split(" "))
    flatMapRdd.collect.foreach(println)
}

java版本,Spark2.0版本以上

SparkConf conf=new SparkConf().setMaster("local[2]").setAppName("flatMapJava");
JavaSparkContext sc=new JavaSparkContext(conf);

JavaRDD<String> lines=sc.textFile("in/sample.txt");
JavaRdd<String> flatMapRdd=lines.flatMap(new FlatMapFunction<String,String>(){
    @override
    public Iterator<String> call(String s)throws Exception{
        String[] split=s.split("\\s+");
        return Arrays.asList(split).iterator();
    }
});
    List<String> collect =flatMapRdd.collect();
    for(String s:collect){
        System.out.println(s);
    }
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值