-C 參數(shù)
flink命令下有這樣的一個參數(shù)選項-C或者--classpath,含義解釋:
Adds a URL to each user code classloader on all nodes in the cluster. The paths must specify a protocol (e.g. file://) and be accessible on all nodes (e.g. by means of a NFS share). You can use this option multiple times for specifying more than one URL. The protocol must be supported by the {@link java.net.URLClassLoader}.
含義就是:給一個user classloader添加一個url,但是這個url 必須能夠被集群的所有的節(jié)點都能夠訪問到。該classloader指的是FlinkUserCodeClassLoaders,在任務(wù)啟動的過程中會使用該loader加載,具體使用是在StreamTask.invoke中初始化OperatorChain中,在OperatorChain初始化時,會從字節(jié)碼中反序列化一個operator的header operator,在這個加載過程中會使用FlinkUserCodeClassLoaders進行加載,
final ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
final StreamConfig configuration = containingTask.getConfiguration();
headOperator = configuration.getStreamOperator(userCodeClassloader);
FlinkUserCodeClassLoaders分ParentFirstClassLoader與ChildFirstClassLoader兩類,默認使用ChildFirstClassLoader,表示在加載過程中會優(yōu)先從給定的url中加載類。
StreamOperator面向用戶調(diào)用的就是UserFunction,如果我們的自定義Function中有一些比較通用的包,有很多Flink任務(wù)都會使用到,那么我們就可以使用-C 來指定包的路徑,前提是集群的每個node都可以訪問到(file://),通過這種方式程序在打包的時候就不需要將這些通用的包打進去。
DistributedCache
DistributedCache正如其含義分布式緩存,其功能與spark的廣播變量類似,僅僅只會在一個TaskExecutor中維護一份該數(shù)據(jù),用法:
//注冊
env.registerCachedFile("file:/1.log","file1")
在userFunction中:
val file=getRuntimeContext.getDistributedCache.getFile("file1")
在ha模式下的工作機制: 文件會被上傳到high-availability.storageDir指定的目錄下(一般是hdfs),在任務(wù)啟動過程中,會啟動一個后臺線程從hdfs拉取文件到本地可提供訪問。
在非ha默認下的工作機制:文件存儲在jobmaster節(jié)點下的工作路徑中,在任務(wù)啟動過程中從jobmaster中拉去文件到taskExecutor本地路徑下。