Spark运行时框架
在分布式环境下,Spark集群采用的是主/从结构。一个节点负责中央协调,该节点被称为驱动器(Driver)节点.与之对应的工作节点被称为执行器(executor)节点。驱动器节点可以和大量的执行器节点进行通信,他们也都作为独立的Java进程运行。驱动器
和所有的执行器被称为一个Spark应用
Spark通过一个叫做做集群管理器的外部服务在急群众的机器上启动。
Spark可以运行在三种集群管理器上:独立集群管理器(自带)、Hadoop Yarn和Apache Mesos。
驱动器节点
驱动器:执行程序中main()方法的进程,用来执行创建SparkContext,RDD以及RDD转化操作、行动操作代码。
Spark程序隐式创建了一个由操作组成的逻辑上的有向无环图,驱动器将其转化为物理执行计划。
Spark会对逻辑执行计划做一些优化,如将连续的映射转为流水线化执行,将多个操作合并到一个步骤中等。
Spark将逻辑计划转化为一系列步骤,每个步骤由多个任务组成。
任务是最小的工作单位
Spark驱动器程序在各个执行器进程间协调任务的调度。。执行器启动后,向驱动器进程注册自己。
当任务执行师,执行器会把缓存数据存储起来,驱动器会跟踪这些缓存数据的位置,并利用这些位置信息调度任务,从而减少网络传输。
驱动器会将程序运行时的一些信息通过网页形式展示出来。
执行器节点
执行器是一种工作进程,负责在Spark中执行任务看,任务间相互独立。
执行器两大任务:
1、执行Spark应用的任务,并将结果返回给驱动器
2、通过自身的块管理器,为用户程序要求缓存的RDD提供内存式存储。RDD直接缓存在执行器内,因此任务可以利用缓存数据加速运算。
启动程序
可以通过spark-submit提交到对应的集群管理器上。
使用spark-submit部署应用
调用spark-submit时,如果没有别的参数,那么spark程序只会在本地执行。
–master参数可以指定运行模式(本地/集群)
值 | 描述 |
---|---|
spark://host:port | 连接到指定端口的Spark独立集群上,默认情况下Spark独立主节点使用7077端口 |
mesos://host:port | 连接到指定Mesos集群上。默认主节点监听5050端口 |
yarn | 连接到一个YARN集群。当在YARN上运行时,需设置环境变量HADOOP_CONF_DIR配置目录,以获取集群信息 |
local | 运行本地模式,使用单核 |
local[N] | 运行本地模式,使用N个核心 |
local[*] | 运行本地模式,使用尽可能多的核心 |
//spark-submit的一般格式
bin/spark-submit [options] <app jar|python file> [app options]
spark-submit的一些常见标记
标记 | 描述 |
---|---|
–master | 表示要连接的集群管理器 |
–deploy-mode | 选择在本地(客户端)启动驱动器程序,还是在集群中的一台工作节点机器上启动。默认为本地模式 |
–class | 运行Scala程序时应用的主类 |
–name | 应用的显示名,会显示在Spark的网页用户界面上 |
–jars | 需要上传并放到应用的CLASSPATH中的JAR包的列表,如果应用依赖少量第三方JAR包,可以将他们放在这个列表 |
–files | 需要放到应用目录中的文件列表,这个参数一般用来存放数据文件 |
–py-files | 需要添加到PYTHONPATH中的文件的列表,其中可以包含.py,.egg以及.zip文件 |
–executor-memory | 执行器进程使用的内存量,以字节为单位,也可以使用更大单位,如”512m”、“15g” |
–driver-memory | 驱动器进程使用的内存量,以字节为单位,也可以使用更大单位,如”512m”、“15g” |
spark-submit 还允许通过–conf prop=value标记任意的SparkConf配置选项,也可以使用 –properties-File指定一个包含键值对的属性文件。
//使用各项选项调用Spark-submit
# 使用独立集群模式提交Java应用
$ ./bin/spark-submit \
--master spark://hostname:7077 \
--deploy-mode cluster \
--class com.databricks.examples.SparkExample \
--name "Example Program" \
--jars dep1.jar,dep2.jar,dep3.jar \
--total-executor-cores 300 \
--executor-memory 10g \
myApp.jar "options" "to your application" "go here"
# 使用YARN客户端模式提交Python应用
$ export HADOP_CONF_DIR=/opt/hadoop/conf
$ ./bin/spark-submit \
--master yarn \
--py-files somelib-1.2.egg,otherlib-4.4.zip,other-file.py \
--deploy-mode client \
--name "Example Program" \
--queue exampleQueue \
--num-executors 40 \
--executor-memory 10g \
my_script.py "options" "to your application" "go here"
Java和Scala用户可以通过spark-submit的–jars标记独立提交JAR包依赖。如果依赖过多,需要使用构建工具,生成单个大JAR包,包含应用的所有的传递依赖,这个JAR包被称为超级JAR或组合JAR。
最常用的构建工具有Maven和sbt。Maven一般用于Java工程,而sbt一般用于Scala工程。
使用sbt构建的用Scala编写的Spark应用
sbt是一个在Scala中使用的比较新的构建工具。在工程的根目录中,需要创建一个叫做build.sbt的构建文案件,源码则放在src/main/scala 中。sbt构建文件是用配置语言写成的,在这个文件中把值付给特定的键,用来定义工程的创建。
//使用sbt0.13的spark应用的build.sbt文件
import AssemblyKeys._
name := "Simple Project"
version := "1.0"
organization := "com.databricks"
scalaVersion := "2.10.3"
libraryDependencies ++= Seq(
// Spark依赖
"org.apache.spark" % "spark-core_2.10" % "1.2.0" % "provided",
// 第三方库
"net.sf.jopt-simple" % "jopt-simple" % "4.3",
"joda-time" % "joda-time" % "2.0"
)
// 这条语句打开了assembly插件的功能
assemblySettings
// 配置assembly插件所使用的JAR
jarName in assembly := "my-project-assembly.jar"
// 一个用来把Scala本身排除在组合JAR包之外的特殊选项,因为Spark
// 已经包含了Scala
assemblyOption in assembly :=
(assemblyOption in assembly).value.copy(includeScala = false)
这个构筑文件第一行从插件中引入了一些功能,这个插件用来支持创建项目的组合JAR包。要使用这个插件,需要在project/目录下加入一个小文件,来列出对插件的依赖。我们只需要创建project/assembly.sbt文件,并在其中加入addSbtPlugin(“com.eed3si9n”%”sbt-assembly”%”0.11.2”)。
//在sbt工程构建中添加assembly插件
# 显示project/assembly.sbt的内容
$ cat project/assembly.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
// 打包使用 sbt 构建的 Spark 应用
$ sbt assembly
# 在目标路径中,可以看到一个组合JAR包
$ ls target/scala-2.10/
my-project-assembly.jar
# 展开组合JAR包可以看到依赖库中的类
$ jar tf target/scala-2.10/my-project-assembly.jar
...
joptsimple/HelpFormatter.class
...
org/joda/time/tz/UTCProvider.class
...
# 组合JAR可以直接传给spark-submit
$ /path/to/spark/bin/spark-submit --master local ...
target/scala-2.10/my-project-assembly.jar
依赖冲突
解决以来冲突的方法有两个:
1、修改应用,使其使用的依赖库的版本与Spark所使用的相同
2、使用shading的方式打包应用,shading可以以另一个命名空间保留冲突的包,并自动重写应用的代码,使得它们使用重命名后的版本。