org.apache.spark.deploy.SparkSubmit
-main
-submit.doSubmit(args)
- SparkSubmitArguments.parseArguments(args)
- - SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
- submit(args: SparkSubmitArguments, uninitLog: Boolean)
-runMain(args: SparkSubmitArguments, uninitLog: Boolean)
- new JavaMainApplication(mainClass)
-app.start(childArgs.toArray, sparkConf)
// Following constants are visible for testing.
private[deploy] val YARN_CLUSTER_SUBMIT_CLASS =
"org.apache.spark.deploy.yarn.YarnClusterApplication"
// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (isYarnCluster) {
childMainClass = YARN_CLUSTER_SUBMIT_CLASS
//mainClass 是SparkApplication類型,構建SparkApplication
val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
} else {
new JavaMainApplication(mainClass)
}
-YarnClusterApplication.start
//ClientArguments傳遞spark submit的提交參數
- new Client(new ClientArguments(args), conf, null).run()
-Client.submitApplication()
private val yarnClient = YarnClient.createYarnClient
//用于與rm通信
protected ApplicationClientProtocol rmClient;
//獲取appid
protected ApplicationClientProtocol rmClient;
//創建容器上下文環境
val containerContext = createContainerLaunchContext(newAppResponse)
containerContext 上下文包含運行ApplicationMaster的classname等啟動參數
val amClass =
if (isClusterMode) {
Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
} else {
Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
}
//創建提交app的上下文環境
val appContext = createApplicationSubmissionContext(newApp, containerContext)
-yarnClient.submitApplication(appContext)
-rmClient.submitApplication(request);
搜索“org.apache.spark.deploy.yarn.ApplicationMaster”
-org.apache.spark.deploy.yarn.ApplicationMaster.main
-master = new ApplicationMaster(amArgs, sparkConf, yarnConf)
ApplicationMaster成員
private val client = new YarnRMClient()
//用于am和rm通信
-amClient = AMRMClient.createAMRMClient()
-master = new ApplicationMaster(amArgs, sparkConf, yarnConf)
-master.run()
如果是集群模式,運行Driver
if (isClusterMode) {
runDriver()
} else {
runExecutorLauncher()
}
private def runDriver(): Unit = {
addAmIpFilter(None, System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV))
userClassThread = startUserApplication()
... ...
try {
//等待startUserApplication中的driver線程完成用戶代碼里sparkContext的創建,否則一直阻塞在這里
val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
Duration(totalWaitTime, TimeUnit.MILLISECONDS))
//sc不為空
if (sc != null) {
val rpcEnv = sc.env.rpcEnv
val userConf = sc.getConf
val host = userConf.get(DRIVER_HOST_ADDRESS)
val port = userConf.get(DRIVER_PORT)
//注冊AM,申請資源
registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId)
val driverRef = rpcEnv.setupEndpointRef(
RpcAddress(host, port),
YarnSchedulerBackend.ENDPOINT_NAME)
//創建分配器,返回可用資源列表
createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)
} else {
// Sanity check; should never happen in normal operation, since sc should only be null
// if the user app did not create a SparkContext.
throw new IllegalStateException("User did not initialize spark context!")
}
//當資源準備就緒,調用resumeDriver方法,改變狀態,讓driver線程繼續執行(用戶代碼邏輯)
resumeDriver()
userClassThread執行完之后,runDriver方法再繼續執行
userClassThread.join()
} catch {
... ...
} finally {
resumeDriver()
}
}
userClassThread = startUserApplication()
private def startUserApplication(): Thread = {
logInfo("Starting the user application in a separate Thread")
... ...
val mainMethod = userClassLoader.loadClass(args.userClass)
.getMethod("main", classOf[Array[String]])
val userThread = new Thread {
override def run(): Unit = {
try {
if (!Modifier.isStatic(mainMethod.getModifiers)) {
logError(s"Could not find static main method in object ${args.userClass}")
finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_EXCEPTION_USER_CLASS)
} else {
//靜態方法,也就是用戶編寫的Job Object的main方法,調用用戶代碼
mainMethod.invoke(null, userArgs.toArray)
finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
logDebug("Done running user class")
}
} catch {
......
} ... ...
}
}
userThread.setContextClassLoader(userClassLoader)
//設置driver線程的名稱
userThread.setName("Driver")
//啟動driver線程
userThread.start()
userThread
}
創建SparkContext
val sc = new SparkContext(sparkConf)
SparkContext進行sc初始化的時候,有一段代碼如下
// Post init
_taskScheduler.postStartHook()
private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc) {
...
override def postStartHook(): Unit = {
//sparkContextInitialized干兩個事兒
//SparkContext 初始化后,喚醒runDriver 方法繼續執行
//暫停driver線程(user thread),以便在runDriver函數中進行初始化
ApplicationMaster.sparkContextInitialized(sc)
super.postStartHook()
logInfo("YarnClusterScheduler.postStartHook done")
}
}
// Hands the given SparkContext over through sparkContextPromise, then blocks the calling
// thread on that same promise's monitor.
private def sparkContextInitialized(sc: SparkContext) = {
// Both steps run while holding the promise's monitor, which the wait() call below requires.
sparkContextPromise.synchronized {
// Complete the promise with the initialized SparkContext.
// Notify runDriver function that SparkContext is available
sparkContextPromise.success(sc)
// Block the calling thread on the promise's monitor.
// NOTE(review): the matching notify is not visible in this excerpt -- presumably issued by
// resumeDriver; confirm against ApplicationMaster.
// Pause the user class thread in order to make proper initialization in runDriver function.
sparkContextPromise.wait()
}
}
-TaskSchedulerImpl.postStartHook
TaskSchedulerImpl.waitBackendReady
// Blocks the caller until the scheduler backend reports that it is ready.
// Throws IllegalStateException if the SparkContext is stopped while waiting.
private def waitBackendReady(): Unit = {
if (backend.isReady) {
return
}
// Poll until the backend is ready; the calling thread does not proceed past this loop
// before then.
// NOTE(review): the accompanying notes say the driver thread is released once runDriver
// calls resumeDriver; nothing in this excerpt shows what flips backend.isReady -- confirm.
while (!backend.isReady) {
// Might take a while for backend to be ready if it is waiting on resources.
if (sc.stopped.get) {
// For example: the master removes the application for some reason
throw new IllegalStateException("Spark context stopped while waiting for backend")
}
synchronized {
// Wait at most 100 ms on this object's monitor before re-checking the backend.
this.wait(100)
}
}
}
runDriver方法的registerAM
private val client = new YarnRMClient()
client.register(host, port, yarnConf, _sparkConf, uiAddress, historyAddress)
amClient = AMRMClient.createAMRMClient()
通過AMRMClient向RM注冊
amClient.registerApplicationMaster(driverHost, driverPort, trackingUrl)
private def createAllocator(
driverRef: RpcEndpointRef,
_sparkConf: SparkConf,
rpcEnv: RpcEnv,
appAttemptId: ApplicationAttemptId,
distCacheConf: SparkConf): Unit = {
... ...
val appId = appAttemptId.getApplicationId().toString()
val driverUrl = RpcEndpointAddress(driverRef.address.host, driverRef.address.port,
CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
val localResources = prepareLocalResources(distCacheConf)
... ...
//創建分配器
allocator = client.createAllocator(
yarnConf,
_sparkConf,
appAttemptId,
driverUrl,
driverRef,
securityMgr,
localResources)
... ...
//獲取可用資源列表
allocator.allocateResources()
val ms = MetricsSystem.createMetricsSystem(MetricsSystemInstances.APPLICATION_MASTER,
sparkConf, securityMgr)
val prefix = _sparkConf.get(YARN_METRICS_NAMESPACE).getOrElse(appId)
ms.registerSource(new ApplicationMasterSource(prefix, allocator))
// do not register static sources in this case as per SPARK-25277
ms.start(false)
metricsSystem = Some(ms)
reporterThread = launchReporterThread()
}
//處理可用于分配的容器
handleAllocatedContainers(allocatedContainers.asScala.toSeq)
def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)
// Match incoming requests by host
val remainingAfterHostMatches = new ArrayBuffer[Container]
//將可分配容器按主機名和機架分類整理(首選位置的應用)
for (allocatedContainer <- allocatedContainers) {
matchContainerToRequest(allocatedContainer, allocatedContainer.getNodeId.getHost,
containersToUse, remainingAfterHostMatches)
}
... ...
val remainingAfterRackMatches = new ArrayBuffer[Container]
if (remainingAfterHostMatches.nonEmpty) {
var exception: Option[Throwable] = None
val thread = new Thread("spark-rack-resolver") {
override def run(): Unit = {
try {
for (allocatedContainer <- remainingAfterHostMatches) {
val rack = resolver.resolve(allocatedContainer.getNodeId.getHost)
matchContainerToRequest(allocatedContainer, rack, containersToUse,
remainingAfterRackMatches)
}
} catch {
case e: Throwable =>
... ...
}
//運行已分配的容器
runAllocatedContainers(containersToUse)
logInfo("Received %d containers from YARN, launching executors on %d of them."
.format(allocatedContainers.size, containersToUse.size))
}
runAllocatedContainers 啟動容器
private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = synchronized {
for (container <- containersToUse) {
val rpId = getResourceProfileIdFromPriority(container.getPriority)
executorIdCounter += 1
val executorHostname = container.getNodeId.getHost
val containerId = container.getId
val executorId = executorIdCounter.toString
val yarnResourceForRpId = rpIdToYarnResource.get(rpId)
assert(container.getResource.getMemory >= yarnResourceForRpId.getMemory)
logInfo(s"Launching container $containerId on host $executorHostname " +
s"for executor with ID $executorId for ResourceProfile Id $rpId")
... ...
TargetNum容器數大于Running的容器數,說明還需要啟動容器
if (rpRunningExecs < getOrUpdateTargetNumExecutorsForRPId(rpId)) {
getOrUpdateNumExecutorsStartingForRPId(rpId).incrementAndGet()
if (launchContainers) {
啟動線程池,啟動容器
launcherPool.execute(() => {
try {
new ExecutorRunnable(
Some(container),
conf,
sparkConf,
driverUrl,
executorId,
executorHostname,
containerMem,
containerCores,
appAttemptId.getApplicationId.toString,
securityMgr,
localResources,
rp.id
).run()
updateInternalState()
} catch {
... ...
}
})
} else {
// For test only
updateInternalState()
}
... ...
}
}
ExecutorRunnable.run
//
nmClient = NMClient.createNMClient()
nmClient.init(conf)
nmClient.start()
//通過nmClient 通知指定nm啟動container
startContainer()
startContainer
def startContainer(): java.util.Map[String, ByteBuffer] = {
... ...
//prepareCommand 是準備啟動容器進程的腳本
//啟動org.apache.spark.executor.YarnCoarseGrainedExecutorBackend 進程(executor 的通信后臺)
val commands = prepareCommand()
ctx.setCommands(commands.asJava)
ctx.setApplicationACLs(
YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr).asJava)
... ....
// Send the start request to the ContainerManager
try {
//啟動container,攜帶啟動容器的上下文ctx
nmClient.startContainer(container.get, ctx)
} catch {
... ...
}
}
org.apache.spark.executor.YarnCoarseGrainedExecutorBackend
-CoarseGrainedExecutorBackend.run
def run(
arguments: Arguments,
backendCreateFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) =>
CoarseGrainedExecutorBackend): Unit = {
... ...
//可以找到driver 與driver通信
val fetcher = RpcEnv.create(
"driverPropsFetcher",
arguments.bindAddress,
arguments.hostname,
-1,
executorConf,
new SecurityManager(executorConf),
numUsableCores = 0,
clientMode = true)
var driver: RpcEndpointRef = null
val nTries = 3
for (i <- 0 until nTries if driver == null) {
try {
driver = fetcher.setupEndpointRefByURI(arguments.driverUrl)
} catch {
case e: Throwable => if (i == nTries - 1) {
throw e
}
}
}
//創建Executor的env環境
val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)
//設置rpcEnv環境的通信終端
//backendCreateFn 其實就是用來創建YarnCoarseGrainedExecutorBackend的函數
env.rpcEnv.setupEndpoint("Executor",
backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile))
arguments.workerUrl.foreach { url =>
env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
}
env.rpcEnv.awaitTermination()
}
}
//消息循環器
var messageLoop: MessageLoop = null
try {
messageLoop = endpoint match {
case e: IsolatedRpcEndpoint =>
//匹配消息循環器類型
new DedicatedMessageLoop(name, e, this)
case _ =>
sharedLoop.register(name, endpoint)
sharedLoop
}
endpoints.put(name, messageLoop)
} catch {
case NonFatal(e) =>
endpointRefs.remove(endpoint)
throw e
}
-DedicatedMessageLoop
//收件箱
private val inbox = new Inbox(name, endpoint)
-Inbox
protected val messages = new java.util.LinkedList[InboxMessage]()
// OnStart should be the first message to process
//放入一個OnStart消息
inbox.synchronized {
messages.add(OnStart)
}
//RpcEndpoint的生命周期
* {@code constructor -> onStart -> receive* -> onStop}
private[spark] trait RpcEndpoint {
CoarseGrainedExecutorBackend.onStart
override def onStart(): Unit = {
... ...
logInfo("Connecting to driver: " + driverUrl)
try {
_resources = parseOrFindResources(resourcesFileOpt)
} catch {
case NonFatal(e) =>
exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
}
rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
// This is a very fast action so we can use "ThreadUtils.sameThread"
//獲取driver
driver = Some(ref)
//向driver發送RegisterExecutor消息
ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls,
extractAttributes, _resources, resourceProfile.id))
}(ThreadUtils.sameThread).onComplete {
case Success(_) =>
self.send(RegisteredExecutor)
case Failure(e) =>
exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
}(ThreadUtils.sameThread)
}
driver是一個線程,所以是SparkContext接收消息
_schedulerBackend是driver通信后臺
private var _schedulerBackend: SchedulerBackend = _
//集群模式的SchedulerBackend
CoarseGrainedSchedulerBackend
//消息回復
CoarseGrainedSchedulerBackend.receiveAndReply
匹配RegisterExecutor消息
case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls,
attributes, resources, resourceProfileId) =>
總的核數和注冊數量增加
totalCoreCount.addAndGet(cores)
totalRegisteredExecutors.addAndGet(1)
//最后回復一個true表示注冊成功
context.reply(true)
CoarseGrainedExecutorBackend.收到消息
case Success(_) =>
//給自己發送一條消息,表示注冊完畢
self.send(RegisteredExecutor)
CoarseGrainedExecutorBackend.收到給自己發送的RegisteredExecutor消息
override def receive: PartialFunction[Any, Unit] = {
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
try {
//創建Executor計算對象,區別于CoarseGrainedExecutorBackend(通信對象)
executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,
resources = _resources)
//給driver發送LaunchedExecutor 消息
driver.get.send(LaunchedExecutor(executorId))
driver端的CoarseGrainedSchedulerBackend receive到
LaunchedExecutor消息
case LaunchedExecutor(executorId) =>
//增加核數
executorDataMap.get(executorId).foreach { data =>
data.freeCores = data.totalCores
}
//做一些操作 //todo
makeOffers(executorId)
case e =>
logError(s"Received unexpected message. ${e}")