Flink中-C參數(shù)與DistributedCache使用

-C 參數(shù)

flink命令下有這樣的一個參數(shù)選項-C或者--classpath,含義解釋:

Adds a URL to each user code classloader  on all nodes in the cluster. The paths must specify a protocol (e.g. file://) and be accessible on all nodes (e.g. by means of a NFS share). You can use this option multiple times for specifying more than one URL. The protocol must be supported by the {@link java.net.URLClassLoader}.

含義就是:給一個user classloader添加一個url,但是這個url 必須能夠被集群的所有的節(jié)點都能夠訪問到。該classloader指的是FlinkUserCodeClassLoaders,在任務(wù)啟動的過程中會使用該loader加載,具體使用是在StreamTask.invoke中初始化OperatorChain中,在OperatorChain初始化時,會從字節(jié)碼中反序列化一個operator的header operator,在這個加載過程中會使用FlinkUserCodeClassLoaders進行加載,

final ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
final StreamConfig configuration = containingTask.getConfiguration();
headOperator = configuration.getStreamOperator(userCodeClassloader);

FlinkUserCodeClassLoaders分ParentFirstClassLoader與ChildFirstClassLoader兩類,默認使用ChildFirstClassLoader,表示在加載過程中會優(yōu)先從給定的url中加載類。

StreamOperator面向用戶調(diào)用的就是UserFunction,如果我們的自定義Function中有一些比較通用的包,有很多Flink任務(wù)都會使用到,那么我們就可以使用-C 來指定包的路徑,前提是集群的每個node都可以訪問到(file://),通過這種方式程序在打包的時候就不需要將這些通用的包打進去。

DistributedCache

DistributedCache正如其含義分布式緩存,其功能與spark的廣播變量類似,僅僅只會在一個TaskExecutor中維護一份該數(shù)據(jù),用法:

 //注冊
 env.registerCachedFile("file:/1.log","file1")

在userFunction中:

val file=getRuntimeContext.getDistributedCache.getFile("file1")

在ha模式下的工作機制: 文件會被上傳到high-availability.storageDir指定的目錄下(一般是hdfs),在任務(wù)啟動過程中,會啟動一個后臺線程從hdfs拉取文件到本地可提供訪問。

在非ha默認下的工作機制:文件存儲在jobmaster節(jié)點下的工作路徑中,在任務(wù)啟動過程中從jobmaster中拉去文件到taskExecutor本地路徑下。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容