Spark装载数据源

Spark可以加载好多种外部数据源的格式,例如:csv,text,json,parquet等。我们在这里讲解下csv和json格式。

一、装载CSV数据源

文件链接
提取码: t4n2

文件预览
在这里插入图片描述
使用SparkContext

def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("scvdemo")
    val sc = new SparkContext(conf)

    val lines = sc.textFile("in/users.csv",3)

    val fields2 = lines.filter(v=>v.startsWith("user_id")==false).map(_.split(","))
    println(fields2.count)
}

在这里插入图片描述

  • 使用SparkSession

option参数说明:
1. delimiter 分隔符,默认为逗号,
2. nullValue 空值设置,如果不想用任何符号作为空值,可以赋值null即可
3. quote 引号字符,默认为双引号"
4. header 第一行是否不作为数据内容,作为标题(true/false)
5. inferSchema 是否自动推测字段类型(true/false)
6. ignoreLeadingWhiteSpace 是否裁剪前面的空格(true/false)
7. ignoreTrailingWhiteSpace 是否裁剪后面的空格(true/false)

def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("scvdemo")
    val spark = SparkSession.builder().config(conf).getOrCreate()

    val df = spark.read.format("csv")
      .option("header", "true").load("in/users.csv")
    //显示表头
    df.printSchema()
    //显示表数据,show中不加参数默认显示20行
    df.select("user_id","locale","birthyear").show(5)
}

在这里插入图片描述

  • 如果数据没有表头
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setMaster("local").setAppName("csvdemo2")
  val spark = SparkSession.builder().config(conf).getOrCreate()
  
  //sparkSession读取csv文件,没有表头信息,header的参数false
  val df = spark.read.format("csv").option("header","false").load("in/users2.csv")
  df.printSchema()
  
  //修改列名
  val df2 = df.withColumnRenamed("_c0","id")
  //修改列表数据类型
  val df3 = df2.withColumn("id",df2.col("id").cast("long"))
  df3.printSchema()
}

在这里插入图片描述

改名这里有个坑,下面是一些总结

  • 列名不区分大小写
    对比
//列名修改成全大写
val df2 = df.withColumn("USER_ID",df.col("user_id")).drop("user_id")
df2.printSchema()


//修改成新列名,结果新增一列
val df3 = df.withColumn("id",df.col("user_id"))
df3.printSchema()

//下面将原列删除
val df4 = df.withColumn("id",df.col("user_id")).drop("user_id")
df4.printSchema()

在这里插入图片描述

二、装载JSON数据源

  • 使用SparkContext

JSON数据解析:
(1)读取JSON格式文件
直接使用sc.textFile(“file://”)来读取.json文件即可
(2)JSON
Scala中有一个自带的JSON库 scala.util.parsing.json.JSON 可以 实现对JSON数据解析。
通过调用 JSON.parseFull(jsonString:String) 函数对输入的JSON字符串进行解析,如果解析成功则返回一个 Some( map:Map[String,Any] ) ,失败则返回None

def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("jsondemo")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("in/users.json")
    import scala.util.parsing.json._
    val rdd = lines.map(x=>JSON.parseFull(x))
    rdd.foreach(println)
}

在这里插入图片描述

  • 使用SparkSession

加载Json格式后,df会自动识别Schema
注:如果Json格式不严谨,sparkSQL能将问题数据解析出来

def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("jsondemo")
    val spark = SparkSession.builder().config(conf).getOrCreate()

    val df = spark.read.format("json")
      .option("header", "true").load("in/users.json")
    df.printSchema()
    
    //修改列名,json同csv,列名不区分大小写
    val df2 = df.withColumnRenamed("Age","age")
    df2.printSchema()
    df2.select("age","name").show()

在这里插入图片描述

    //修改列数据类型
    val df3 =
      df.withColumn("age",df.col("Age").cast("Int"))
    df3.printSchema()

   //操作数据,将age加 5
    val df4 = df.withColumn("age",df.col("Age")+5)
     .withColumn("name",df.col("name"))
    df4.printSchema()
    df4.show
}

在这里插入图片描述


评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值