https://blog.csdn.net/u013332124/article/details/91456422
一、spark-submit腳本分析
spark-submit的腳本內(nèi)容很簡單:
# 如果沒設置SPARK_HOME的環(huán)境變量,調(diào)用find-spark-home文件尋找spark-home
if [ -z "${SPARK_HOME}" ]; then
source "$(dirname "$0")"/find-spark-home
fi
# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0
# 直接將所有參數(shù)傳遞給spark-class
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
最后又調(diào)用spark-class。其實不光spark-submit,幾乎所有的spark服務最終都是調(diào)用spark-class來啟動的。spark-class的代碼也不多:
if [ -z "${SPARK_HOME}" ]; then
source "$(dirname "$0")"/find-spark-home
fi
. "${SPARK_HOME}"/bin/load-spark-env.sh
# Find the java binary
if [ -n "${JAVA_HOME}" ]; then
RUNNER="${JAVA_HOME}/bin/java"
else
if [ "$(command -v java)" ]; then
RUNNER="java"
else
echo "JAVA_HOME is not set" >&2
exit 1
fi
fi
# Find Spark jars.
if [ -d "${SPARK_HOME}/jars" ]; then
SPARK_JARS_DIR="${SPARK_HOME}/jars"
else
SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"
fi
if [ ! -d "$SPARK_JARS_DIR" ] && [ -z "$SPARK_TESTING$SPARK_SQL_TESTING" ]; then
echo "Failed to find Spark jars directory ($SPARK_JARS_DIR)." 1>&2
echo "You need to build Spark with the target \"package\" before running this program." 1>&2
exit 1
else
LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"
fi
# Add the launcher build dir to the classpath if requested.
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi
# For tests
if [[ -n "$SPARK_TESTING" ]]; then
unset YARN_CONF_DIR
unset HADOOP_CONF_DIR
fi
# 調(diào)用Main類生成命令
build_command() {
"$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
printf "%d\0" $?
}
# Turn off posix mode since it does not allow process substitution
set +o posix
CMD=()
while IFS= read -d '' -r ARG; do
CMD+=("$ARG")
done < <(build_command "$@")
COUNT=${#CMD[@]}
LAST=$((COUNT - 1))
LAUNCHER_EXIT_CODE=${CMD[$LAST]}
# Certain JVM failures result in errors being printed to stdout (instead of stderr), which causes
# the code that parses the output of the launcher to get confused. In those cases, check if the
# exit code is an integer, and if it's not, handle it as a special error case.
if ! [[ $LAUNCHER_EXIT_CODE =~ ^[0-9]+$ ]]; then
echo "${CMD[@]}" | head -n-1 1>&2
exit 1
fi
if [ $LAUNCHER_EXIT_CODE != 0 ]; then
exit $LAUNCHER_EXIT_CODE
fi
CMD=("${CMD[@]:0:$LAST}")
exec "${CMD[@]}"
spark-class主要是將參數(shù)交給org.apache.spark.launcher.Main類執(zhí)行,然后獲取到一個新的命令,之后我們拿著這個命令執(zhí)行。
比如我們執(zhí)行下面的spark-submit語句:
spark-submit --queue up --deploy-mode cluster --master yarn --class org.apache.spark.examples.SparkPi /www/harbinger-spark/examples/jars/spark-examples_2.11-2.1.0.jar 10
經(jīng)過Main類解析后,就會變成下面的命令:
/www/jdk1.8.0_51/bin/java -cp /www/harbinger-spark/conf/:/www/harbinger-spark/jars/*:/www/harbinger-hadoop/etc/hadoop/ -Xmx52m org.apache.spark.deploy.SparkSubmit --master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi --queue up /www/harbinger-spark/examples/jars/spark-examples_2.11-2.1.0.jar 10
我們發(fā)現(xiàn),最終又繞回來了,還是通過java命令調(diào)用SparkSubmit類。
那么,為什么spark不直接運行SparkSubmit,而是繞了一大圈通過Main類解析獲得命令然后再運行呢?
二、Main類的作用
spark-submit的命令解析主要是經(jīng)過SparkSubmitCommandBuilder#buildSparkSubmitCommand()方法,我們可以看一下源碼:
private List<String> buildSparkSubmitCommand(Map<String, String> env)
throws IOException, IllegalArgumentException {
//加載配置文件的配置
Map<String, String> config = getEffectiveConfig();
boolean isClientMode = isClientMode(config);
//獲取用戶指定的classPath
String extraClassPath = isClientMode ? config.get(SparkLauncher.DRIVER_EXTRA_CLASSPATH) : null;
List<String> cmd = buildJavaCommand(extraClassPath);
// Take Thrift Server as daemon
if (isThriftServer(mainClass)) {
addOptionString(cmd, System.getenv("SPARK_DAEMON_JAVA_OPTS"));
}
addOptionString(cmd, System.getenv("SPARK_SUBMIT_OPTS"));
// We don't want the client to specify Xmx. These have to be set by their corresponding
// memory flag --driver-memory or configuration entry spark.driver.memory
String driverExtraJavaOptions = config.get(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS);
if (!isEmpty(driverExtraJavaOptions) && driverExtraJavaOptions.contains("Xmx")) {
String msg = String.format("Not allowed to specify max heap(Xmx) memory settings through " +
"java options (was %s). Use the corresponding --driver-memory or " +
"spark.driver.memory configuration instead.", driverExtraJavaOptions);
throw new IllegalArgumentException(msg);
}
if (isClientMode) {
String tsMemory =
isThriftServer(mainClass) ? System.getenv("SPARK_DAEMON_MEMORY") : null;
String memory = firstNonEmpty(tsMemory, config.get(SparkLauncher.DRIVER_MEMORY),
System.getenv("SPARK_DRIVER_MEMORY"), System.getenv("SPARK_MEM"), DEFAULT_MEM);
cmd.add("-Xmx" + memory);
addOptionString(cmd, driverExtraJavaOptions);
mergeEnvPathList(env, getLibPathEnvName(),
config.get(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH));
}
cmd.add("org.apache.spark.deploy.SparkSubmit");
cmd.addAll(buildSparkSubmitArgs());
return cmd;
}
主要做的事情其實就是讀取各種配置然后往命令中添加一些參數(shù)。也就是對命令進行加工。
其實添加參數(shù)這種事情直接在shell中也能做,但是這個過程需要讀取配置文件,shell可能做起來比較麻煩。另外其他服務也會經(jīng)過Main類進行加工,一些公共的代碼也可以抽象出來。所以,這個Main類主要用于對命令的加工和轉(zhuǎn)換。
一些spark服務,如果要修改一些服務的參數(shù),比如調(diào)整堆大小,就是在Main類中讀取相關(guān)的環(huán)境變量來設置的。比如SparkHistoryServer,Main類中會讀取環(huán)境變量SPARK_HISTORY_OPTS的值,然后在啟動SparkHistoryServer時加上去。其他的服務也類似。另外,環(huán)境變量可以在"${SPARK_HOME}"/bin/load-spark-env.sh中設置,spark-class中會加載這個文件的配置。
三、SparkSubmit類提交任務的過程
SparkSubmit做的事情就是提交任務運行。我們這里討論一下yarn模式的任務提交。
整個任務提交流程也比較好理解,主要就是收集ApplicationMaster的上下文,比如ApplicationMaster的啟動命令、資源文件、環(huán)境變量等,然后和yarn建立連接,通過yarnClient提交ApplicationMaster到y(tǒng)arn上運行。之后,不斷向yarn輪詢?nèi)蝿盏臓顟B(tài)直到任務運行結(jié)束。
因為整個過程代碼比較多,我們挑一些關(guān)鍵點進行分析。
如何和ResourceManger建立連接
在yarn的模式下,spark會去讀取環(huán)境變量HADOOP_CONF_DIR或者YARN_CONF_DIR目錄下的配置文件,如果這兩個環(huán)境變量都沒找到,運行spark-submit命令時就會報錯:
Exception in thread "main" java.lang.Exception: When running with master 'yarn' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.
at org.apache.spark.deploy.SparkSubmitArguments.validateSubmitArguments(SparkSubmitArguments.scala:256)
at org.apache.spark.deploy.SparkSubmitArguments.validateArguments(SparkSubmitArguments.scala:233)
at org.apache.spark.deploy.SparkSubmitArguments.<init>(SparkSubmitArguments.scala:110)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
spark主要是為了讀取該目錄下的3個文件:core-site.xml、yarn-site.xml、hdfs-site.xml。
其中core-site.xml是hadoop的核心配置。讀取yarn-site.xml配置主要是為了獲取ResourceManger的地址,之后就可以通過rpc建立連接。而讀取hdfs-site.xml主要是要上傳需要資源文件到hdfs用。
所以,運行spark-submit其實并不需要整個hadoop安裝包,只需要將這三個配置文件放好然后設置一下HADOOP_CONF_DIR或者YARN_CONF_DIR環(huán)境變量即可。
提交任務到y(tǒng)arn的相關(guān)代碼在spark源碼的resource-managers/yarn目錄下。在使用maven編譯時,需要帶上 -Pyarn 才會將這些代碼打包進去
spark任務配置的優(yōu)先級
在spark中,有三種方式可以設置參數(shù),這三種方法的優(yōu)先級從低到高依次是:
- 在 spark_default.conf 文件中配置
- 執(zhí)行spark-submit 時通過參數(shù)指定配置
- 在代碼中直接通過SparkConf的方法設置參數(shù)
比如我們在 spark_default.conf 中設置了spark.executor.cores = 1,但是在spark-submit時又指定了--executor-cores 2,這時真正的executor的core數(shù)量就是2,spark_default.conf 中的配置被覆蓋。
但是也有一些情況,可能只會用到spark_default.conf 文件中的配置或者spark-submit的參數(shù)配置。在代碼中設置是沒用的,比如在client模式下,spark.driver.extraClassPath這參數(shù)必須在啟動Driver的時候立馬設置,這時通過SparkConf設置等于沒設置。
還有一種情況,我們在spark-submit中設置appName為"a",但是在SparkConf中又設置了appname為"b"。這時我們?nèi)arn的頁面就會發(fā)現(xiàn)這個app的name還是"a",不會被覆蓋。去SparkHisotryServer中這個app的name就是"b"。這個主要是因為spark向yarn提交任務時Driver還未運行,此時獲取到的spark.app.name還是spark-submit設置的"a"。到了真正執(zhí)行,spark.app.name配置就變成"b"了。
所以,雖然大多數(shù)的配置優(yōu)先級是那樣,但是如果我們發(fā)現(xiàn)哪個配置沒生效,還是需要具體情況具體分析的。
spark尋找spark_default.conf文件的過程主要是先讀取SPARK_CONF_DIR環(huán)境變量,然后讀取 目錄下面的spark_default.conf文件。獲取SPARK_CONF_DIR沒設置,就讀取SPARK_HOME/conf目錄下的配置文件。這時如果SPARK_HOME環(huán)境變量也沒設置,就會報錯
client模式的真正運行方式
spark提交請求的Application上下文中有一個command參數(shù),也就是告訴yarn怎么啟動ApplicationMaster。我們發(fā)現(xiàn)在cluster模式下,啟動的ApplicationMaster是org.apache.spark.deploy.yarn.ApplicationMaster類,而在client模式下,啟動的ApplicationMaster是org.apache.spark.deploy.yarn.ExecutorLauncher。
其實ExecutorLauncher的main方法還是直接調(diào)用ApplicationMaster的main方法。之后在ApplicationMaster#run()方法中,如果是client模式,會去連接運行的客戶端機器上的Driver。之后做的事就是根據(jù)Driver的命令(也就是rpc請求)申請或者釋放Container資源了。
之前經(jīng)常以為client模式下,Driver就是ApplicationMaster,只是AppcationMaster運行在客戶端服務器上而已。但是實際并不是這樣。client模式下,Driver運行在客戶端上,ApplicationMaster還是運行在yarn的Container中,只是這時這個ApplicationMaster只負責進行資源的調(diào)度而已。