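1. Background
While developing the tool that inspects Flink job state data in the big-data platform XSailboat, I used the State Processor API to parse savepoint data. The savepoint is read from HDFS, parsed, and then written back to HDFS as a CSV file; a separate interface provides paginated loading of that file.
To write the CSV to HDFS I used the deprecated writeAsCsv method on DataStream directly, because its behavior matches this requirement exactly. I did not use FileSink, because its bucketing behavior does not fit this use case.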
2. Problem
String checkPointUrl = "hdfs://yc/a/b/c.csv" ;
SavepointReader.read(env, checkPointUrl , new HashMapStateBackend()) ;
While connecting to HDFS, the following exception was thrown:
java.lang.IllegalArgumentException: java.net.UnknownHostException: yc
at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:465)
at org.apache.hadoop.hdfs.NameNodeProxiesClient.createProxyWithClientProtocol(NameNodeProxiesClient.java:139)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:357)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:291)
at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:173)
at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:168)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:528)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)
at org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.resolveCheckpointPointer(AbstractFsCheckpointStorageAccess.java:257)
at org.apache.flink.state.api.runtime.SavepointLoader.loadSavepointMetadata(SavepointLoader.java:50)
at org.apache.flink.state.api.SavepointReader.read(SavepointReader.java:101)
...
3. Solution
yc is the cluster (nameservice) name, and it is defined in Hadoop's configuration file hdfs-site.xml:
<property>
<name>dfs.nameservices</name>
<value>yc</value>
</property>
<!-- yc contains two NameNodes: nn1 and nn2 -->
<property>
<name>dfs.ha.namenodes.yc</name>
<value>nn1,nn2</value>
</property>
<!-- RPC address of nn1 -->
<property>
<name>dfs.namenode.rpc-address.yc.nn1</name>
<value>XCloud150:9000</value>
</property>
<!-- HTTP address of nn1 -->
<property>
<name>dfs.namenode.http-address.yc.nn1</name>
<value>XCloud150:50070</value>
</property>
<!-- RPC address of nn2 -->
<property>
<name>dfs.namenode.rpc-address.yc.nn2</name>
<value>XCloud151:9000</value>
</property>
<!-- HTTP address of nn2 -->
<property>
<name>dfs.namenode.http-address.yc.nn2</name>
<value>XCloud151:50070</value>
</property>
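To make the failure easier to picture: yc is not a DNS host name but a logical name that only means something once these hdfs-site.xml keys are loaded. The following is a deliberately simplified model in plain Java, not Hadoop's actual implementation; the class NameserviceSketch and its resolve method are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of resolving a logical nameservice.
// It only mirrors the key layout of hdfs-site.xml; it is not Hadoop code.
final class NameserviceSketch {

    // Returns the NameNode RPC addresses configured for the given authority,
    // or an empty list when the authority is not a configured nameservice.
    static List<String> resolve(Map<String, String> conf, String authority) {
        List<String> addresses = new ArrayList<>();
        boolean known = false;
        for (String ns : conf.getOrDefault("dfs.nameservices", "").split(",")) {
            if (ns.trim().equals(authority)) {
                known = true;
            }
        }
        if (!known) {
            // Without the configuration, "yc" can only be treated as a plain host name.
            return addresses;
        }
        String namenodes = conf.getOrDefault("dfs.ha.namenodes." + authority, "");
        for (String nn : namenodes.split(",")) {
            String addr = conf.get("dfs.namenode.rpc-address." + authority + "." + nn.trim());
            if (addr != null) {
                addresses.add(addr);
            }
        }
        return addresses;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("dfs.nameservices", "yc");
        conf.put("dfs.ha.namenodes.yc", "nn1,nn2");
        conf.put("dfs.namenode.rpc-address.yc.nn1", "XCloud150:9000");
        conf.put("dfs.namenode.rpc-address.yc.nn2", "XCloud151:9000");

        System.out.println(resolve(conf, "yc"));            // [XCloud150:9000, XCloud151:9000]
        System.out.println(resolve(new HashMap<>(), "yc")); // []
    }
}
```

With an empty configuration the lookup yields nothing, which corresponds to the situation in the stack trace: the client falls back to treating yc as a host and fails with UnknownHostException.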
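Tracing through the code shows that Flink obtains the Hadoop Configuration (org.apache.hadoop.conf.Configuration) from the getHadoopConfiguration method of org.apache.flink.runtime.util.HadoopUtils.
The relevant code is excerpted below; the numbered comments under the ##### lines are my own annotations: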
public static Configuration getHadoopConfiguration(
org.apache.flink.configuration.Configuration flinkConfiguration) {
// Instantiate an HdfsConfiguration to load the hdfs-site.xml and hdfs-default.xml
// from the classpath
Configuration result = new HdfsConfiguration();
boolean foundHadoopConfiguration = false;
// We need to load both core-site.xml and hdfs-site.xml to determine the default fs path and
// the hdfs configuration.
// The properties of a newly added resource will override the ones in previous resources, so
// a configuration
// file with higher priority should be added later.
// Approach 1: HADOOP_HOME environment variables
String[] possibleHadoopConfPaths = new String[2];
// #############################################################
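// 1. Search under the directory given by the HADOOP_HOME system environment variable.
// The Hadoop configuration files already live in XSailboat's config/MicroService/common directory, so this approach does not fit and is not used.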
final String hadoopHome = System.getenv("HADOOP_HOME");
if (hadoopHome != null) {
LOG.debug("Searching Hadoop configuration files in HADOOP_HOME: {}", hadoopHome);
possibleHadoopConfPaths[0] = hadoopHome + "/conf";
possibleHadoopConfPaths[1] = hadoopHome + "/etc/hadoop"; // hadoop 2.2
}
for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
if (possibleHadoopConfPath != null) {
foundHadoopConfiguration = addHadoopConfIfFound(result, possibleHadoopConfPath);
}
}
// Approach 2: Flink configuration (deprecated)
// #############################################################
// 2. Specify the configuration file location through the HDFS_DEFAULT_CONFIG option; it is marked deprecated, so it is not the first choice.
final String hdfsDefaultPath =
flinkConfiguration.getString(ConfigConstants.HDFS_DEFAULT_CONFIG, null);
if (hdfsDefaultPath != null) {
result.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
LOG.debug("Using hdfs-default configuration-file path from Flink config: {}"
,hdfsDefaultPath);
foundHadoopConfiguration = true;
}
// #############################################################
// 3. Specify the configuration file location through the HDFS_SITE_CONFIG option; it is marked deprecated, so it is not the first choice.
final String hdfsSitePath =
flinkConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
if (hdfsSitePath != null) {
result.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
LOG.debug(
"Using hdfs-site configuration-file path from Flink config: {}", hdfsSitePath);
foundHadoopConfiguration = true;
}
// #############################################################
// 4. Specify the configuration directory through the PATH_HADOOP_CONFIG option; it is marked deprecated, so it is not the first choice.
final String hadoopConfigPath =
flinkConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
if (hadoopConfigPath != null) {
LOG.debug("Searching Hadoop configuration files in Flink config: {}", hadoopConfigPath);
foundHadoopConfiguration =
addHadoopConfIfFound(result, hadoopConfigPath) || foundHadoopConfiguration;
}
// Approach 3: HADOOP_CONF_DIR environment variable
// #############################################################
// 5. Environment variables can only be set at launch; I want to set the location at runtime from a directory parameter, to reduce the configuration needed during deployment.
String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
if (hadoopConfDir != null) {
LOG.debug("Searching Hadoop configuration files in HADOOP_CONF_DIR: {}", hadoopConfDir);
foundHadoopConfiguration =
addHadoopConfIfFound(result, hadoopConfDir) || foundHadoopConfiguration;
}
// Approach 4: Flink configuration
// add all configuration key with prefix 'flink.hadoop.' in flink conf to hadoop conf
// #############################################################
// 6. Configure it in the Flink configuration by prefixing the original Hadoop option names with flink.hadoop.
for (String key : flinkConfiguration.keySet()) {
for (String prefix : FLINK_CONFIG_PREFIXES) {
if (key.startsWith(prefix)) {
String newKey = key.substring(prefix.length());
String value = flinkConfiguration.getString(key, null);
result.set(newKey, value);
LOG.debug(
"Adding Flink config entry for {} as {}={} to Hadoop config",
key,
newKey,
value);
foundHadoopConfiguration = true;
}
}
}
if (!foundHadoopConfiguration) {
LOG.warn(
"Could not find Hadoop configuration via any of the supported methods "
+ "(Flink configuration, environment variables).");
}
return result;
}
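The last approach (the flink.hadoop. prefix) is easy to see in isolation. The sketch below reproduces only the prefix-stripping loop on plain maps. It assumes flink.hadoop. is the only prefix, since the contents of FLINK_CONFIG_PREFIXES are not shown in the excerpt, and the class HadoopPrefixSketch is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-alone model of the "Approach 4" loop above, using plain maps
// instead of the Flink and Hadoop Configuration classes.
final class HadoopPrefixSketch {

    // Assumption: the single prefix named in the source comment.
    static final String PREFIX = "flink.hadoop.";

    // Copies every entry whose key starts with the prefix, with the prefix removed.
    static Map<String, String> toHadoopConf(Map<String, String> flinkConf) {
        Map<String, String> hadoopConf = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : flinkConf.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(PREFIX)) {
                hadoopConf.put(key.substring(PREFIX.length()), entry.getValue());
            }
        }
        return hadoopConf;
    }

    public static void main(String[] args) {
        Map<String, String> flinkConf = new LinkedHashMap<>();
        flinkConf.put("flink.hadoop.dfs.nameservices", "yc");
        flinkConf.put("flink.hadoop.dfs.ha.namenodes.yc", "nn1,nn2");
        flinkConf.put("parallelism.default", "2"); // not a Hadoop entry, ignored

        System.out.println(toHadoopConf(flinkConf));
        // {dfs.nameservices=yc, dfs.ha.namenodes.yc=nn1,nn2}
    }
}
```

With this approach every Hadoop key has to be repeated in the Flink configuration, which is why pointing Flink at the existing configuration directory is more convenient in this setup.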
I tried the deprecated option ConfigConstants.PATH_HADOOP_CONFIG and found that it had no effect. Tracing the code showed that the following code was being executed:
Class: org.apache.flink.core.fs.FileSystem
// inside the getUnguardedFileSystem method
// this "default" initialization makes sure that the FileSystem class works
// even when not configured with an explicit Flink configuration, like on
// JobManager or TaskManager setup
if (FS_FACTORIES.isEmpty()) {
initializeWithoutPlugins(new Configuration());
}
// The Configuration here is created with new, so configuration supplied from outside never gets in.
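The effect can be reproduced with a minimal model of the same pattern: a static registry that initializes itself with an empty configuration on first use unless it was initialized explicitly beforehand. This is a sketch in plain Java with hypothetical names (LazyRegistrySketch, the key hadoop.conf.dir); it only mimics the shape of the Flink code above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a lazily self-initializing static registry.
final class LazyRegistrySketch {

    private static final Map<String, String> FACTORIES = new HashMap<>();

    // Explicit initialization: the caller's configuration is used.
    static synchronized void initialize(Map<String, String> conf) {
        FACTORIES.clear();
        // "hadoop.conf.dir" is an illustrative key, not a real Flink option.
        FACTORIES.put("hdfs", conf.getOrDefault("hadoop.conf.dir", "<none>"));
    }

    // Lookup: falls back to an EMPTY configuration if nobody initialized first.
    static synchronized String get(String scheme) {
        if (FACTORIES.isEmpty()) {
            initialize(new HashMap<>());
        }
        return FACTORIES.get(scheme);
    }

    // Clears the registry so the demo can show both paths.
    static synchronized void reset() {
        FACTORIES.clear();
    }

    public static void main(String[] args) {
        reset();
        System.out.println(get("hdfs")); // <none>

        reset();
        Map<String, String> conf = new HashMap<>();
        conf.put("hadoop.conf.dir", "/etc/hadoop/conf");
        initialize(conf);
        System.out.println(get("hdfs")); // /etc/hadoop/conf
    }
}
```

The first lookup never sees any external setting, while initializing explicitly before the first lookup makes the setting take effect.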
The fix is to call the initialization explicitly ourselves, before the file system is first used:
if(!AppContext.get("Flink_FS_Init", false))
{
org.apache.flink.configuration.Configuration flinkHdfsConf = new org.apache.flink.configuration.Configuration() ;
flinkHdfsConf.setString(ConfigConstants.PATH_HADOOP_CONFIG , MSApp.instance().getAppPaths().getCommonConfigDir().getAbsolutePath()) ;
org.apache.flink.core.fs.FileSystem.initialize(flinkHdfsConf , null);
// Record that initialization has been done (assumes AppContext exposes a matching setter).
AppContext.set("Flink_FS_Init", true) ;
}
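The guard above is a check-then-act sequence, so two threads arriving at the same time could both run the initialization. If that matters, a compare-and-set flag is one option. The sketch below is plain Java with hypothetical names (InitOnceSketch, runOnce); the Runnable stands in for the FileSystem.initialize call.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical one-time initialization guard based on compare-and-set.
final class InitOnceSketch {

    private static final AtomicBoolean INITIALIZED = new AtomicBoolean(false);

    // Runs the given action only for the first caller; returns true if it ran.
    static boolean runOnce(Runnable init) {
        if (INITIALIZED.compareAndSet(false, true)) {
            init.run();
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        System.out.println(runOnce(() -> calls[0]++)); // true
        System.out.println(runOnce(() -> calls[0]++)); // false
        System.out.println(calls[0]);                  // 1
    }
}
```

Note the trade-off: the flag is set before the action completes, so a failed initialization is not retried and a second thread may proceed while the first is still initializing. A synchronized block avoids both at the cost of locking.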