spark-submit --class wordcount
--master spark://aml168:7077
--driver-memory 800M
--executor-memory 600M
--jars ../examples/jars/wordcount.jar      (extra jars to add to the driver/executor classpath)
../examples/jars/wordcount.jar             (the application jar containing the main class to run)
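For context, a minimal sketch of what the wordcount class being submitted might look like. The class name, master URL and jar paths come from the command above; the input path and the body below are assumptions, not code from the original application:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical wordcount main class matching `--class wordcount` above;
// the input path is made up for illustration.
object wordcount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("wordcount") // master is supplied by spark-submit
    val sc = new SparkContext(conf)

    sc.textFile("hdfs:///input/words.txt")
      .flatMap(_.split("\\s+"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .collect()
      .foreach(println)

    sc.stop()
  }
}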
1. On Linux, bin/spark-submit under SPARK_HOME is executed; it simply delegates to the spark-class script:
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
2. The spark-class script
spark-class assembles a java launch command and then executes it:
1. Locate the Java runtime
RUNNER="${JAVA_HOME}/bin/java"
2. Use the org.apache.spark.launcher.Main class to build the final launch command from the arguments that were passed in
build_command() {
  "$RUNNER" -Xmx128m $SPARK_LAUNCHER_OPTS -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
  printf "%d\0" $?
}
3. Read the output of the Main class and collect it, token by token, into the CMD array
set +o posix
CMD=()
DELIM=$'\n'
CMD_START_FLAG="false"
while IFS= read -d "$DELIM" -r ARG; do
  if [ "$CMD_START_FLAG" == "true" ]; then
    CMD+=("$ARG")
  else
    if [ "$ARG" == $'\0' ]; then
      # After NULL character is consumed, change the delimiter and consume command string.
      DELIM=''
      CMD_START_FLAG="true"
    elif [ "$ARG" != "" ]; then
      echo "$ARG"
    fi
  fi
done < <(build_command "$@")
# done < <(build_command "$@") is process substitution: the stdout of build_command on the right is fed to the while loop's stdin on the left
4. Execute the assembled java -cp ... launch command, which invokes the main method of org.apache.spark.deploy.SparkSubmit
exec "${CMD[@]}"
3. Into SparkSubmit's main method
1. Dispatch to submit according to the action type:
case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
2. Stepping into it, the call ends up in the runMain method
runMain(args, uninitLog)
2.1 Inside runMain there is the following:
val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
2.2 Into the prepareSubmitEnvironment method
Around line 640, childMainClass gets assigned:
// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (isYarnCluster) {
  childMainClass = YARN_CLUSTER_SUBMIT_CLASS

where the constant is defined as:
private[deploy] val YARN_CLUSTER_SUBMIT_CLASS =
  "org.apache.spark.deploy.yarn.YarnClusterApplication"
So the class used in yarn-cluster mode is org.apache.spark.deploy.yarn.YarnClusterApplication.
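A simplified illustration of that choice (a sketch, not the real code; other cluster managers and the REST submission path have their own branches):

// Sketch only: in client mode the user's class runs directly in this JVM,
// while in yarn-cluster mode Spark substitutes its own wrapper class.
val childMainClass =
  if (isYarnCluster) "org.apache.spark.deploy.yarn.YarnClusterApplication"
  else args.mainClass // e.g. "wordcount" from the example submit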
2.3 Next, reflection is used to load and instantiate org.apache.spark.deploy.yarn.YarnClusterApplication, and then its start method is called
mainClass = Utils.classForName(childMainClass)
val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
  mainClass.newInstance().asInstanceOf[SparkApplication]
} // else branch omitted: a plain main class is wrapped in a JavaMainApplication
app.start(childArgs.toArray, sparkConf)
4. Looking at the org.apache.spark.deploy.yarn.YarnClusterApplication class
Searching for this class in the IDE (double-press Shift in IDEA) finds nothing, so a pom dependency has to be added (use the spark-yarn artifact version that matches your Spark version):
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-yarn_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
Alternatively, download the Spark source code from the official website and import the module that contains the yarn deploy classes.
For reference, the full SparkSubmit.main from step 3, followed by doSubmit:

override def main(args: Array[String]): Unit = {
  val submit = new SparkSubmit() {
    self =>

    override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
      new SparkSubmitArguments(args) {
        override protected def logInfo(msg: => String): Unit = self.logInfo(msg)
        override protected def logWarning(msg: => String): Unit = self.logWarning(msg)
      }
    }

    override protected def logInfo(msg: => String): Unit = printMessage(msg)
    override protected def logWarning(msg: => String): Unit = printMessage(s"Warning: $msg")

    override def doSubmit(args: Array[String]): Unit = {
      try {
        super.doSubmit(args)
      } catch {
        case e: SparkUserAppException =>
          exitFn(e.exitCode)
      }
    }
  }

  // invoke the overridden doSubmit method
  submit.doSubmit(args)
}
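One Scala detail worth noting above: `self =>` gives the anonymous SparkSubmit subclass a name, so the nested anonymous SparkSubmitArguments can forward to the outer instance's logInfo/logWarning rather than its own. A standalone toy sketch of the same pattern (all names here are made up):

// Toy illustration of the `self =>` self-alias used in SparkSubmit.main above.
class Outer {
  self => // alias for this Outer instance

  def log(msg: String): Unit = println(s"[outer] $msg")

  def makeInner(): Runnable = new Runnable {
    def log(msg: String): Unit = println(s"[inner] $msg")

    override def run(): Unit = {
      log("resolves to the inner log")     // inner definition shadows the outer one
      self.log("explicitly the outer log") // self reaches the enclosing Outer
    }
  }
}

object SelfAliasDemo {
  def main(args: Array[String]): Unit = new Outer().makeInner().run()
}

Back to the flow: the doSubmit that submit.doSubmit invokes looks like this: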
def doSubmit(args: Array[String]): Unit = {
  // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
  // be reset before the application starts.
  val uninitLog = initializeLogIfNecessary(true, silent = true)

  val appArgs = parseArguments(args)
  if (appArgs.verbose) {
    logInfo(appArgs.toString)
  }
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    case SparkSubmitAction.PRINT_VERSION => printVersion()
  }
}
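parseArguments above is what turns the raw command line into a SparkSubmitArguments object. A toy sketch of what that parsing boils down to for the example command (the field names mirror the real class, but this parser is a simplification, not the real implementation):

object SubmitArgsSketch {

  // Simplified stand-in for SparkSubmitArguments, for illustration only.
  case class ParsedSubmitArgs(
      master: String = null,
      mainClass: String = null,
      driverMemory: String = null,
      executorMemory: String = null,
      jars: String = null,
      primaryResource: String = null)

  def parse(args: List[String], acc: ParsedSubmitArgs = ParsedSubmitArgs()): ParsedSubmitArgs =
    args match {
      case "--master" :: v :: rest          => parse(rest, acc.copy(master = v))
      case "--class" :: v :: rest           => parse(rest, acc.copy(mainClass = v))
      case "--driver-memory" :: v :: rest   => parse(rest, acc.copy(driverMemory = v))
      case "--executor-memory" :: v :: rest => parse(rest, acc.copy(executorMemory = v))
      case "--jars" :: v :: rest            => parse(rest, acc.copy(jars = v))
      case jar :: rest                      => parse(rest, acc.copy(primaryResource = jar))
      case Nil                              => acc
    }

  def main(args: Array[String]): Unit = {
    val cli = List(
      "--class", "wordcount",
      "--master", "spark://aml168:7077",
      "--driver-memory", "800M",
      "--executor-memory", "600M",
      "--jars", "../examples/jars/wordcount.jar",
      "../examples/jars/wordcount.jar")
    println(parse(cli)) // master, mainClass, memory, jars and primaryResource are now filled in
  }
}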
5. The runMain(args: SparkSubmitArguments, uninitLog: Boolean) method. The uninitLog parameter only tracks logging state and can be ignored here; args shows that the command line we submitted (spark-submit --class wordcount ...) has already been parsed and stored in a SparkSubmitArguments object.
/**
 * Run the main method of the child class using the submit arguments.
 *
 * This runs in two steps. First, we prepare the launch environment by setting up
 * the appropriate classpath, system properties, and application arguments for
 * running the child main class based on the cluster manager and the deploy mode.
 * Second, we use this launch environment to invoke the main method of the child
 * main class.
 * (In other words: the classpath, system properties and arguments are prepared according
 * to the cluster manager, e.g. YARN, and the deploy mode, and the chosen class is then
 * instantiated via reflection and started.)
 *
 * Note that this main class will not be the one provided by the user if we're
 * running cluster deploy mode or python applications.
 * (In cluster deploy mode, or for Python applications, the class that runs is one
 * supplied by Spark, not the user's class.)
 */
private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  // prepare the submit environment
  val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
  // load the chosen class via reflection
  mainClass = Utils.classForName(childMainClass)
  // instantiate mainClass
  val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
    mainClass.newInstance().asInstanceOf[SparkApplication]
  } // else branch omitted: a plain main class is wrapped in a JavaMainApplication
  // start the app
  app.start(childArgs.toArray, sparkConf)
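The classForName / isAssignableFrom / newInstance trio above is plain JVM reflection. A self-contained sketch of the same pattern with made-up class names (the Spark 2.x source uses the older mainClass.newInstance(); getDeclaredConstructor().newInstance() below is the non-deprecated equivalent):

// Toy illustration of how runMain loads and starts the chosen class by name.
trait DemoApplication {
  def start(args: Array[String]): Unit
}

class HelloApp extends DemoApplication {
  override def start(args: Array[String]): Unit =
    println(s"HelloApp started with: ${args.mkString(" ")}")
}

object ReflectionSketch {
  def main(args: Array[String]): Unit = {
    // In SparkSubmit this name is the childMainClass chosen by prepareSubmitEnvironment.
    val mainClass = Class.forName("HelloApp")

    val app: DemoApplication =
      if (classOf[DemoApplication].isAssignableFrom(mainClass)) {
        mainClass.getDeclaredConstructor().newInstance().asInstanceOf[DemoApplication]
      } else {
        throw new IllegalArgumentException(s"$mainClass does not implement DemoApplication")
      }

    app.start(Array("--class", "wordcount"))
  }
}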
6. If yarn-cluster mode is used, the class that gets run is org.apache.spark.deploy.yarn.YarnClusterApplication
// the class that will be run
private[deploy] val YARN_CLUSTER_SUBMIT_CLASS =
  "org.apache.spark.deploy.yarn.YarnClusterApplication"
// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (isYarnCluster) {
  childMainClass = YARN_CLUSTER_SUBMIT_CLASS
  if (args.isPython) {
    childArgs += ("--primary-py-file", args.primaryResource)
    childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
  } else if (args.isR) {
    val mainFile = new Path(args.primaryResource).getName
    childArgs += ("--primary-r-file", mainFile)
    childArgs += ("--class", "org.apache.spark.deploy.RRunner")
  } else {
    // neither Python nor R: this branch is taken
    if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
      childArgs += ("--jar", args.primaryResource)
    }
    childArgs += ("--class", args.mainClass)
  }
  if (args.childArgs != null) {
    args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
  }
}
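If the same wordcount jar were submitted with --master yarn --deploy-mode cluster, the branch above (neither Python nor R) would therefore produce child arguments roughly like the following; this is deduced from the code shown, not captured output:

// Roughly what childArgs holds before YarnClusterApplication is started:
val childArgs = Seq(
  "--jar",   "../examples/jars/wordcount.jar", // args.primaryResource
  "--class", "wordcount"                       // args.mainClass
)
// Any trailing application arguments would each be appended as ("--arg", <value>).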
7. The start method of the app (of type SparkApplication) launched in step 5 above turns out to simply run the org.apache.spark.deploy.yarn.Client class:
private[spark] class YarnClusterApplication extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
    // so remove them from sparkConf here for yarn mode.
    conf.remove("spark.jars")
    conf.remove("spark.files")

    new Client(new ClientArguments(args), conf).run()
  }

}
8.