Spark Basic Programming Examples

This article walks through basic Spark programming examples: counting website page views, creating a custom partitioner, writing RDD data to a MySQL database, and querying a database with JdbcRDD. Each case is illustrated with complete code.

Case 1: Counting Website Page Views

  • Find the two most-visited pages
  • Output the page name and its visit count
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Author: Administrator and wind
 * @Version: 2019/11/12 & 1.0
 *
 */
object MyLogCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("MyLogCount")

    val sc = new SparkContext(conf)

    /**
     * 192.168.88.1 - - [30/Jul/2017:12:54:42 +0800] "GET /MyDemoWeb/web.jsp HTTP/1.1" 200 239
     *
     * (web.jsp,1)
     */
    val rdd1 = sc.textFile("E:\\test\\localhost_access_log.2017-07-30.txt")
      .map(
        line => {
          // Parse the log line
          // 1. Extract the substring between the first and last double quote
          val index1 = line.indexOf("\"")
          val index2 = line.lastIndexOf("\"")
          val line1 = line.substring(index1+1,index2)// GET /MyDemoWeb/web.jsp HTTP/1.1
          println(line1)

          // 2. Extract the substring between the first and last space
          val index3 = line1.indexOf(" ")
          val index4 = line1.lastIndexOf(" ")
          val line2 = line1.substring(index3+1,index4)
          println(line2)


          // 3. Extract the jsp file name after the last "/"
          val jspName = line2.substring(line2.lastIndexOf("/")+1)

          (jspName,1)
        }
      )

    /**
     * Aggregate: count the visits per page
     */
    println("-----------------")
    val rdd2 = rdd1.reduceByKey(_+_)

    // Sort by value (visit count) in descending order
    val rdd3 = rdd2.sortBy(_._2,false)

    // Take the two most-visited pages
    rdd3.take(2).foreach(println)

    sc.stop()

  }

}
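As a usage note, sortBy followed by take(2) sorts the whole RDD before the driver takes two elements. RDD.top with an Ordering on the count keeps only the two largest entries per partition and merges them on the driver. A minimal alternative sketch, reusing the rdd2 defined above:

// Equivalent result without a full sort: top(2) with an ordering on the visit count
val top2 = rdd2.top(2)(Ordering.by[(String, Int), Int](_._2))
top2.foreach(println)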

Case 2: Creating a Custom Partitioner

  • Based on the name of the jsp file, write each page's access log lines into its own partition file
  • The output is one partition file per page
  • For example, the part-00000 file contains only the access log entries for web.jsp
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

import scala.collection.mutable

/**
 * @Author: Administrator and wind
 * @Version: 2019/11/13 & 1.0
 *
 * Partition by key
 */
object MyLogPartitioner {
  def main(args: Array[String]): Unit = {
    //System.setProperty("hadoop.home.dir","E:\\bin\\hadoop-2.5.2")
    // Create the SparkContext
    val conf = new SparkConf().setAppName("MyLogPartitioner").setMaster("local")

    val sc = new SparkContext(conf)

    /**
     * 192.168.88.1 - - [30/Jul/2017:12:54:42 +0800] "GET /MyDemoWeb/web.jsp HTTP/1.1" 200 239
     *
     * (web.jsp,1)
     */

    val rdd1 = sc.textFile("E:\\test\\localhost_access_log.2017-07-30.txt")
      .map(
        line=>{
          // Same parsing as Case 1, but keep the whole log line as the value
          val index1 = line.indexOf("\"")
          val index2 = line.lastIndexOf("\"")
          val line1 = line.substring(index1+1,index2)

          val index3 = line1.indexOf(" ")
          val index4 = line1.lastIndexOf(" ")
          val line2 = line1.substring(index3+1,index4)

          val jspName = line2.substring(line2.lastIndexOf("/")+1)

          (jspName,line)
        }
      )

    // Collect the distinct jsp names to the driver; they define the partitions
    val jspList = rdd1.map(_._1).distinct().collect()

    // Custom partitioner, defined as a separate class below

    val myPartitioner = new MyWebPartition(jspList)

    val rdd2 = rdd1.partitionBy(myPartitioner)

    // Write the partitioned RDD out: one part file per partition
    rdd2.saveAsTextFile("E:\\test\\abc\\test.partition")

    sc.stop()
  }

}
class MyWebPartition(jspList:Array[String]) extends Partitioner{

  // Map each jsp name to its partition id
  val partitionMap = new mutable.HashMap[String,Int]

  var partID = 0 // partition id

  for(jsp <- jspList){
    partitionMap.put(jsp,partID)
    partID += 1
  }
  // Number of partitions
  def numPartitions : Int = partitionMap.size
  // Return the partition for a given key; unknown keys fall back to partition 0
  def getPartition(key:Any):Int = partitionMap.getOrElse(key.toString(),0)

}
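To sanity-check that each page's log lines really land in their own partition, a minimal sketch that could be run before saveAsTextFile, reusing the rdd2 from main:

// Print (partition id, number of log lines) for every partition
rdd2.mapPartitionsWithIndex(
  (idx, it) => Iterator((idx, it.size))
).collect().foreach(println)

Note that getPartition falls back to partition 0 for any key that was not in jspList when the partitioner was built, so unexpected keys end up mixed into the first file.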

Case 3: Writing RDD Data to a Database

  • Save the RDD data into a MySQL database
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Author: Administrator and wind
 * @Version: 2019/11/13 & 1.0
 *
 * Per-partition database writes
 */
object MyLogCountToMysql {

  def main(args: Array[String]): Unit = {


    val conf = new SparkConf().setMaster("local").setAppName("MyLogCountToMysql")

    val sc = new SparkContext(conf)

    /**
     * 192.168.88.1 - - [30/Jul/2017:12:54:42 +0800] "GET /MyDemoWeb/web.jsp HTTP/1.1" 200 239
     *
     * (web.jsp,1)
     */

    val rdd1 = sc.textFile("E:\\test\\localhost_access_log.2017-07-30.txt")
      .map(
        line => {
          val index1 = line.indexOf("\"")
          val index2 = line.lastIndexOf("\"")
          val line1 = line.substring(index1 + 1, index2)

          val index3 = line1.indexOf(" ")
          val index4 = line1.lastIndexOf(" ")
          val line2 = line1.substring(index3 + 1, index4)

          // Take the jsp file name after the last "/"
          val jspName = line2.substring(line2.lastIndexOf("/") + 1)

          (jspName, 1)
        })

    rdd1.foreachPartition(saveToMysql)

    sc.stop()
  }
  // Write one partition's records over a single connection
  def saveToMysql(it: Iterator[(String, Int)]): Unit = {

    var conn: Connection = null
    var pst: PreparedStatement = null

    try{
      conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "root", "123456")

      pst = conn.prepareStatement("insert into mydata value(?,?)")

      it.foreach(
        t => {
          pst.setString(1, t._1)
          pst.setInt(2, t._2)
          pst.executeUpdate()
        })
    }catch {
      case t : Throwable => t.printStackTrace()
    }finally {
      if(pst!=null) pst.close()
      if(conn!=null) conn.close()
    }

  }


}
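foreachPartition is used here so that each partition opens one connection instead of one per record. If insert throughput matters, JDBC batching reduces the round-trips further. A hedged sketch of a batched variant, assuming the same mydata table and credentials as above (the batch size of 500 is an arbitrary choice):

  // Batched variant of saveToMysql: groups rows and sends them with executeBatch
  def saveToMysqlBatched(it: Iterator[(String, Int)]): Unit = {
    var conn: Connection = null
    var pst: PreparedStatement = null
    try {
      conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "root", "123456")
      pst = conn.prepareStatement("insert into mydata value(?,?)")
      it.grouped(500).foreach { batch =>
        batch.foreach { case (name, count) =>
          pst.setString(1, name)
          pst.setInt(2, count)
          pst.addBatch()
        }
        pst.executeBatch()
      }
    } catch {
      case t: Throwable => t.printStackTrace()
    } finally {
      if (pst != null) pst.close()
      if (conn != null) conn.close()
    }
  }

It would be wired in the same way: rdd1.foreachPartition(saveToMysqlBatched).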

Case 4: Querying a Database with JdbcRDD

import java.sql.DriverManager

import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Author: Administrator and wind
 * @Version: 2019/11/13 & 1.0
 *
 * Query a database with JdbcRDD
 */
object MysqlDemo {

  // A function that returns a fresh JDBC connection; JdbcRDD invokes it on the executors
  val connection = () => {
    Class.forName("com.mysql.jdbc.Driver").newInstance()
    DriverManager.getConnection("jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "root", "123456")
  }

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("MysqlDemo").setMaster("local")

    val sc = new SparkContext(conf)

    val mysqlRDD = new JdbcRDD(sc,connection,"select * from emp where sal > ? and sal <= ?",900,1200,2,r=>{
      val ename = r.getString(2)
      val sal = r.getInt(4)
      (ename,sal)
    })
    val result = mysqlRDD.collect()
    println(result.toBuffer)
    sc.stop()

  }
}
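A note on the JdbcRDD arguments: 900 and 1200 are the lower and upper bounds of the numeric range, 2 is the number of partitions that range is split into, and each partition's sub-range is bound to the two ? placeholders before its query runs. A minimal sketch with the positional arguments named, reusing the sc and connection defined above:

    // Same JdbcRDD, with the positional arguments spelled out
    val lowerBound = 900L      // start of the range, bound to the first ?
    val upperBound = 1200L     // end of the range, bound to the second ?
    val numPartitions = 2      // [900, 1200] is split into 2 sub-ranges, one query per partition
    val mysqlRDD2 = new JdbcRDD(
      sc,
      connection,
      "select * from emp where sal > ? and sal <= ?",
      lowerBound,
      upperBound,
      numPartitions,
      r => (r.getString(2), r.getInt(4))   // map each ResultSet row to (ename, sal)
    )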
