import org.apache.spark.{SparkConf, SparkContext}
/**
* @Author: Administrator and wind
* @Version: 2019/11/12 & 1.0
*
*/
object MyLogCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("MyLogCount")
val sc = new SparkContext(conf)
/**
* 192.168.88.1 - - [30/Jul/2017:12:54:42 +0800] "GET /MyDemoWeb/web.jsp HTTP/1.1" 200 239
*
* (web.jsp,1)
*/
val rdd1 = sc.textFile("E:\\test\\localhost_access_log.2017-07-30.txt")
.map(
line => {
// parse the log line
// 1. get the content between the two double quotes
val index1 = line.indexOf("\"")
val index2 = line.lastIndexOf("\"")
val line1 = line.substring(index1+1,index2)// GET /MyDemoWeb/web.jsp HTTP/1.1
println(line1)
// 2. get the content between the first and last space
val index3 = line1.indexOf(" ")
val index4 = line1.lastIndexOf(" ")
val line2 = line1.substring(index3+1,index4)
println(line2)
// get the jsp name
val jspName = line2.substring(line2.lastIndexOf("/")+1)
(jspName,1)
}
)
/**
* aggregate: count the hits per page
*/
println("-----------------")
val rdd2 = rdd1.reduceByKey(_+_)
// sort by value in descending order
val rdd3 = rdd2.sortBy(_._2,false)
// take the two most visited pages
rdd3.take(2).foreach(println)
sc.stop()
}
}
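The substring parsing above assumes every line contains exactly one quoted request. A regular-expression version of the same extraction is sketched below; it is a minimal alternative sketch, not part of the original example, and only assumes the quoted request field shown in the sample line.
// Sketch: extract the page name with a regex instead of indexOf/substring.
// The pattern matches a quoted request such as "GET /MyDemoWeb/web.jsp HTTP/1.1".
val request = "\"(\\S+)\\s+(\\S+)\\s+(\\S+)\"".r
def pageName(line: String): Option[String] =
  request.findFirstMatchIn(line).map { m =>
    val uri = m.group(2)                    // e.g. /MyDemoWeb/web.jsp
    uri.substring(uri.lastIndexOf("/") + 1) // e.g. web.jsp
  }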
Case 2: Creating a Custom Partitioner
Based on the jsp file name, write each page's access log lines into its own partition file.
For example, the generated part-00000 file contains only the access log lines for web.jsp.
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
import scala.collection.mutable
/**
* @Author: Administrator and wind
* @Version: 2019/11/13 & 1.0
*
* Partition by key
*/
object MyLogPartitioner {
def main(args: Array[String]): Unit = {
//System.setProperty("hadoop.home.dir","E:\\bin\\hadoop-2.5.2")
// create the SparkContext
val conf = new SparkConf().setAppName("MyLogPartitioner").setMaster("local")
val sc = new SparkContext(conf)
/**
* 192.168.88.1 - - [30/Jul/2017:12:54:42 +0800] "GET /MyDemoWeb/web.jsp HTTP/1.1" 200 239
*
* (web.jsp,1)
*/
val rdd1 = sc.textFile("E:\\test\\localhost_access_log.2017-07-30.txt")
.map(
line=>{
val index1 = line.indexOf("\"")
val index2 = line.lastIndexOf("\"")
val line1 = line.substring(index1+1,index2)
val index3 = line1.indexOf(" ")
val index4 = line1.lastIndexOf(" ")
val line2 = line1.substring(index3+1,index4)
val jspName = line2.substring(line2.lastIndexOf("/")+1)
(jspName,line)
}
)
val jspList = rdd1.map(_._1).distinct().collect()
// custom partitioner, defined in a separate class below
val myPartitioner = new MyWebPartition(jspList)
val rdd2 = rdd1.partitionBy(myPartitioner)
// write the partitioned output
rdd2.saveAsTextFile("E:\\test\\abc\\test.partition")
sc.stop()
}
}
class MyWebPartition(jspList:Array[String]) extends Partitioner{
// map each jsp name to a partition id
val partitionMap = new mutable.HashMap[String,Int]
var partID = 0 // partition id
for(jsp <- jspList){
partitionMap.put(jsp,partID)
partID += 1
}
// return the number of partitions
def numPartitions : Int = partitionMap.size
// return the partition for the given jsp name; unseen keys go to partition 0
def getPartition(key:Any):Int = partitionMap.getOrElse(key.toString(),0)
}
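As a quick local sanity check (the jsp names below are hypothetical, not taken from the log), each name passed to the constructor gets its own partition and unseen names fall back to partition 0:
// Hypothetical jsp names, used only to illustrate the key-to-partition mapping.
val p = new MyWebPartition(Array("web.jsp", "hadoop.jsp", "java.jsp"))
println(p.numPartitions)               // 3
println(p.getPartition("hadoop.jsp"))  // 1
println(p.getPartition("oracle.jsp"))  // 0 - keys not in the map default to partition 0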
Case 3: Accessing a Database
Save the RDD data to a MySQL database.
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.{SparkConf, SparkContext}
/**
* @Author: Administrator and wind
* @Version: 2019/11/13 & 1.0
*
* Per-partition database writes
*/
object MyLogCountToMysql {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("MyLogCountToMysql")
val sc = new SparkContext(conf)
/**
* 192.168.88.1 - - [30/Jul/2017:12:54:42 +0800] "GET /MyDemoWeb/web.jsp HTTP/1.1" 200 239
*
* (web.jsp,1)
*/
val rdd1 = sc.textFile("E:\\test\\localhost_access_log.2017-07-30.txt")
.map(
line => {
val index1 = line.indexOf("\"")
val index2 = line.lastIndexOf("\"")
val line1 = line.substring(index1 + 1, index2)
val index3 = line1.indexOf(" ")
val index4 = line1.lastIndexOf(" ")
val line2 = line1.substring(index3 + 1, index4)
val jspName = line2.substring(line2.lastIndexOf("/") + 1)
(jspName, 1)
})
rdd1.foreachPartition(saveToMysql)
sc.stop()
}
// per-partition database write: one connection per partition, reused for all rows in it
def saveToMysql(it: Iterator[(String, Int)]): Unit = {
var conn: Connection = null
var pst: PreparedStatement = null
try{
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "root", "123456")
pst = conn.prepareStatement("insert into mydata values(?,?)")
it.foreach(
t => {
pst.setString(1, t._1)
pst.setInt(2, t._2)
pst.executeUpdate()
})
}catch {
case t : Throwable => t.printStackTrace()
}finally {
if(pst!=null) pst.close()
if(conn!=null) conn.close()
}
}
}
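Opening one connection per partition (rather than per record) is the point of foreachPartition. For large partitions the inserts can also be batched; the sketch below is an optional drop-in variant of saveToMysql for the object above, not part of the original example, using the same mydata table and connection settings.
// Sketch: batched variant of saveToMysql - rows are queued with addBatch()
// and sent to MySQL in one round trip with executeBatch().
def saveToMysqlBatched(it: Iterator[(String, Int)]): Unit = {
  var conn: Connection = null
  var pst: PreparedStatement = null
  try {
    conn = DriverManager.getConnection(
      "jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8",
      "root", "123456")
    pst = conn.prepareStatement("insert into mydata values(?,?)")
    it.foreach { case (name, count) =>
      pst.setString(1, name)
      pst.setInt(2, count)
      pst.addBatch()     // queue the row locally
    }
    pst.executeBatch()   // execute all queued inserts at once
  } catch {
    case t: Throwable => t.printStackTrace()
  } finally {
    if (pst != null) pst.close()
    if (conn != null) conn.close()
  }
}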
Case 4: Using JdbcRDD to access the database
import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* @Author: Administrator and wind
* @Version: 2019/11/13 & 1.0
*
* Using JdbcRDD to access the database
*/
object MysqlDemo {
val connection = () => {
Class.forName("com.mysql.jdbc.Driver").newInstance()
DriverManager.getConnection("jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "root", "123456")
}
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("MysqlDemo").setMaster("local")
val sc = new SparkContext(conf)
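// The two "?" placeholders in the query are filled by JdbcRDD from the bounds that follow:
// 900 is the lower bound, 1200 the upper bound, and 2 the number of partitions the range is split into.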
val mysqlRDD = new JdbcRDD(sc,connection,"select * from emp where sal > ? and sal <= ?",900,1200,2,r=>{
val ename = r.getString(2)
val sal = r.getInt(4)
(ename,sal)
})
val result = mysqlRDD.collect()
println(result.toBuffer)
sc.stop()
}
}
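The row mapper above reads columns by position (getString(2), getInt(4)). Assuming the emp table's columns are actually named ename and sal (an assumption, not stated in the original), the same mapping can also be written by column name, as in this small sketch:
// Sketch: the same row mapping by column name instead of by position.
// Column names "ename" and "sal" are assumed from the variable names above.
val byName = (r: java.sql.ResultSet) => (r.getString("ename"), r.getInt("sal"))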