1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 | | package com.lyzx.day09
import org.apache.spark.{SparkContext, SparkConf}
/** * 一个scala版本的wordCount小栗子 * scala的版本是2.10.4 * spark的版本是1.6.1 * 其中引入的jar包是 * spark-1.6.1-bin-hadoop2.6\lib\spark-assembly-1.6.1-hadoop2.6.0.jar * 只有这一个jar包,注意不要把lib下面的所有jar包都引入,会报错 */ class T1 {
}
object T1{ def main(args: Array[String]): Unit = { //spark配置对象 val conf = new SparkConf() //设置这个程序的名字 conf.setAppName("myFisrtSparkProgramer...") //设置运行模式 conf.setMaster("local")
//spark上下文对象,需要使用到spark配置文件 val sc = new SparkContext(conf)
/** * 通过上下文变量sc读取文本文件wc * 文本文件wc中存放的原始内容 * hello world * hello lyzx * hello c * hello java * hello c++ */ val lineRdd = sc.textFile("wc")
/** * 通过flatMap把每一行数据全部读取出来后放到一个RDD里面 * 经过flatMap后的数据如下: * [hello,world,hello,lyzx,hello,c,hello,java,hello,c++] */ val worlds = lineRdd.flatMap(item=>item.split(" "))
/** * 把每一个单词作为key,值为1映射为一个元组 * 经过Map操作后的数据如下 * [(hello,1),(world,1),(hello,1),(lyzx,1),(hello,1),(c,1),(hello,1),(java,1),(hello,1),(c++,1)] */ val worldTupe = worlds.map(item=>(item,1))
/** * 通过reduceByKey做规约 * 也就是把key相同的做_+_的操作,其中_+_是scala的一种简单写法,完整写法为(x,y)=>x+y * 经过这一个步骤后数据如下 * [(hello,5),(world,1),(lyzx,1),(c,1),(java,1),(c++,1)] */ val world = worldTupe.reduceByKey(_+_)
/** * 对最后的结果使用foreach算子把所有的结果的键值循环打印出来 */ world.foreach(item=>println(item._1+" "+item._2))
//最后释放资源,类似于java中inputStream.close() sc.stop() } } |