Spark Yarn 提交流程

spark-submit  --class wordcount

                      --master spark://aml168:7077

                      --driver-memory 800M

                      --executor-memory 600M  

                      --jars ../examples/jars/wordcount.jar   指定要执行的jar包 或者所在文件夹

                      ../examples/jars/wordcount.jar

1. linux 环境  执行SPARK_HOME 下的bin/spark-submit,接着去调用SparkClass脚本

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

2.调用spark-class 脚本  

spark-class

组建一个Java命令 启动

1.获取Java环境 
RUNNER="${JAVA_HOME}/bin/java"


2.使用这个org.apache.spark.launcher.Main 类来调整传入的参数
build_command() {
  "$RUNNER" -Xmx128m $SPARK_LAUNCHER_OPTS -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
  printf "%d\0" $?
}

3.将从Main类输出的结果当作参数,保存到一个数组CMD 中
set +o posix
CMD=()
DELIM=$'\n'
CMD_START_FLAG="false"
while IFS= read -d "$DELIM" -r ARG; do
  if [ "$CMD_START_FLAG" == "true" ]; then
    CMD+=("$ARG")
  else
    if [ "$ARG" == $'\0' ]; then
      # After NULL character is consumed, change the delimiter and consume command string.
      DELIM=''
      CMD_START_FLAG="true"
    elif [ "$ARG" != "" ]; then
      echo "$ARG"
    fi
  fi
done < <(build_command "$@")

#   (command)< <(command)  这个 < < 是将左边命令的结果当作右边命令的参数

3.执行这个Java  -cp 启动命令,就是调用 org.apache.spark.deploy.SparkSubmit的main方法
exec "${CMD[@]}"

3.进入SparkSubmit的main方法

1.
根据提交类型进入submit
case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)

2.点进去是runMain方法
runMain(args, uninitLog)

2.1 runMain中有下面这个

val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)

2.2进入prepareSubmitEnvironment 方法
第640行左右  会对childMainClass 进行赋值  
// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
    if (isYarnCluster) {
      childMainClass = YARN_CLUSTER_SUBMIT_CLASS

private[deploy] val YARN_CLUSTER_SUBMIT_CLASS =
    "org.apache.spark.deploy.yarn.YarnClusterApplication"
可以看到是使用org.apache.spark.deploy.yarn.YarnClusterApplication 


2.3接下来调用反射,调用org.apache.spark.deploy.yarn.YarnClusterApplication类的statr方法
 mainClass = Utils.classForName(childMainClass)
val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
      mainClass.newInstance().asInstanceOf[SparkApplication]
app.start(childArgs.toArray, sparkConf)


4. 查看org.apache.spark.deploy.yarn.YarnClusterApplication类

双击shift 查看这个类的时候发现找不到,需要加一个pom依赖 

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-yarn_2.11</artifactId>
    <version>2.1.1</version>
</dependency>

或者是下载spark官网下载源码,加载core文件夹下的目录

 

 

 

 

 

 

 

 

 

 

  override def main(args: Array[String]): Unit = {
    val submit = new SparkSubmit() {
      self =>

      override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
        new SparkSubmitArguments(args) {
          override protected def logInfo(msg: => String): Unit = self.logInfo(msg)

          override protected def logWarning(msg: => String): Unit = self.logWarning(msg)
        }
      }

      override protected def logInfo(msg: => String): Unit = printMessage(msg)

      override protected def logWarning(msg: => String): Unit = printMessage(s"Warning: $msg")

      override def doSubmit(args: Array[String]): Unit = {
        try {
          super.doSubmit(args)
        } catch {
          case e: SparkUserAppException =>
            exitFn(e.exitCode)
        }
      }

    }
    //执行重写的doSubmit 方法
    submit.doSubmit(args)
  }
def doSubmit(args: Array[String]): Unit = {
    // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
    // be reset before the application starts.
    val uninitLog = initializeLogIfNecessary(true, silent = true)

    val appArgs = parseArguments(args)
    if (appArgs.verbose) {
      logInfo(appArgs.toString)
    }
    appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
      case SparkSubmitAction.PRINT_VERSION => printVersion()
    }
  }
runMain(args: SparkSubmitArguments, uninitLog: Boolean) 方法,可以忽略掉uninitLog参数,就是记录日志的,args可以看到已经被保存在SparkSumitArguments对象中,我们提交的spark-submit  --class wordcount 。。。已经被解析并保存进去了
  /**
   * Run the main method of the child class using the submit arguments.
   * 使用Submit参数运行子类的main方法
   * This runs in two steps. First, we prepare the launch environment by setting up
   * the appropriate classpath, system properties, and application arguments for
   * running the child main class based on the cluster manager and the deploy mode.
   * Second, we use this launch environment to invoke the main method of the child
   * main class.
   * 这个执行分为两步。首先是为运行子类main方法准备好适合的类路径,系统路径,提交的参数,根据集群管理者(类似yarn 我猜)和提交模式来确定。
   * 第二步,调用反射来执行提供的class的main方法
   * Note that this main class will not be the one provided by the user if we're
   * running cluster deploy mode or python applications.
   * 注意这个情况,如果是集群部署或者python应用的时候,那么运行的class 不是由用户提供的,而是运行由spark提供的类
   */
  private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
    //准备环境
    val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
   

   //反射
   mainClass = Utils.classForName(childMainClass)
  
  //实例化mainClass
  val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
      mainClass.newInstance().asInstanceOf[SparkApplication]
  // 启动app
  app.start(childArgs.toArray, sparkConf)

6.如果是采用yarn模式,那么会跑这个类org.apache.spark.deploy.yarn.YarnClusterApplication

//会运行这个类    
private[deploy] val YARN_CLUSTER_SUBMIT_CLASS =
    "org.apache.spark.deploy.yarn.YarnClusterApplication"


    // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
    if (isYarnCluster) {
      childMainClass = YARN_CLUSTER_SUBMIT_CLASS
      if (args.isPython) {
        childArgs += ("--primary-py-file", args.primaryResource)
        childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
      } else if (args.isR) {
        val mainFile = new Path(args.primaryResource).getName
        childArgs += ("--primary-r-file", mainFile)
        childArgs += ("--class", "org.apache.spark.deploy.RRunner")
      } else {
    // 不是Python 不是R  走这个地方
        if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
          childArgs += ("--jar", args.primaryResource)
        }
        childArgs += ("--class", args.mainClass)
      }
      if (args.childArgs != null) {
        args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
      }
    }

7.上面第五步启动app(SparkApplication类型)的start方法,发现还是运行org.apache.spark.deploy.yarn.Client类

private[spark] class YarnClusterApplication extends SparkApplication {
    
  override def start(args: Array[String], conf: SparkConf): Unit = {
    // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
    // so remove them from sparkConf here for yarn mode.
    conf.remove("spark.jars")
    conf.remove("spark.files")

    new Client(new ClientArguments(args), conf).run()
  }

}

8.

 

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值