一、filter
过滤算子
scala版本
val conf = new SparkConf().setMaster("local[2]").setAppName("filterscala")
val sc=new SparkContext(conf)
val rdd = sc.textFile("in/sample.txt")
rdd.filter(_.contains("zks")).collect.foreach(println)
java版本
public static void main(String[] args) {
SparkConf conf=new SparkConf().setMaster("local[*]").setAppName("filterJava");
JavaSparkContext sc=new JavaSparkContext(conf);
JavaRDD<String> lines=sc.textFile("in/sample.txt");
JavaRDD<String> filterRdd=lines.filter(new Function<String,Boolean>(){
@override
public Boolean call(String v) throws Exception{
return v1.contains("zks");
}
});
List<String> collect=filterRdd.collect();
for(String str:collect){
System.out.println(str);
}
}
二、map
map() 接收一个函数,把这个函数用于 RDD 中的每个元素,将函数的返回结果作为结果RDD编程
RDD 中对应元素的值 map是一对一的关系
示例:将每个元素变成元组
scala版本
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setMaster("local[2]").setAppName("mapScala")
val sc=new SparkContext(conf)
val lines =sc.textFile("in/sample.txt") //装载文件
val mapRdd =lines.map(x=>(x.split(" ")(0),1))
mapRdd.collect.foreach(println)
}
java版本
SparkConf conf=new SparkConf().setMaster("local[2]").setAppName("mapJava");
JavaSparkContext sc=new JavaSparkContext(conf);
JavaRDD<String> lines=sc.textFile("in/sample.txt");
JavaRDD<Iterable> mapRdd=lines.map(new Function<String, Iterable>() {
@Override
public Iterable call(String v2) throws Exception {
String[] split=v2.split(" ");
return Arrays.asList(split);
}
});
List<Iterable> collect= mapRdd.collect();
for (Iterable it : collect){
Iterator iterator=it.iterator();
while (iterator.hasNext()){
System.out.println(iterator.next());
}
}
三、flatMap
对某个元素生成多个元素,实现该功能的操作叫作 flatMap()
faltMap的函数应用于每一个元素,对于每一个元素返回的是多个元素组成的迭代器
示例:将数据切分成词
scala版本
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setMaster("local[2]").setAppName("mapdemo")
val sc=new SparkContext(conf)
val lines=sc.textFile("in/sample.txt")
val flatMapRdd=lines.flatMap(_.split(" "))
flatMapRdd.collect.foreach(println)
}
java版本,Spark2.0版本以上
SparkConf conf=new SparkConf().setMaster("local[2]").setAppName("flatMapJava");
JavaSparkContext sc=new JavaSparkContext(conf);
JavaRDD<String> lines=sc.textFile("in/sample.txt");
JavaRdd<String> flatMapRdd=lines.flatMap(new FlatMapFunction<String,String>(){
@override
public Iterator<String> call(String s)throws Exception{
String[] split=s.split("\\s+");
return Arrays.asList(split).iterator();
}
});
List<String> collect =flatMapRdd.collect();
for(String s:collect){
System.out.println(s);
}