Spark-Core(二) - LogApp日志数据的解析&&Spark的运行架构

本文介绍了Spark进行大数据逻辑处理的方法,包括日志解析App的开发,如处理日志中的错误,计算每个域名的流量之和,以及使用纯真库解析省份访问次数的TopN。此外,详细探讨了Spark的运行架构,强调了Driver Program和Executor的角色,以及集群管理器的重要性。

摘要生成于 C知道 ,由 DeepSeek-R1 满血版支持, 前往体验 >

一、Spark-Core基础篇回顾

二、Spark如何进行大数据的逻辑处理

三、Spark的运行架构(重要指数五颗星)

一、Spark-Core基础篇回顾

1、为什么选择Spark?

  • Fast:10倍于disk磁盘、100倍于memory内存
  • Easy code:编码容易、交互式的命令行interactive shell
  • Unified Stack:不管是批处理、流处理、机器学习、图计算都okay
  • Deployment:local、standalone、yarn、k8s
  • Multi Language:支持多语言的:Java、Scala、Python、R

2、RDD的两种创建方式:
textFile:local(本地)、只要是兼容hdfs的都可以
Parallelize:仅适用于本地测试

3、Transformation:
特点:Lazy延迟执行,写一堆代码并不会马上执行

4、Action算子
return a value to Driver(返回结果到Driver)

典型的Action算子:

  • collect、reduce、count、take

二、Spark进行大数据的逻辑处理-日志解析App的开发

2.1、开发Spark代码遇到的典型错误

IDEA下的一个常见错误: A master URL must be set in your configuration;有人不经会问,我在sparkConf中不是已经设置了AppName和Master么;

查看SparkContext的源码:
def this() = this(new SparkConf()) //在未传入参数时,它会new一个SparkConf,使得我们原先定义的sparkConf失效:

1、点击val sc = new SparkContext()中的SparkContext中去:
  /**
   * Create a SparkContext that loads settings from system properties (for instance, when
   * launching with ./bin/spark-submit).
   */
  def this() = this(new SparkConf())

2、再点击这个SparkConf中去:
  /** Create a SparkConf that loads defaults from system properties and the classpath */
  def this() = this(true)

//此处又创建了一个SparkConf使得我们的原先new出来sparkConf不生效了:所以需要把sparkConf传值传进去。

3、如下代码就会出现报错:
import org.apache.spark.{
   SparkConf,SparkContext}

object TestApp {
   
  def main(args: Array[String]): Unit = {
   
      val sparkConf = new SparkConf().setAppName("TestApp").setMaster("local[2]")
      val sc = new SparkContext()

    sc.parallelize(List(1,2,3,4,5,6,7,8)).foreach(println)
      sc.stop()
  }
}

4、在3的基础上把SparkConf传值传进去即可:
val sparkConf = new SparkConf().setAppName("TestApp").setMaster("yarn")
val sc = new SparkContext(sparkConf)

2.2、求每个域名下的流量之和

数据来源:自行准备的日志
主要字段:traffic(流量位于第20个字段)、domain(域名位于第11个字段)
需求分析:按照域名进行分组,然后组内求流量之和
求和:主要使用reduceByKey算子

第一次编码:
import org.apache.spark.{
   SparkConf,SparkContext}

object LogApp {
   
    def main(args: Array[String]): Unit = {
   
      val sparkConf = new SparkConf().setAppName("LogApp").setMaster("local[2]")
      val sc = new SparkContext(sparkConf)

      val lines = sc.textFile("file:///d:/baidu.log")

      val result  = lines.map( x 
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值