通过Spark结合使用Hive和ORC存储格式

本文介绍如何使用Spark访问Hive中的数据,包括通过SparkShell交互式访问Spark、读取HDFS文件创建RDD、创建Hive ORC格式表、使用SparkSQL查询Hive表等关键步骤。

摘要生成于 C知道 ,由 DeepSeek-R1 满血版支持, 前往体验 >

在这篇博客中,我们将一起分析通过Spark访问Hive的数据,主要分享以下几点内容:

1.      如何通过Spark Shell交互式访问Spark

2.      如何读取HDFS文件和创建一个RDD

3.      如何通过Spark API交互式地分析数据集

4.      如何创建Hive的ORC格式的表

5.      如何使用Spark SQL查询Hive表

6.      如何以ORC格式存储数据

 

Spark SQL使用Spark引擎对存储在HDFS或者存在的RDDs执行SQL查询。我们可以在Spark程序中使用SQL语句来操作数据。

 

1.      获取数据集

在Linux服务器终端中获取样例数据:

wget http://hortonassets.s3.amazonaws.com/tutorial/data/yahoo_stocks.csv

 

将下载的数据上传到HDFS的目录中,如下:

hdfs dfs -put ./yahoo_stocks.csv /tmp/

 

2.      启动Spark Shell

spark-shell

这里启动了spark-shell,并且能够和Hive进行交互,因为我们已经将hive-site.xml,hdfs-site.xml和core-site.xml拷贝到spark的conf目录下面了。

 

导入需要的库文件:

scala> import org.apache.spark.sql.hive.orc._

import org.apache.spark.sql.hive.orc._

 

scala> import org.apache.spark.sql._

import org.apache.spark.sql._

 

3.      创建SparkSession

在Spark 2.0中提供了SparkSession,内置支持Hive特性,包括使用HiveQL,访问Hive UDFs,并且可以从Hive表中获取数据。

 

创建实例:

scala> import org.apache.spark.sql.SparkSession

import org.apache.spark.sql.SparkSession

 

我们使用spark-shell登录时,默认已经为我们创建了一个SparkSession的实例为spark,后面可以直接使用该实例。

Spark session available as 'spark'.

 

4.      创建ORC格式的表

在Hive中创建表:

scala> spark.sql("create table yahoo_orc_table (date STRING,open_price FLOAT, high_price FLOAT, low_price FLOAT, close_price FLOAT, volume INT, adj_price FLOAT) stored as orc")

res0: org.apache.spark.sql.DataFrame = []

 

5.      加载数据文件并创建一个RDD

scala> val yahoo_stocks =sc.textFile("hdfs://SZB-L0023776:8020/tmp/yahoo_stocks.csv")

yahoo_stocks: org.apache.spark.rdd.RDD[String] =

hdfs://SZB-L0023776:8020/tmp/yahoo_stocks.csv MapPartitionsRDD[2] at textFile at <console>:30

 

获取10行数据:

scala> yahoo_stocks.take(10).foreach(println)

Date,Open,High,Low,Close,Volume,AdjClose

2015-04-28,44.34,44.57,43.94,44.34,7188300,44.34

2015-04-27,44.65,45.10,44.25,44.36,10840900,44.36

2015-04-24,43.73,44.71,43.69,44.52,11267500,44.52

2015-04-23,43.92,44.06,43.58,43.70,14274900,43.70

2015-04-22,44.58,44.85,43.67,43.98,32241200,43.98

2015-04-21,45.15,45.18,44.45,44.49,16103700,44.49

2015-04-20,44.73,44.91,44.41,44.66,10052900,44.66

2015-04-17,45.30,45.44,44.25,44.45,13305700,44.45

2015-04-16,45.82,46.13,45.53,45.78,13800300,45.78

 

 

6.      数据的首行为字段名称

scala> val header = yahoo_stocks.first

header: String = Date,Open,High,Low,Close,Volume,Adj Close

 

下面我们创建一个新的RDD,不包括首行字段名称:

scala> val data = yahoo_stocks.mapPartitionsWithIndex { (idx, iter)=> if (idx == 0) iter.drop(1) else iter }

data: org.apache.spark.rdd.RDD[String] =MapPartitionsRDD[3] at mapPartitionsWithIndex at <console>:32

 

 

7.      创建一个Schema

scala> case class YahooStockPrice(date: String, open: Float, high:Float, low: Float, close: Float, volume: Integer, adjClose: Float)

defined class YahooStockPrice

 

8.      将Schema绑定到处理后的数据上

针对YahooStockPrice创建一个RDD,并注册为一张表:

scala> val stockprice = data.map(_.split(",")).map(row=> YahooStockPrice(row(0), row(1).trim.toFloat, row(2).trim.toFloat,row(3).trim.toFloat, row(4).trim.toFloat, row(5).trim.toInt,row(6).trim.toFloat)).toDF()

stockprice: org.apache.spark.sql.DataFrame = [date: string,open: float ... 5 more fields]

 

查看数据:

scala> stockprice.first

res4: org.apache.spark.sql.Row =[2015-04-28,44.34,44.57,43.94,44.34,7188300,44.34]

 

查看更多的数据:

scala> stockprice.show

验证Schema:

scala> stockprice.printSchema

root

 |-- date: string (nullable = true)

 |-- open: float (nullable = false)

 |-- high: float (nullable = false)

 |-- low: float (nullable = false)

 |-- close: float (nullable = false)

 |-- volume: integer (nullable = true)

 |-- adjClose: float (nullable = false)

 

 

9.      注册一个临时表

scala> stockprice.createOrReplaceTempView("yahoo_stocks_temp")

 

10.  查询创建的临时表

注意这里的表不是Hive里面的表,而是一个RDD:

scala> val results = spark.sql("SELECT * FROM yahoo_stocks_temp")

scala> results.map(t => "Stock Entry: " +t.toString).collect().foreach(println)

……

Stock Entry:[1996-05-06,32.50008,32.50008,29.37504,30.12504,8214400,1.25521]

Stock Entry: [1996-05-03,32.25,32.50008,31.24992,31.99992,6116800,1.33333]

Stock Entry:[1996-05-02,31.5,33.25008,31.5,32.87496,9731200,1.36979]

Stock Entry:[1996-05-01,30.25008,31.75008,30.0,31.62504,4881600,1.31771]

Stock Entry: [1996-04-30,31.24992,31.5,29.50008,29.74992,5003200,1.23958]

……

 

11.  作为ORC文件格式保存

我们将上面的数据写入到Hive表里面,并且存储的文件格式为ORC。

scala> results.write.format("orc").saveAsTable("yahoo_stocks_orc")

 

12.  读取ORC文件

scala> val yahoo_stocks_orc= spark.read.format("orc").load("yahoo_stocks_orc")

yahoo_stocks_orc: org.apache.spark.sql.DataFrame = [date:string, open: float ... 5 more fields]

 

 

注册一个临时基于内存的表并映射到此ORC表:

scala> yahoo_stocks_orc.createOrReplaceTempView("orcTest")

 

查询:

scala> spark.sql("SELECT * from orcTest").collect.foreach(println)

……

[1996-04-29,31.5,31.99992,30.49992,31.00008,5928000,1.29167]

[1996-04-26,31.99992,32.25,31.24992,31.75008,7561600,1.32292]

[1996-04-25,30.0,32.25,28.99992,31.24992,19478400,1.30208]

[1996-04-24,28.5,29.12496,27.75,28.99992,7795200,1.20833]

……

 

13.  查询Hive的表数据

我们在使用spark-shell登录时,默认初始化了一个spark实例:

Spark session available as 'spark'.

我们可以使用spark访问Hive的表数据。

 

scala> val tableDF =spark.sql("select * from yahoo_stocks_orc limit 10")

tableDF: org.apache.spark.sql.DataFrame = [date: string,open: float ... 5 more fields]

 

查看10行数据:

scala> tableDF.take(10).foreach(println)

[2015-04-28,44.34,44.57,43.94,44.34,7188300,44.34]

[2015-04-27,44.65,45.1,44.25,44.36,10840900,44.36]

[2015-04-24,43.73,44.71,43.69,44.52,11267500,44.52]

[2015-04-23,43.92,44.06,43.58,43.7,14274900,43.7]

[2015-04-22,44.58,44.85,43.67,43.98,32241200,43.98]

[2015-04-21,45.15,45.18,44.45,44.49,16103700,44.49]

[2015-04-20,44.73,44.91,44.41,44.66,10052900,44.66]

[2015-04-17,45.3,45.44,44.25,44.45,13305700,44.45]

[2015-04-16,45.82,46.13,45.53,45.78,13800300,45.78]

[2015-04-15,45.46,45.83,45.23,45.73,15033500,45.73]


<think>嗯,用户想了解Hive支持的存储格式以及如何选择合适的。首先我需要回忆Hive常见的存储格式有哪些。记得有TextFile、SequenceFile、RCFile、ORCFile、Parquet这些。然后每个格式的特点适用场景需要理清楚。 比如TextFile是最基础的,纯文本,适合导入导出方便,但性能可能差。SequenceFile是二进制,支持压缩分割,可能适合中间数据存储。RCFile是行列混合,而ORCParquet是更高效的行列式存储,适合查询性能要求高的场景。 然后用户可能关心如何选择,比如需要考虑压缩、查询速度、是否频繁更新、是否与外部系统交互等因素。比如ORCParquet在分析查询时现好,但如果需要Sqoop交互的话可能得用TextFile,像引用4提到的。 还要注意引用里的信息,比如引用2提到ORCParquet实际应用多,引用4提到Sqoop需要TextFile。所以回答里要结合这些引用,给出格式使用建议,最后生成相关问题。需要确保LaTeX格式正确,行内用$,独立公式用$$,但问题里可能数学公式不多。可能涉及压缩类型如Snappy、Gzip,这些在引用1里提到,所以需要关联到存储格式的选择中。</think>### Hive支持的存储格式使用场景 Hive支持多种存储格式,每种格式针对不同场景优化。以下是主要格式的特点应用建议: #### 一、存储格式 1. **文本格式(TextFile)** - **特点**:默认格式,纯文本存储(如CSV、TSV),可读性强,但无压缩且查询效率低 - **适用场景**: - 数据导入/导出需求频繁的场景(如与Sqoop交互时需使用TextFile[^4]) - 临时数据或小规模数据集 - **示例建语句**: ```sql CREATE TABLE logs (id INT, content STRING) STORED AS TEXTFILE; ``` 2. **二进制序列化文件(SequenceFile)** - **特点**:二进制键值对存储,支持压缩分片,适合Hadoop生态链交互 - **适用场景**: - MapReduce作业中间结果存储 - 需兼容Hadoop其他组件的场景 3. **行列式文件(RCFile)** - **特点**:行组内列式存储,压缩比高于TextFile,但性能弱于ORC/Parquet - **适用场景**: - 历史遗留系统兼容 - 需平衡存储与查询的场景(逐渐被ORC/Parquet替代) 4. **优化的行列式文件(ORCFile)** - **特点**: - 列式存储,支持高压缩比(如Zlib、Snappy[^1]) - 内置索引(如布隆过滤器)加速查询 - 支持ACID事务 - **适用场景**: - 大规模数据分析(如数仓核心) - 高并发查询场景[^2] - **示例建语句**: ```sql CREATE TABLE user_behavior (user_id STRING, action STRING) STORED AS ORC; ``` 5. **Apache Parquet** - **特点**: - 列式存储,支持嵌套数据结构 - 高效压缩(兼容Snappy、Gzip[^1]) - 适合Spark等生态工具 - **适用场景**: - 复杂嵌套结构数据处理(如JSON日志) - 跨计算引擎协作场景(如Hive+Spark) #### 二、选择依据 $$ \text{选择公式} = \max \left( \frac{\text{查询性能} \times \text{压缩比}}{\text{写入成本} + \text{兼容性约束}} \right) $$ 1. **优先ORC/Parquet**:99%的生产场景首选,例如: - ORC适合Hive专属高频分析 - Parquet适合跨平台数据湖 2. **强制使用TextFile**: - 与Sqoop交互时[^4] - 数据需直接人工查看 3. **特殊需求**: - 频繁更新数据 → 使用ORC(支持ACID) - 极致压缩 → ORC+Zlib压缩 #### 三、性能对比 | 格式 | 压缩比 | 查询速度 | 写入速度 | 适用操作类型 | |------------|--------|----------|----------|--------------------| | TextFile | 低 | 慢 | 快 | 全扫描 | | ORC | 高 | 快 | 中 | 列裁剪/聚合 | | Parquet | 高 | 快 | 中 | 嵌套查询/跨平台 |
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值