一、Spark-Core基础篇回顾
1、为什么选择Spark?
- Fast:10倍于disk磁盘、100倍于memory内存
- Easy code:编码容易、交互式的命令行interactive shell
- Unified Stack:不管是批处理、流处理、机器学习、图计算都okay
- Deployment:local、standalone、yarn、k8s
- Multi Language:支持多语言的:Java、Scala、Python、R
2、RDD的两种创建方式:
textFile:local(本地)、只要是兼容hdfs的都可以
Parallelize:仅适用于本地测试
3、Transformation:
特点:Lazy延迟执行,写一堆代码并不会马上执行
4、Action算子
return a value to Driver(返回结果到Driver)
典型的Action算子:
- collect、reduce、count、take
二、Spark进行大数据的逻辑处理-日志解析App的开发
2.1、开发Spark代码遇到的典型错误
IDEA下的一个常见错误: A master URL must be set in your configuration;有人不经会问,我在sparkConf中不是已经设置了AppName和Master么;
查看SparkContext的源码:
def this() = this(new SparkConf()) //在未传入参数时,它会new一个SparkConf,使得我们原先定义的sparkConf失效:
1、点击val sc = new SparkContext()中的SparkContext中去:
/**
* Create a SparkContext that loads settings from system properties (for instance, when
* launching with ./bin/spark-submit).
*/
def this() = this(new SparkConf())
2、再点击这个SparkConf中去:
/** Create a SparkConf that loads defaults from system properties and the classpath */
def this() = this(true)
//此处又创建了一个SparkConf使得我们的原先new出来sparkConf不生效了:所以需要把sparkConf传值传进去。
3、如下代码就会出现报错:
import org.apache.spark.{
SparkConf,SparkContext}
object TestApp {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("TestApp").setMaster("local[2]")
val sc = new SparkContext()
sc.parallelize(List(1,2,3,4,5,6,7,8)).foreach(println)
sc.stop()
}
}
4、在3的基础上把SparkConf传值传进去即可:
val sparkConf = new SparkConf().setAppName("TestApp").setMaster("yarn")
val sc = new SparkContext(sparkConf)
2.2、求每个域名下的流量之和
数据来源:自行准备的日志
主要字段:traffic(流量位于第20个字段)、domain(域名位于第11个字段)
需求分析:按照域名进行分组,然后组内求流量之和
求和:主要使用reduceByKey算子
第一次编码:
import org.apache.spark.{
SparkConf,SparkContext}
object LogApp {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("LogApp").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val lines = sc.textFile("file:///d:/baidu.log")
val result = lines.map( x