Spark SQL说明和操作

DataFrame关联密切的是Spark SQL技术,作为Spark中的4大模块之一,在DataFrame的基础上,将其注册为表,然后使用SQL语句进行读取处理。
主要的处理步骤如下:

操作的对应的视频如下,在腾讯课堂可免费查看所有的视频与下载简介资料

个人大数据平台的搭建与学习实践-PySpark-学习视频教程-腾讯课堂 (qq.com)

PySpark的认识和使用

1-创建DataFrame,对于Spark2使用SparkSession;对于Spark 1使用SQLContext
 

# 1 在Spark2中使用SparkSession对象
from Pysparkimport SparkContext
from Pyspark.sql import SparkSession
sc=SparkContext(appName="test_sc") 
spark=SparkSession.builder.appName('xxx_session').getOrCreate()

# 2在Spark1中使用SQLContext对象
from Pyspark.sql import SQLContext
sql_sc = SQLContext(sc)

# 3使用read进行读取各种数据源的创建DataFrame,Spark 1和Spark 2操作方法一样
df = spark.read.csv(....)
df = spark.read.json(....)
df = spark.read.jdbc(....)
df = spark.read.textFile(....)

# 或直接使用CreateDataFrame函数构建
df=spark.createDataFrame(…)

2-获取数据

可以是通过Python Pandas读取数据,或是通过SparkSession中的read属性,它是pyspark.sql.readwriter 模块中的DataFrameReader类型,在文档中可以搜索DataFrameReader,可以发现针对不同的数据源的读取函数。pyspark.sql.readwriter

3-在DataFrame的基础上构建表或视图,有很多的函数创建具有不同生命周期的视图或表

  1. createGlobalTempView(name) 使用此DataFrame创建全局临时视图,其生命周期和Spark应用程序
  2. createOrReplaceGlobalTempView(name) 使用给定的名称创建或替换全局临时视图。
  3. createTempView(name) 用DataFrame创建一个本地临时视图
  4. createOrReplaceTempView(name) 用DataFrame创建或替换一个本地临时视图
  5. registerTempTable(name) 使用此给定名称注册此DataFrame作为临时表

删除Spark SQL表或试图

  1. spark.catalog.dropGlobalTempView(name)
  2. spark.catalog.dropTempView (name)
### Spark SQL 基本操作与使用指南 Spark SQL 是 Apache Spark 提供的一个模块,用于处理结构化半结构化数据。它支持通过 SQL 查询 DataFrame/Dataset API 来操作数据,并且可以与现有的 Hive 集成。以下是 Spark SQL 的基本操作使用方法。 #### 1. 初始化 SparkSession 在使用 Spark SQL 之前,需要创建一个 `SparkSession` 对象,它是 Spark SQL 的入口点。 ```scala import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("SparkSQLExample") .config("spark.some.config.option", "some-value") .getOrCreate() ``` 上述代码初始化了一个 SparkSession 实例[^2]。 #### 2. 创建表或视图 可以通过多种方式创建表或视图,例如从 RDD、DataFrame 或外部数据源加载数据。 ```scala // 创建临时视图 val df = spark.read.json("examples/src/main/resources/people.json") df.createOrReplaceTempView("people") // 使用 SQL 查询临时视图 val sqlDF = spark.sql("SELECT * FROM people") sqlDF.show() ``` 这里展示了如何从 JSON 文件中加载数据并创建临时视图[^2]。 #### 3. 数据查询 可以通过 SQL 查询或 DataFrame API 执行查询操作。 ```scala // SQL 查询 val teenagers = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19") // DataFrame API 查询 val teenagersDF = df.filter("age BETWEEN 13 AND 19").select("name", "age") teenagersDF.show() ``` 上述示例展示了如何使用 SQL DataFrame API 进行过滤选择操作[^2]。 #### 4. 数据写入 可以将查询结果保存到不同的存储格式中,例如 Parquet、JSON 或 CSV。 ```scala // 将 DataFrame 写入 Parquet 文件 df.write.parquet("people.parquet") // 将 DataFrame 写入 CSV 文件 df.write.format("csv").option("header", "true").save("people.csv") ``` 这些代码片段说明了如何将数据写入 Parquet CSV 文件[^2]。 #### 5. 日期时间处理 在实际应用中,经常需要对日期时间进行转换。Spark SQL 提供了一些内置函数来处理日期时间数据。 ```scala // 示例:日期时间转换 val dateDF = Seq(("2023-03-01", "12:00:00")).toDF("date", "time") val resultDF = dateDF.select( to_date(col("date")).as("converted_date"), to_timestamp(concat_ws(" ", col("date"), col("time"))).as("converted_timestamp") ) resultDF.show() ``` 此代码展示了如何使用 `to_date` `to_timestamp` 函数进行日期时间转换[^3]。 #### 6. 性能优化 为了提高 Spark SQL 的性能,可以考虑以下几点: - **分区**:根据查询模式合理设计分区[^1]。 - **广播连接**:对于小表,可以启用广播连接以减少 shuffle 开销[^1]。 - **数据倾斜**:识别并解决数据倾斜问题,例如通过重新分布数据[^1]。 --- ###
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值