Common RDD Operations in Spark

This article shares some code examples of working with RDDs in Spark.

1. What is an RDD?

Resilient Distributed Datasets (RDDs)
"RDDs are immutable, fault-tolerant, parallel data structures that let users explicitly persist intermediate results in memory, control their partitioning to optimize data placement, and manipulate them using a rich set of operators." (The original English definition is clear enough to quote as-is.)

2. Characteristics of RDDs

  • Immutable
  • Fault Tolerant
  • Parallel Data Structures
  • In-Memory Computing (see the sketch after this list)
  • Data Partitioning and Placement
  • Rich Set of Operations
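The partitioning and in-memory computing characteristics can be illustrated with a minimal sketch. This is not from the original article; it assumes an existing SparkContext named sc (configured as in the examples below), and the variable names are illustrative only:

    import org.apache.spark.storage.StorageLevel

    val nums = sc.parallelize(1 to 1000, 4)        // explicitly request 4 partitions
    println(nums.getNumPartitions)                 // data partitioning: prints 4
    val squares = nums.map(n => n * n)
    squares.persist(StorageLevel.MEMORY_ONLY)      // in-memory computing: cache the intermediate RDD
    println(squares.sum())                         // the first action computes and caches squares
    println(squares.max())                         // later actions reuse the cached data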

3. RDD Operations

Transformation vs. action?
In short, RDDs are immutable, RDD transformations are lazily evaluated, and RDD actions are eagerly evaluated and trigger the computation of your data processing logic. (Again, the English is clear enough to quote as-is.)
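As a minimal sketch of this lazy-vs-eager behavior (not from the original article; it assumes an existing SparkContext `sc`, and the names are illustrative):

    // transformations only record the lineage; no data is processed yet
    val words = sc.parallelize(Seq("spark", "rdd", "lazy"))
    val upper = words.map { w => println("computing: " + w); w.toUpperCase }
    // nothing has been printed so far; the action below triggers the computation
    println(upper.count())   // the "computing: ..." lines appear only when count() runs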

4. Ways to Create an RDD

In general, there are three ways:
  • from an in-memory collection of objects;
  • from an external file, for example one stored in HDFS;
  • from another RDD, via a transformation.
Example 1. Creating an RDD from an object collection
 def main(args: Array[String]): Unit = {
    val conf = new SparkConf();
    conf.setMaster("local[2]");
    conf.setAppName("create RDD by Array Object");
    conf.set("spark.testing.memory", "2147480000");
    val str_list = Array[String]("How to create RDD", "funny game", "Spark is cool!");
    val sc = new SparkContext(conf);
    // distribute the local collection across 2 partitions
    val RDD_str = sc.parallelize(str_list, 2);
    print(RDD_str.count());
  }
Example 2. Creating an RDD from a file in HDFS
def main(args: Array[String]): Unit = {
    // initialize the Spark configuration
    val sparkConf = new SparkConf()
      .setAppName("WordCount sample")
      // .setMaster("192.168.1.10:4040")
      .setMaster("local[2]")
      .set("spark.testing.memory", "2147480000");
    val sc = new SparkContext(sparkConf);
    val rdd = sc.textFile("/user/hadoop/worddir/word.txt");
    // split each line into words and map every word to a (word, 1) pair
    val tupleRDD = rdd.flatMap(line => {
      line.split(" ").toList.map(word => (word.trim, 1))
    });
    // the explicit RDD type annotation requires `import org.apache.spark.rdd.RDD`
    val resultRDD: RDD[(String, Int)] = tupleRDD.reduceByKey((a, b) => a + b);
    resultRDD.foreach(elm => println(elm._1 + "=" + elm._2));

    Thread.sleep(10000);
    sc.stop();
  }
Example 3. Creating an RDD from an existing RDD
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf();
    conf.setAppName("Get RDD from existed RDD");
    conf.setMaster("local[2]");
    conf.set("spark.testing.memory", "2147480000");
    val str_array = Array("one", "two", "three");
    val sc = new SparkContext(conf);
    val RDD_list = sc.parallelize(str_array, 2);
    // derive a new RDD from the existing one with a map transformation
    val RDD_list1 = RDD_list.map(l => { l + " ******" });
    RDD_list1.foreach(elm => { println(elm) });
  }


5. RDD Transformations

Some commonly used transformations are listed below (name: description).
  • map(func): applies the provided function to each element while iterating through the dataset; the returned RDD contains whatever func returns.
  • flatMap(func): similar to map(func), but func should return a collection rather than a single element, and the returned collections are flattened out. This allows one input item to map to zero or more output items.
  • filter(func): only the elements for which func returns true are kept in the returned RDD; in other words, it collects only the rows that meet the condition defined in func.
  • mapPartitions(func): similar to map(func), but applied at the partition (chunk) level; func takes an iterator over the rows of a partition as its input.
  • mapPartitionsWithIndex(func): similar to mapPartitions, but the partition index is additionally passed to func.
  • union(otherRDD): returns an RDD containing the rows of both the source RDD and otherRDD; duplicates are not removed.
  • intersection(otherRDD): returns only the rows that exist in both the source RDD and otherRDD.
  • subtract(otherRDD): subtracts the rows in otherRDD from the source RDD.
  • distinct([numTasks]): removes duplicate rows from the source RDD.
  • sample(withReplacement, fraction, seed): usually used to reduce a large dataset to a smaller one by randomly selecting a fraction of rows using the given seed, with or without replacement.
Example 1. map(func): iterates over every element of the collection and returns an RDD containing the results of applying func.
object RDDTest08 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf();
    conf.setMaster("local[2]");
    conf.setAppName("RDD map transformation");
    conf.set("spark.testing.memory", "2147480000");
    // define the Person case class
    case class Person(id: Int, name: String, phoneNum: String);
    val personArray = Array("1#Jason#1233242432", "2#Mike#1232131312");
    val sc = new SparkContext(conf);
    val personRDD = sc.parallelize(personArray);
    // parse each "#"-delimited string into a Person object
    val personObjectRDD = personRDD.map(person => {
      val personInfo = person.split("#");
      Person(personInfo(0).toInt, personInfo(1), personInfo(2));
    })
    personObjectRDD.collect.foreach(println);
  }
}
Output:
Person(1,Jason,1233242432)
Person(2,Mike,1232131312)
Example 2. flatMap(func): similar to map, but the collection returned for each element is flattened out.
*** map vs. flatMap
For example, take the string collection Array("this is one", "this is two") and split each string on spaces; the two operations return different results:
  • map returns two collections: Array("this", "is", "one") and Array("this", "is", "two")
  • flatMap returns one flattened collection: ("this", "is", "one", "this", "is", "two"), i.e. the map result flattened out
With map, the collected result type is Array[Array[String]]:
    val strArray = Array("this is one", "this is two");
    val sc = new SparkContext(conf);
    val strRDD = sc.parallelize(strArray);
    val resultRDD = strRDD.map(line => {
      line.split(" ")
    })
    val array: Array[Array[String]] = resultRDD.collect();
    array.foreach(print);
With flatMap, the collected result type is Array[String]:
    val strArray = Array("this is one", "this is two");
    val sc = new SparkContext(conf);
    val strRDD = sc.parallelize(strArray);
    val resultRDD = strRDD.flatMap(line => {
      line.split(" ")
    })
    val array: Array[String] = resultRDD.collect();
    array.foreach(print);
Example 3. filter(func): keeps only the rows that satisfy the predicate and returns them as an RDD.
    val strArray = Array("this is one","this is two");
    val sc = new SparkContext(conf);
    val strRDD = sc.parallelize(strArray);
    val resultRDD = strRDD.filter(line=>{line.contains("two")});
    resultRDD.foreach(print);
Example 4. mapPartitions(func): applies the mapping logic at the partition level.
    case class Person(id: Int, name: String, phoneNum: String);
    val personArray = Array("1#Jason#1233242432", "2#Mike#1232131312", "3#James#01902992888", "4#Tom#1231232222");
    val sc = new SparkContext(conf);
    val personRDD = sc.parallelize(personArray, 2);
    // the function receives an iterator over all rows of one partition
    val personObjectRDD = personRDD.mapPartitions((iter: Iterator[String]) => {
      iter.map(person => {
        val personInfo = person.split("#");
        Person(personInfo(0).toInt, personInfo(1), personInfo(2))
      });
    });

    personObjectRDD.collect.foreach(println);
Example 5. mapPartitionsWithIndex: like mapPartitions, but the partition index is passed in as well.
    case class Person(id: Int, name: String, phoneNum: String, key: Int);
    val personArray = Array("1#Jason#1233242432", "2#Mike#1232131312", "3#James#01902992888", "4#Tom#1231232222");
    val sc = new SparkContext(conf);
    val personRDD = sc.parallelize(personArray, 2);
    // idx is the index of the partition the current row belongs to
    val personObjectRDD = personRDD.mapPartitionsWithIndex((idx: Int, iter: Iterator[String]) => {
      iter.map(person => {
        val personInfo = person.split("#");
        Person(personInfo(0).toInt, personInfo(1), personInfo(2), idx);
      });
    });

    personObjectRDD.collect.foreach(println);
Looking at the partition of each of the 4 Person objects: Jason and Mike are in partition 0, James and Tom are in partition 1.
Person(1,Jason,1233242432,0)
Person(2,Mike,1232131312,0)
Person(3,James,01902992888,1)
Person(4,Tom,1231232222,1)
Example 6. union(otherRDD): union operation. Note that unlike UNION in traditional SQL, this operation does not remove duplicates (see the sketch after the output below).
 val intArray1 = Array(0,1,3,5,7,9);
 val intArray2 = Array(0,2,4,6,8,10);
 val sc = new SparkContext(conf);
 val intRDD1 = sc.parallelize(intArray1);
 val intRDD2 = sc.parallelize(intArray2);
 val unionRDD = intRDD1.union(intRDD2);
 println(unionRDD.collect().toList);
Output:
List(0, 1, 3, 5, 7, 9, 0, 2, 4, 6, 8, 10) 
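If SQL-style UNION semantics (duplicates removed) are needed, one option, sketched here as a follow-up to the example above, is to chain distinct() after union():

    // deduplicated union, similar to SQL's UNION
    println(intRDD1.union(intRDD2).distinct().collect().toList.sorted)
    // List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)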
Example 7. intersection(otherRDD): intersection operation.
 val sc = new SparkContext(conf);
 val strRDD1 = sc.parallelize(Array("one","two","three"));
 val strRDD2 = sc.parallelize(Array("two","three"));
 val intersectionRDD = strRDD1.intersection(strRDD2);
 println(intersectionRDD.collect().toList);
Output:
List(two, three)
Example 8. subtract(otherRDD): set-difference operation.
 val sc = new SparkContext(conf);
 val strRDD1 = sc.parallelize(Array("one","two","three"));
 val strRDD2 = sc.parallelize(Array("two","three"));
 val subtractRDD = strRDD1.subtract(strRDD2);
 println(subtractRDD.collect().toList);
Output:
List(one)
Example 9. distinct(): removes duplicate values.
val sc = new SparkContext(conf);
val duplicatedRDD = sc.parallelize(List("one",1,"two",2,"three",3,"four","four"));
print(duplicatedRDD.distinct().collect().toList);
Output:
List(two, one, three, four, 2, 1, 3)
Example 10. sample(withReplacement, fraction, seed)
This transformation is typically used to draw a statistical sample of the data:
  • withReplacement (required): true means an element may be selected more than once, false means it may not.
  • fraction (required): a value between 0 and 1, the expected fraction of the data to sample.
  • seed (optional): seeds the random number generator; if omitted, a default seed is assigned.
val sc = new SparkContext(conf);
val intArray= sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10));
print(intArray.sample(false,0.2).collect().toList);
Output (will vary between runs):
List(1, 4)

6. RDD Actions

Example 1. collect(): brings the RDD's data back to the driver as a local collection.
*** Note: if the result set is too large, this can cause an out-of-memory error on the driver.
*** It is best to clean and filter the result set before collecting it (see the sketch after the output below).
val sc = new SparkContext(conf);
print(sc.parallelize(Array(1,2,3,4,5)).collect().toList);
Output:
List(1, 2, 3, 4, 5)
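As a sketch of the advice above (not from the original article, and the numbers are illustrative): shrink the result on the cluster side first, for example with filter, or use take(n) instead of a full collect():

    val bigRDD = sc.parallelize(1 to 1000000)
    // only the 10 matching elements are sent back to the driver
    println(bigRDD.filter(_ % 100000 == 0).collect().toList)
    // take(n) brings back at most n elements instead of the whole dataset
    println(bigRDD.take(5).toList)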
Example 2. count(): counts the elements.
val sc = new SparkContext(conf);
print(sc.parallelize(Array(1,2,3,4,5)).count());
Output:
5
Example 3. first(): returns the first element.
val sc = new SparkContext(conf);
print(sc.parallelize(Array(1,2,3,4,5)).first());
Output:
1
Example 4. take(n): returns the first n elements.
val sc = new SparkContext(conf);
sc.parallelize(Array(1,2,3,4,5)).take(3).foreach(print);
Output:
123
Example 5. reduce(func): aggregates the elements using the given function.
val sc = new SparkContext(conf);
val sumRDD = sc.parallelize(Array(1,2,3,4,5)).reduce((a1:Int,a2:Int)=>{a1+a2});
print(sumRDD.toInt);
Output:
15
Example 6. takeSample(withReplacement, n, [seed])
Similar to the sample transformation, but it is an action that returns a sample of n elements to the driver as a local array.
val sc = new SparkContext(conf);
print(sc.parallelize(Array(1,2,3,4,5)).takeSample(false,2).toList);
Output (will vary between runs):
List(1, 5)
Example 7. takeOrdered(n, [ordering])
Returns the first n elements according to the specified ordering (natural ordering by default).
val sc = new SparkContext(conf);
sc.setLogLevel("ERROR");
println(sc.parallelize(Array(1,2,3,4,5)).takeOrdered(3) (Ordering[Int].reverse).toList);
println(sc.parallelize(Array(1,2,3,4,5)).takeOrdered(3).toList);
Output:
List(5, 4, 3)
List(1, 2, 3)
Example 8. top(n, [ordering])
Returns the top n elements.
val sc = new SparkContext(conf);
sc.setLogLevel("ERROR");
println(sc.parallelize(Array(1,2,3,4,5)).top(3).toList);
println(sc.parallelize(Array(1,2,3,4,5)).top(3)(Ordering[Int]).toList);
Output:
List(5, 4, 3)
List(5, 4, 3)