spark提交流程-源碼分析

org.apache.spark.deploy.SparkSubmit
-main

  -submit.doSubmit(args)
     - SparkSubmitArguments.parseArguments(args) 
       - - SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
        -  submit(args: SparkSubmitArguments, uninitLog: Boolean)
            -runMain(args: SparkSubmitArguments, uninitLog: Boolean)
            - new JavaMainApplication(mainClass)
              -app.start(childArgs.toArray, sparkConf)
// Following constants are visible for testing.
private[deploy] val YARN_CLUSTER_SUBMIT_CLASS =
  "org.apache.spark.deploy.yarn.YarnClusterApplication"

  // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
  if (isYarnCluster) {
    childMainClass = YARN_CLUSTER_SUBMIT_CLASS
   //若mainClass是SparkApplication類型,則直接構建SparkApplication實例;否則用JavaMainApplication包裝
   val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
      mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
    } else {
      new JavaMainApplication(mainClass)
    }

-YarnClusterApplication.start
//ClientArguments傳遞的是spark-submit的提交參數
- new Client(new ClientArguments(args), conf, null).run()
-Client.submitApplication()
private val yarnClient = YarnClient.createYarnClient
//用于與rm通信
protected ApplicationClientProtocol rmClient;

//獲取appid
val newApp = yarnClient.createApplication()
val newAppResponse = newApp.getNewApplicationResponse()
appId = newAppResponse.getApplicationId()
//創建容器上下文環境
val containerContext = createContainerLaunchContext(newAppResponse)
containerContext 上下文包含運行ApplicationMaster的classname等啟動參數

 val amClass =
      if (isClusterMode) {
        Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
      } else {
        Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
      }

//創建提交app的上下文環境
val appContext = createApplicationSubmissionContext(newApp, containerContext)

-yarnClient.submitApplication(appContext)
-rmClient.submitApplication(request);

搜索”org.apache.spark.deploy.yarn.ApplicationMaster“
-org.apache.spark.deploy.yarn.ApplicationMaster.main
-master = new ApplicationMaster(amArgs, sparkConf, yarnConf)
ApplicationMaster成員
private val client = new YarnRMClient()
//用于am和rm通信
-amClient = AMRMClient.createAMRMClient()
-master = new ApplicationMaster(amArgs, sparkConf, yarnConf)
-master.run()
如果是集群模式,運行Driver

 if (isClusterMode) {
        runDriver()
      } else {
        runExecutorLauncher()
      }
 private def runDriver(): Unit = {
   addAmIpFilter(None, System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV))
   userClassThread = startUserApplication()
  ... ...
   try {
    //等待startUserApplication啟動的driver線程在用戶代碼中完成SparkContext的創建,否則一直阻塞在這裡
     val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
       Duration(totalWaitTime, TimeUnit.MILLISECONDS))
     //sc不為空
     if (sc != null) {
       val rpcEnv = sc.env.rpcEnv

       val userConf = sc.getConf
       val host = userConf.get(DRIVER_HOST_ADDRESS)
       val port = userConf.get(DRIVER_PORT)
      //注冊AM,申請資源
       registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId)

       val driverRef = rpcEnv.setupEndpointRef(
         RpcAddress(host, port),
         YarnSchedulerBackend.ENDPOINT_NAME)
     //創建分配器,返回可用資源列表
       createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)
     } else {
       // Sanity check; should never happen in normal operation, since sc should only be null
       // if the user app did not create a SparkContext.
       throw new IllegalStateException("User did not initialize spark context!")
     }
     //當資源準備就緒,調用resumeDriver方法,改變狀態,讓driver線程繼續執行(用戶代碼邏輯)
     resumeDriver()
     //等userClassThread執行完之後,runDriver方法再繼續執行
     userClassThread.join()
   } catch {
    ... ...
   } finally {
     resumeDriver()
   }
 }

userClassThread = startUserApplication()

 /**
  * Starts the user application on a separate thread named "Driver" and returns that thread.
  *
  * `main(Array[String])` of `args.userClass` is looked up through `userClassLoader` before the
  * thread is created; the thread then invokes it and reports the outcome through `finish`.
  * Parts of the original method are elided in this excerpt (the `... ...` lines).
  *
  * @return the user thread, already started
  */
 private def startUserApplication(): Thread = {
    logInfo("Starting the user application in a separate Thread")

 ... ...

    val mainMethod = userClassLoader.loadClass(args.userClass)
      .getMethod("main", classOf[Array[String]])

    val userThread = new Thread {
      override def run(): Unit = {
        try {
          if (!Modifier.isStatic(mainMethod.getModifiers)) {
            logError(s"Could not find static main method in object ${args.userClass}")
            finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_EXCEPTION_USER_CLASS)
          } else {
            // main is static (i.e. the main method of the user's job object): invoke it
            // reflectively with a null receiver. This is where the user code runs.
            mainMethod.invoke(null, userArgs.toArray)
            finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
            logDebug("Done running user class")
          }
        } catch {
          ......
        } ... ...
      }
    }
    userThread.setContextClassLoader(userClassLoader)
    // Name the thread "Driver": the thread running the user code is the driver thread.
    userThread.setName("Driver")
    // Start the driver thread before handing it back to the caller.
    userThread.start()
    userThread
  }

創建SparkContext
val sc = new SparkContext(sparkConf)
SparkContext進行初始化的時候,有一段代碼如下

// Post init
   _taskScheduler.postStartHook()
/**
 * Excerpt of the scheduler subclass built on YarnScheduler. The part shown overrides
 * postStartHook so the ApplicationMaster is handed the SparkContext before the parent
 * hook runs. Other members are elided (`...`).
 */
private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc) {
...
  /** Passes `sc` to the ApplicationMaster, then runs the parent hook and logs completion. */
  override def postStartHook(): Unit = {
    // ApplicationMaster.sparkContextInitialized does two things:
    //   1. completes sparkContextPromise with sc, the future runDriver is awaiting;
    //   2. calls wait() on that promise, which pauses the calling (user/"Driver") thread
    //      so that runDriver can finish its initialization first.
    ApplicationMaster.sparkContextInitialized(sc)

    super.postStartHook()
    logInfo("YarnClusterScheduler.postStartHook done")
  }

}
/**
 * Publishes the freshly created SparkContext and then blocks the calling thread.
 *
 * Both steps run while holding the monitor of `sparkContextPromise`.
 *
 * @param sc the SparkContext created by the user application
 */
private def sparkContextInitialized(sc: SparkContext) = {
   sparkContextPromise.synchronized {
     // Completing the promise is what lets runDriver (which awaits
     // sparkContextPromise.future) continue.
     // Notify runDriver function that SparkContext is available
     sparkContextPromise.success(sc)
     // Block the calling thread on the promise's monitor.
     // NOTE(review): the matching notify is presumably issued by resumeDriver(); its body is
     // not shown in this excerpt — confirm.
     // Pause the user class thread in order to make proper initialization in runDriver function.
     sparkContextPromise.wait()
   }
 }

-TaskSchedulerImpl.postStartHook

TaskSchedulerImpl.waitBackendReady
/**
 * Blocks the calling thread until `backend.isReady` is true.
 *
 * Returns immediately when the backend is already ready. Otherwise re-checks after each
 * timed wait of 100 ms on this object's monitor.
 *
 * Throws IllegalStateException if the SparkContext is stopped while waiting.
 */
private def waitBackendReady(): Unit = {
    if (backend.isReady) {
      return
    }
    // Loop until the backend reports ready; the caller (the user code that is constructing
    // the SparkContext) does not proceed past this point until then.
    // NOTE(review): what flips backend.isReady is not visible in this excerpt. The article
    // attributes it to runDriver calling resumeDriver() — confirm against the backend code.
    while (!backend.isReady) {
      // Might take a while for backend to be ready if it is waiting on resources.
      if (sc.stopped.get) {
        // For example: the master removes the application for some reason
        throw new IllegalStateException("Spark context stopped while waiting for backend")
      }
      synchronized {
        this.wait(100)
      }
    }
  }

runDriver方法的registerAM
private val client = new YarnRMClient()
client.register(host, port, yarnConf, _sparkConf, uiAddress, historyAddress)

amClient = AMRMClient.createAMRMClient()
通過AMRMClient向RM注冊
amClient.registerApplicationMaster(driverHost, driverPort, trackingUrl)

  /**
   * Creates the allocator through `client`, triggers an allocation, then starts the
   * ApplicationMaster metrics system and the reporter thread.
   * Parts of the original method are elided in this excerpt (the `... ...` lines).
   *
   * @param driverRef     driver endpoint reference; its host/port build `driverUrl` and it is
   *                      also passed on to the allocator
   * @param _sparkConf    configuration passed to the allocator and read for
   *                      YARN_METRICS_NAMESPACE
   * @param rpcEnv        not used in the portion shown here
   * @param appAttemptId  attempt id; its application id is the fallback metrics prefix
   * @param distCacheConf configuration handed to `prepareLocalResources`
   */
  private def createAllocator(
      driverRef: RpcEndpointRef,
      _sparkConf: SparkConf,
      rpcEnv: RpcEnv,
      appAttemptId: ApplicationAttemptId,
      distCacheConf: SparkConf): Unit = {
    ... ...

    val appId = appAttemptId.getApplicationId().toString()
    val driverUrl = RpcEndpointAddress(driverRef.address.host, driverRef.address.port,
      CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
    val localResources = prepareLocalResources(distCacheConf)
      ... ...
    // Create the allocator via the RM client.
    allocator = client.createAllocator(
      yarnConf,
      _sparkConf,
      appAttemptId,
      driverUrl,
      driverRef,
      securityMgr,
      localResources)

   ...  ...
    // Run an allocation round. Per the article this obtains the list of available resources;
    // allocateResources' body is not shown here — confirm.
    allocator.allocateResources()
    val ms = MetricsSystem.createMetricsSystem(MetricsSystemInstances.APPLICATION_MASTER,
      sparkConf, securityMgr)
    // Metrics prefix: the configured namespace if present, otherwise the application id.
    val prefix = _sparkConf.get(YARN_METRICS_NAMESPACE).getOrElse(appId)
    ms.registerSource(new ApplicationMasterSource(prefix, allocator))
    // do not register static sources in this case as per SPARK-25277
    ms.start(false)
    metricsSystem = Some(ms)
    reporterThread = launchReporterThread()
  }

//處理可用于分配的容器
handleAllocatedContainers(allocatedContainers.asScala.toSeq)

 /**
  * Handles containers granted by YARN: matches them against requests by host, then by rack,
  * and launches the selected ones through `runAllocatedContainers`.
  * Parts of the original method are elided in this excerpt (the `... ...` lines), which also
  * leaves the braces of the rack-resolver section unbalanced here.
  *
  * @param allocatedContainers containers received from YARN
  */
 def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
    val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)

    // Match incoming requests by host
    val remainingAfterHostMatches = new ArrayBuffer[Container]
    // Sort the allocated containers by preferred location: host first, then rack.
    // NOTE(review): matchContainerToRequest is not shown; presumably a matched container goes
    // into containersToUse and an unmatched one into the "remaining" buffer — confirm.
    for (allocatedContainer <- allocatedContainers) {
      matchContainerToRequest(allocatedContainer, allocatedContainer.getNodeId.getHost,
        containersToUse, remainingAfterHostMatches)
    }
... ...
    val remainingAfterRackMatches = new ArrayBuffer[Container]
    if (remainingAfterHostMatches.nonEmpty) {
      var exception: Option[Throwable] = None
      // Rack resolution for the leftover containers runs on a separate thread.
      val thread = new Thread("spark-rack-resolver") {
        override def run(): Unit = {
          try {
            for (allocatedContainer <- remainingAfterHostMatches) {
              val rack = resolver.resolve(allocatedContainer.getNodeId.getHost)
              matchContainerToRequest(allocatedContainer, rack, containersToUse,
                remainingAfterRackMatches)
            }
          } catch {
            case e: Throwable =>
         ... ...
    }
    // Launch the containers that were selected for use.
    runAllocatedContainers(containersToUse)

    logInfo("Received %d containers from YARN, launching executors on %d of them."
      .format(allocatedContainers.size, containersToUse.size))
  }

runAllocatedContainers 啟動容器

 private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = synchronized {
    for (container <- containersToUse) {
      val rpId = getResourceProfileIdFromPriority(container.getPriority)
      executorIdCounter += 1
      val executorHostname = container.getNodeId.getHost
      val containerId = container.getId
      val executorId = executorIdCounter.toString
      val yarnResourceForRpId = rpIdToYarnResource.get(rpId)
      assert(container.getResource.getMemory >= yarnResourceForRpId.getMemory)
      logInfo(s"Launching container $containerId on host $executorHostname " +
        s"for executor with ID $executorId for ResourceProfile Id $rpId")
      ... ...
      //TargetNum(目標容器數)大於Running的容器數,說明還需要啟動容器
      if (rpRunningExecs < getOrUpdateTargetNumExecutorsForRPId(rpId)) {
        getOrUpdateNumExecutorsStartingForRPId(rpId).incrementAndGet()
        if (launchContainers) {
          //通過線程池啟動容器
          launcherPool.execute(() => {
            try {
              new ExecutorRunnable(
                Some(container),
                conf,
                sparkConf,
                driverUrl,
                executorId,
                executorHostname,
                containerMem,
                containerCores,
                appAttemptId.getApplicationId.toString,
                securityMgr,
                localResources,
                rp.id
              ).run()
              updateInternalState()
            } catch {
              ... ...
            }
          })
        } else {
          // For test only
          updateInternalState()
        }
      ... ...
    }
  }

ExecutorRunnable.run

//
nmClient = NMClient.createNMClient()
    nmClient.init(conf)
    nmClient.start()
   //通過nmClient 通知指定nm啟動container
    startContainer()

startContainer

/**
 * Fills in the container launch context `ctx` and asks the NodeManager, through `nmClient`,
 * to start the container.
 * Parts of the original method are elided in this excerpt (the `... ...` lines), including
 * the creation of `ctx` and the error handling.
 *
 * @return the map returned by `nmClient.startContainer`
 */
def startContainer(): java.util.Map[String, ByteBuffer] = {
   ... ...
    // prepareCommand builds the command used to launch the container process.
    // Per the article, that command starts
    // org.apache.spark.executor.YarnCoarseGrainedExecutorBackend (the executor's
    // communication backend); prepareCommand's body is not shown here — confirm.
    val commands = prepareCommand()

    ctx.setCommands(commands.asJava)
    ctx.setApplicationACLs(
      YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr).asJava)

     ...  ....

    // Send the start request to the ContainerManager
    try {
      // Start the container, passing along its launch context ctx.
      nmClient.startContainer(container.get, ctx)
    } catch {
      ... ...
    }
  }

org.apache.spark.executor.YarnCoarseGrainedExecutorBackend

-CoarseGrainedExecutorBackend.run

def run(
     arguments: Arguments,
     backendCreateFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) =>
       CoarseGrainedExecutorBackend): Unit = {

   ... ...
//可以找到driver 與driver通信
     val fetcher = RpcEnv.create(
       "driverPropsFetcher",
       arguments.bindAddress,
       arguments.hostname,
       -1,
       executorConf,
       new SecurityManager(executorConf),
       numUsableCores = 0,
       clientMode = true)

     var driver: RpcEndpointRef = null
     val nTries = 3
     for (i <- 0 until nTries if driver == null) {
       try {
         driver = fetcher.setupEndpointRefByURI(arguments.driverUrl)
       } catch {
         case e: Throwable => if (i == nTries - 1) {
           throw e
         }
       }
     }
     //創建Executor的env環境
     val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
       arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)
    //設置rpcEnv環境的通信終端
    //backendCreateFn 創建的其實就是YarnCoarseGrainedExecutorBackend
     env.rpcEnv.setupEndpoint("Executor",
       backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile))
     arguments.workerUrl.foreach { url =>
       env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
     }
     env.rpcEnv.awaitTermination()
   }
 }

//消息循環器
var messageLoop: MessageLoop = null
      try {
        messageLoop = endpoint match {
          case e: IsolatedRpcEndpoint =>
             //匹配消息循環器類型
            new DedicatedMessageLoop(name, e, this)
          case _ =>
            sharedLoop.register(name, endpoint)
            sharedLoop
        }
        endpoints.put(name, messageLoop)
      } catch {
        case NonFatal(e) =>
          endpointRefs.remove(endpoint)
          throw e
      }

-DedicatedMessageLoop
//收件箱
private val inbox = new Inbox(name, endpoint)

-Inbox
protected val messages = new java.util.LinkedList[InboxMessage]()
// OnStart should be the first message to process
//放入一個OnStart消息
inbox.synchronized {
messages.add(OnStart)
}

//RpcEndpoint的生命周期
 * {@code constructor -> onStart -> receive* -> onStop}

private[spark] trait RpcEndpoint {

CoarseGrainedExecutorBackend.onStart

/**
 * Endpoint start hook: resolves the executor's resources, connects to the driver at
 * `driverUrl` and registers this executor with it.
 *
 * On a successful reply the endpoint sends itself a RegisteredExecutor message; on failure
 * the executor exits. The beginning of the original method is elided (`... ...`).
 */
override def onStart(): Unit = {
  ... ...

    logInfo("Connecting to driver: " + driverUrl)
    try {
      _resources = parseOrFindResources(resourcesFileOpt)
    } catch {
      case NonFatal(e) =>
        exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
    }
    rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
      // This is a very fast action so we can use "ThreadUtils.sameThread"
      // Keep the reference to the driver endpoint.
      driver = Some(ref)
      // Send a RegisterExecutor message to the driver and wait for its Boolean reply.
      ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls,
        extractAttributes, _resources, resourceProfile.id))
    }(ThreadUtils.sameThread).onComplete {
      case Success(_) =>
        // Registration acknowledged: notify ourselves so that receive handles the next step.
        self.send(RegisteredExecutor)
      case Failure(e) =>
        exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
    }(ThreadUtils.sameThread)
  }

driver是一個線程,所以是SparkContext接收消息
_schedulerBackend是driver通信后臺
private var _schedulerBackend: SchedulerBackend = _
//集群模式的SchedulerBackend
CoarseGrainedSchedulerBackend
//消息回復(fù)
CoarseGrainedSchedulerBackend.receiveAndReply

匹配RegisterExecutor消息
     case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls,
          attributes, resources, resourceProfileId) =>
//總核數與已注冊的executor數量增加
    totalCoreCount.addAndGet(cores)
          totalRegisteredExecutors.addAndGet(1)
//最後回覆一個true表示注冊成功
   context.reply(true)

CoarseGrainedExecutorBackend.收到消息

case Success(_) =>
   //給自己發送一條消息,表示注冊完畢
       self.send(RegisteredExecutor)

CoarseGrainedExecutorBackend.收到給自己發送的RegisteredExecutor消息

override def receive: PartialFunction[Any, Unit] = {
  case RegisteredExecutor =>
    logInfo("Successfully registered with driver")
    try {
    //創建Executor計算對象,區別於CoarseGrainedExecutorBackend(通信對象)
      executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,
        resources = _resources)
   //給driver發送LaunchedExecutor 消息
      driver.get.send(LaunchedExecutor(executorId))

driver端的CoarseGrainedSchedulerBackend receive到
LaunchedExecutor消息

  case LaunchedExecutor(executorId) =>
      //將該executor的可用核數(freeCores)重置為總核數(totalCores)
        executorDataMap.get(executorId).foreach { data =>
          data.freeCores = data.totalCores
        }
   //為該executor生成資源offer(makeOffers) //todo
        makeOffers(executorId)
      case e =>
        logError(s"Received unexpected message. ${e}")
最后編輯于
©著作權歸作者所有,轉載或內容合作請聯繫作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發布,文章內容僅代表作者本人觀點,簡書系信息發布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容