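Introduction
I first heard of DataX when a former Alibaba colleague introduced it at my current company. I had long wanted to find time to read the code, because the framework is well designed and well suited to secondary development. While getting familiar with it I did not have time to study the read/write code of each data source (that code is well worth reading, since it covers read and write operations for essentially every mainstream data source); I mainly read the startup and runtime parts of DataX.
In my view the core of the framework comes down to two things. One is that configuration runs through all of DataX ("all in configuration"), pushing JSON configuration to its limit. The other is hot loading of plugins through URLClassLoader.
DataX on GitHub: https://github.com/alibaba/DataX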
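About DataX
DataX is an offline data synchronization tool/platform widely used within Alibaba Group. It provides efficient data synchronization between heterogeneous data sources including MySQL, SQL Server, Oracle, PostgreSQL, HDFS, Hive, HBase, OTS, and ODPS.
As a synchronization framework, DataX abstracts synchronization between data sources into Reader plugins, which read from the source, and Writer plugins, which write to the target. In theory the framework can support synchronization for any type of data source. The plugin system also works as an ecosystem: every newly added data source can immediately interoperate with all existing ones.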
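Job & Task concepts
DataX's logical model has two dimensions, job and task: a job is split into tasks, and the tasks are then grouped into taskGroups for execution.
A job instance runs inside a jobContainer. It is the master of all tasks and is responsible for initialization, splitting, scheduling, running, cleanup, monitoring, and reporting, but it does not perform any actual data synchronization.
Job: a Job describes one synchronization from a single source to a single destination and is the smallest business unit of DataX synchronization. Example: syncing one MySQL table into a specific partition of an ODPS table.
Task: a Task is the smallest unit of execution, obtained by splitting a Job to maximize parallelism. Example: a Job that reads a sharded MySQL database with 1024 table shards is split into 1024 read Tasks, which run with some degree of concurrency.
TaskGroup: a set of Tasks. The Tasks executed under the same TaskGroupContainer form a TaskGroup.
JobContainer: the Job executor, the unit of work responsible for global splitting, scheduling, and pre- and post-statements of a Job. It is similar to the JobTracker in Hadoop MapReduce (MRv1).
TaskGroupContainer: the TaskGroup executor, the unit of work responsible for running a group of Tasks. It is similar to the TaskTracker in Hadoop MapReduce (MRv1).
In short, a Job is split into Tasks, which then run in the containers provided by the framework; a plugin only needs to implement the Job and Task logic.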
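Startup process

Notes:
- In the figure above, yellow marks the execution phases of the Job part, blue marks those of the Task part, and green marks the framework's own phases.

Notes:
- A custom reader or writer plugin only needs to implement the job and task interfaces.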
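Debugging DataX
The best way to read source code is to debug the whole project. Working out how to debug DataX took some effort, so I am sharing the steps here for anyone who wants to dig in.
The debugging setup goes as follows:
- 1. Download the DataX source from GitHub and build it with the commands below. The GitHub page lists the build commands; if some dependencies cannot be downloaded, you can leave out some writer or reader plugins without affecting debugging.
(1) Download the DataX source: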
$ git clone git@github.com:alibaba/DataX.git
(2) Package with Maven:
$ cd {DataX_source_code_home}
$ mvn -U clean package assembly:assembly -Dmaven.test.skip=true
When the build succeeds, the log ends like this:
[INFO] BUILD SUCCESS
[INFO] -----------------------------------------------------------------
[INFO] Total time: 08:12 min
[INFO] Finished at: 2015-12-13T16:26:48+08:00
[INFO] Final Memory: 133M/960M
[INFO] -----------------------------------------------------------------
After a successful build the DataX package is located in {DataX_source_code_home}/target/datax/datax/ and has the following layout:
$ cd {DataX_source_code_home}
$ ls ./target/datax/datax/
bin conf job lib log log_perf plugin script tmp
- 2. DataX is launched through a Python script, so print the start command from inside that script. The key is the line that prints startCommand; with it we can capture the full command line and its arguments.
if __name__ == "__main__":
    printCopyright()
    parser = getOptionParser()
    options, args = parser.parse_args(sys.argv[1:])
    if options.reader is not None and options.writer is not None:
        generateJobConfigTemplate(options.reader,options.writer)
        sys.exit(RET_STATE['OK'])
    if len(args) != 1:
        parser.print_help()
        sys.exit(RET_STATE['FAIL'])
    startCommand = buildStartCommand(options, args)
    # Added for debugging: print the full java command line
    print(startCommand)
    child_process = subprocess.Popen(startCommand, shell=True)
    register_signal()
    (stdout, stderr) = child_process.communicate()
    sys.exit(child_process.returncode)
- 3. Capture the command that starts DataX:
java -server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=//Users/lebron374/Documents/github/DataX/target/datax/datax/log
-Dloglevel=info -Dfile.encoding=UTF-8
-Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener
-Djava.security.egd=file:///dev/urandom -Ddatax.home=//Users/lebron374/Documents/github/DataX/target/datax/datax
-Dlogback.configurationFile=//Users/lebron374/Documents/github/DataX/target/datax/datax/conf/logback.xml
-classpath //Users/lebron374/Documents/github/DataX/target/datax/datax/lib/*:.
-Dlog.file.name=s_datax_job_job_json
com.alibaba.datax.core.Engine
-mode standalone -jobid -1
-job //Users/lebron374/Documents/github/DataX/target/datax/datax/job/job.json
- 4. Set up the run configuration in IDEA

Put the following in VM options:
-server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=//Users/lebron374/Documents/github/DataX/target/datax/datax/log
-Dloglevel=info -Dfile.encoding=UTF-8
-Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener
-Djava.security.egd=file:///dev/urandom -Ddatax.home=//Users/lebron374/Documents/github/DataX/target/datax/datax
-Dlogback.configurationFile=//Users/lebron374/Documents/github/DataX/target/datax/datax/conf/logback.xml
-classpath //Users/lebron374/Documents/github/DataX/target/datax/datax/lib/*:.
-Dlog.file.name=s_datax_job_job_json
com.alibaba.datax.core.Engine
Put the following in Program arguments:
-mode standalone -jobid -1
-job //Users/lebron374/Documents/github/DataX/target/datax/datax/job/job.json
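Startup steps
1. Parse the configuration, which comes from three files: job.json, core.json, and plugin.json.
2. Set the jobId in the configuration.
3. Start the Engine; Engine.start() is the entry to the startup sequence.
4. Set RUNTIME_MODE in the configuration.
5. Start the job through JobContainer.start().
6. Run the job's preHandle(), init(), prepare(), split(), schedule(), post(), and postHandle() methods in order.
7. init() initializes the reader and writer plugins from the configuration. This involves hot loading the plugin jars, calling each plugin's init() method, and setting the reader's and writer's configuration.
8. prepare() prepares the reader and writer plugins by calling each plugin's prepare() method. Every plugin has its own jarLoader, implemented by extending URLClassLoader.
9. split() adjusts the number of channels through adjustChannelNumber() and performs the finest-grained split of reader and writer. Note that the writer's split must follow the reader's result so that both produce the same number of slices, which is what the 1:1 channel model requires.
10. The channel count is derived from the byte and record rate limits; the first step of split() is to compute the number of channels.
11. In split(), the reader plugin splits according to the channel count, although some reader plugins may ignore it; the writer plugin returns exactly as many slices as the reader, 1:1.
12. mergeReaderAndWriterTaskConfigs(), called inside split(), combines the reader, writer, and transformer configurations, generates the task configurations, and rewrites job.content.
13. schedule() assigns the task configurations produced by split() to taskGroups. Dividing the number of channels needed by the number of channels a single taskGroup supports, rounded up, gives the number of taskGroups.
14. schedule() delegates to AbstractScheduler.schedule(), which calls startAllTaskGroup() to create a TaskGroupContainer for every taskGroup and its tasks; a TaskGroupContainerRunner runs each TaskGroupContainer to execute the assigned tasks.
15. taskGroupContainerExecutorService starts a fixed-size thread pool to run the TaskGroupContainerRunner objects. TaskGroupContainerRunner.run() calls taskGroupContainer.start(), which creates a TaskExecutor for each channel and starts the task through taskExecutor.doStart().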
Startup source code walkthrough
The main entry point
public class Engine {
public static void main(String[] args) throws Exception {
int exitCode = 0;
try {
Engine.entry(args);
} catch (Throwable e) {
// Exit with a non-zero status so the caller can detect the failure
exitCode = 1;
System.exit(exitCode);
}
}
public static void entry(final String[] args) throws Throwable {
// Argument parsing omitted
// Path of the job configuration
String jobPath = cl.getOptionValue("job");
// If the user does not specify a jobid, datax.py passes the default value -1
String jobIdString = cl.getOptionValue("jobid");
RUNTIME_MODE = cl.getOptionValue("mode");
// Parse the configuration
Configuration configuration = ConfigParser.parse(jobPath);
// Code omitted
boolean isStandAloneMode = "standalone".equalsIgnoreCase(RUNTIME_MODE);
configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, jobId);
// Start the engine with the configuration
Engine engine = new Engine();
engine.start(configuration);
}
}
Notes:
main does two things:
- 1. Parses the job-related configuration into a configuration object.
- 2. Starts the Engine with that configuration.
Configuration parsing
public final class ConfigParser {
private static final Logger LOG = LoggerFactory.getLogger(ConfigParser.class);
/**
* Given the path of a Job configuration, ConfigParser parses the Job, Plugin, and Core settings and returns them as one Configuration
*/
public static Configuration parse(final String jobPath) {
// Load the job's configuration file, which follows a fixed JSON template
Configuration configuration = ConfigParser.parseJobConfig(jobPath);
// Merge in conf/core.json
configuration.merge(
ConfigParser.parseCoreConfig(CoreConstant.DATAX_CONF_PATH),
false);
// todo: optimize config loading so that only the required plugins are picked up
// Fixed node path: job.content[0].reader.name
String readerPluginName = configuration.getString(
CoreConstant.DATAX_JOB_CONTENT_READER_NAME);
// Fixed node path: job.content[0].writer.name
String writerPluginName = configuration.getString(
CoreConstant.DATAX_JOB_CONTENT_WRITER_NAME);
// Fixed node path: job.preHandler.pluginName
String preHandlerName = configuration.getString(
CoreConstant.DATAX_JOB_PREHANDLER_PLUGINNAME);
// Fixed node path: job.postHandler.pluginName
String postHandlerName = configuration.getString(
CoreConstant.DATAX_JOB_POSTHANDLER_PLUGINNAME);
// Collect the names of the plugins to load
Set<String> pluginList = new HashSet<String>();
pluginList.add(readerPluginName);
pluginList.add(writerPluginName);
if(StringUtils.isNotEmpty(preHandlerName)) {
pluginList.add(preHandlerName);
}
if(StringUtils.isNotEmpty(postHandlerName)) {
pluginList.add(postHandlerName);
}
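try {
// parsePluginConfig(...) loads the configuration of the listed plugins, which is then merged into the global configuration
configuration.merge(parsePluginConfig(new ArrayList<String>(pluginList)), false);
}catch (Exception e){
// Code omitted
}
// configuration now combines three sources: the job configuration, the core configuration, and the configuration of the listed plugins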
return configuration;
}
// Find the requested plugins under the reader and writer directories and parse their configuration
public static Configuration parsePluginConfig(List<String> wantPluginNames) {
// Create an empty configuration object
Configuration configuration = Configuration.newDefault();
Set<String> replicaCheckPluginSet = new HashSet<String>();
int complete = 0;
// All readers live under /plugin/reader; iterate over every reader directory,
// read the configuration of each plugin to load, and merge it into the empty configuration created above
// //Users/lebron374/Documents/github/DataX/target/datax/datax/plugin/reader
for (final String each : ConfigParser
.getDirAsList(CoreConstant.DATAX_PLUGIN_READER_HOME)) {
// Parse one reader directory; in eachReaderConfig the key is plugin.reader.pluginname and the value is the content of the matching plugin.json
Configuration eachReaderConfig = ConfigParser.parseOnePluginConfig(each, "reader", replicaCheckPluginSet, wantPluginNames);
if(eachReaderConfig!=null) {
// Merge with overwrite
configuration.merge(eachReaderConfig, true);
complete += 1;
}
}
// //Users/lebron374/Documents/github/DataX/target/datax/datax/plugin/writer
for (final String each : ConfigParser
.getDirAsList(CoreConstant.DATAX_PLUGIN_WRITER_HOME)) {
Configuration eachWriterConfig = ConfigParser.parseOnePluginConfig(each, "writer", replicaCheckPluginSet, wantPluginNames);
if(eachWriterConfig!=null) {
configuration.merge(eachWriterConfig, true);
complete += 1;
}
}
if (wantPluginNames != null && wantPluginNames.size() > 0 && wantPluginNames.size() != complete) {
throw DataXException.asDataXException(FrameworkErrorCode.PLUGIN_INIT_ERROR, "插件加載失敗,未完成指定插件加載:" + wantPluginNames);
}
return configuration;
}
}
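Notes:
Configuration parsing covers three sources, whose results are merged and returned:
- 1. The job configuration, from the job.json file given in the startup arguments.
- 2. DataX's built-in configuration, from the default core.json file.
- 3. The reader and writer plugin configuration, for the plugins named in job.json.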
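The merge of the three sources can be pictured as a recursive merge of nested maps. The following is a minimal sketch, not DataX's actual Configuration implementation: the class ConfigMergeSketch and its merge method are hypothetical, and the only behavior taken from the excerpt above is the boolean flag, where false keeps values that already exist and true overwrites them.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ConfigMergeSketch {
    /**
     * Recursively merges other into base (hypothetical helper, not DataX code).
     * With overwrite == false, keys already present in base keep their value.
     */
    @SuppressWarnings("unchecked")
    public static Map<String, Object> merge(Map<String, Object> base,
                                            Map<String, Object> other,
                                            boolean overwrite) {
        for (Map.Entry<String, Object> entry : other.entrySet()) {
            Object existing = base.get(entry.getKey());
            Object incoming = entry.getValue();
            if (existing instanceof Map && incoming instanceof Map) {
                // Both sides are nested objects: merge them key by key
                merge((Map<String, Object>) existing, (Map<String, Object>) incoming, overwrite);
            } else if (existing == null || overwrite) {
                base.put(entry.getKey(), incoming);
            }
        }
        return base;
    }

    public static void main(String[] args) {
        // Job-level setting, loaded first
        Map<String, Object> jobSpeed = new LinkedHashMap<>();
        jobSpeed.put("byte", 10485760);
        Map<String, Object> job = new LinkedHashMap<>();
        job.put("speed", jobSpeed);

        // Defaults, merged in afterwards without overwriting
        Map<String, Object> coreSpeed = new LinkedHashMap<>();
        coreSpeed.put("byte", -1);
        coreSpeed.put("record", -1);
        Map<String, Object> core = new LinkedHashMap<>();
        core.put("speed", coreSpeed);

        // The job's byte value survives; the missing record key is filled in
        System.out.println(merge(job, core, false));
    }
}
```

Because the job configuration is loaded first and the later merges pass false, a value set in job.json is not replaced by a default that arrives afterwards.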
Configuration contents
Configuration from job.json
{
"job": {
"setting": {
"speed": {
"byte":10485760,
"record":1000
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column" : [
{
"value": "DataX",
"type": "string"
},
{
"value": 19890604,
"type": "long"
},
{
"value": "1989-06-04 00:00:00",
"type": "date"
},
{
"value": true,
"type": "bool"
},
{
"value": "test",
"type": "bytes"
}
],
"sliceRecordCount": 100000
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": false,
"encoding": "UTF-8"
}
}
}
]
}
}
Configuration from core.json
{
"entry": {
"jvm": "-Xms1G -Xmx1G",
"environment": {}
},
"common": {
"column": {
"datetimeFormat": "yyyy-MM-dd HH:mm:ss",
"timeFormat": "HH:mm:ss",
"dateFormat": "yyyy-MM-dd",
"extraFormats":["yyyyMMdd"],
"timeZone": "GMT+8",
"encoding": "utf-8"
}
},
"core": {
"dataXServer": {
"address": "http://localhost:7001/api",
"timeout": 10000,
"reportDataxLog": false,
"reportPerfLog": false
},
"transport": {
"channel": {
"class": "com.alibaba.datax.core.transport.channel.memory.MemoryChannel",
"speed": {
"byte": 100,
"record": 10
},
"flowControlInterval": 20,
"capacity": 512,
"byteCapacity": 67108864
},
"exchanger": {
"class": "com.alibaba.datax.core.plugin.BufferedRecordExchanger",
"bufferSize": 32
}
},
"container": {
"job": {
"reportInterval": 10000
},
"taskGroup": {
"channel": 5
},
"trace": {
"enable": "false"
}
},
"statistics": {
"collector": {
"plugin": {
"taskClass": "com.alibaba.datax.core.statistics.plugin.task.StdoutPluginCollector",
"maxDirtyNumber": 10
}
}
}
}
}
Configuration from plugin.json
{
"name": "streamreader",
"class": "com.alibaba.datax.plugin.reader.streamreader.StreamReader",
"description": {
"useScene": "only for developer test.",
"mechanism": "use datax framework to transport data from stream.",
"warn": "Never use it in your real job."
},
"developer": "alibaba"
}
{
"name": "streamwriter",
"class": "com.alibaba.datax.plugin.writer.streamwriter.StreamWriter",
"description": {
"useScene": "only for developer test.",
"mechanism": "use datax framework to transport data to stream.",
"warn": "Never use it in your real job."
},
"developer": "alibaba"
}
The merged configuration
{
"common": {
"column": {
"dateFormat": "yyyy-MM-dd",
"datetimeFormat": "yyyy-MM-dd HH:mm:ss",
"encoding": "utf-8",
"extraFormats": ["yyyyMMdd"],
"timeFormat": "HH:mm:ss",
"timeZone": "GMT+8"
}
},
"core": {
"container": {
"job": {
"id": -1,
"reportInterval": 10000
},
"taskGroup": {
"channel": 5
},
"trace": {
"enable": "false"
}
},
"dataXServer": {
"address": "http://localhost:7001/api",
"reportDataxLog": false,
"reportPerfLog": false,
"timeout": 10000
},
"statistics": {
"collector": {
"plugin": {
"maxDirtyNumber": 10,
"taskClass": "com.alibaba.datax.core.statistics.plugin.task.StdoutPluginCollector"
}
}
},
"transport": {
"channel": {
"byteCapacity": 67108864,
"capacity": 512,
"class": "com.alibaba.datax.core.transport.channel.memory.MemoryChannel",
"flowControlInterval": 20,
"speed": {
"byte": -1,
"record": -1
}
},
"exchanger": {
"bufferSize": 32,
"class": "com.alibaba.datax.core.plugin.BufferedRecordExchanger"
}
}
},
"entry": {
"jvm": "-Xms1G -Xmx1G"
},
"job": {
"content": [{
"reader": {
"name": "streamreader",
"parameter": {
"column": [{
"type": "string",
"value": "DataX"
}, {
"type": "long",
"value": 19890604
}, {
"type": "date",
"value": "1989-06-04 00:00:00"
}, {
"type": "bool",
"value": true
}, {
"type": "bytes",
"value": "test"
}],
"sliceRecordCount": 100000
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": false
}
}
}],
"setting": {
"errorLimit": {
"percentage": 0.02,
"record": 0
},
"speed": {
"byte": 10485760
}
}
},
"plugin": {
"reader": {
"streamreader": {
"class": "com.alibaba.datax.plugin.reader.streamreader.StreamReader",
"description": {
"mechanism": "use datax framework to transport data from stream.",
"useScene": "only for developer test.",
"warn": "Never use it in your real job."
},
"developer": "alibaba",
"name": "streamreader",
"path": "//Users/lebron374/Documents/github/DataX/target/datax/datax/plugin/reader/streamreader"
}
},
"writer": {
"streamwriter": {
"class": "com.alibaba.datax.plugin.writer.streamwriter.StreamWriter",
"description": {
"mechanism": "use datax framework to transport data to stream.",
"useScene": "only for developer test.",
"warn": "Never use it in your real job."
},
"developer": "alibaba",
"name": "streamwriter",
"path": "//Users/lebron374/Documents/github/DataX/target/datax/datax/plugin/writer/streamwriter"
}
}
}
}
Engine's start process
public class Engine {
private static final Logger LOG = LoggerFactory.getLogger(Engine.class);
private static String RUNTIME_MODE;
/* check job model (job/task) first */
public void start(Configuration allConf) {
// Code omitted
boolean isJob = !("taskGroup".equalsIgnoreCase(allConf
.getString(CoreConstant.DATAX_CORE_CONTAINER_MODEL)));
AbstractContainer container;
if (isJob) {
allConf.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, RUNTIME_MODE);
// The key object is the JobContainer
container = new JobContainer(allConf);
instanceId = allConf.getLong(
CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, 0);
}
Configuration jobInfoConfig = allConf.getConfiguration(CoreConstant.DATAX_JOB_JOBINFO);
// Start the core container
container.start();
}
}
Notes:
start() does two things:
- 1. Creates the JobContainer object.
- 2. Starts the JobContainer object.
JobContainer's startup process
public class JobContainer extends AbstractContainer {
/**
* All of jobContainer's main work happens in start(): init, prepare, split, schedule,
* post, as well as destroy and statistics
*/
@Override
public void start() {
try {
isDryRun = configuration.getBool(CoreConstant.DATAX_JOB_SETTING_DRYRUN, false);
if(isDryRun) {
// Code omitted
} else {
// Clone the configuration to keep it thread-safe
userConf = configuration.clone();
// Run preHandle()
LOG.debug("jobContainer starts to do preHandle ...");
this.preHandle();
// Initialize the reader, transformer, and writer
LOG.debug("jobContainer starts to do init ...");
this.init();
// Run the plugins' prepare()
LOG.info("jobContainer starts to do prepare ...");
this.prepare();
// Split the job into tasks
LOG.info("jobContainer starts to do split ...");
this.totalStage = this.split();
// Schedule the tasks
LOG.info("jobContainer starts to do schedule ...");
this.schedule();
// Run post-processing
LOG.debug("jobContainer starts to do post ...");
this.post();
// Run postHandle()
LOG.debug("jobContainer starts to do postHandle ...");
this.postHandle();
LOG.info("DataX jobId [{}] completed successfully.", this.jobId);
this.invokeHooks();
}
} catch (Throwable e) {
// Code omitted
} finally {
// Code omitted
}
}
}
Notes:
JobContainer.start() runs a series of job-level operations:
- 1. The job's preHandle(): not covered for now.
- 2. The job's init(): a key step.
- 3. The job's prepare(): not covered for now.
- 4. The job's split(): a key step.
- 5. The job's schedule(): a key step.
- 6. The job's post() and postHandle(): not covered for now.
The Job's init process
public class JobContainer extends AbstractContainer {
private void init() {
this.jobId = this.configuration.getLong(
CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, -1);
if (this.jobId < 0) {
LOG.info("Set jobId = 0");
this.jobId = 0;
this.configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID,
this.jobId);
}
Thread.currentThread().setName("job-" + this.jobId);
// Initialization
JobPluginCollector jobPluginCollector = new DefaultJobPluginCollector(
this.getContainerCommunicator());
// The Reader must be initialized before the Writer
this.jobReader = this.initJobReader(jobPluginCollector);
this.jobWriter = this.initJobWriter(jobPluginCollector);
}
private Reader.Job initJobReader(
JobPluginCollector jobPluginCollector) {
// Get the plugin name
this.readerPluginName = this.configuration.getString(
CoreConstant.DATAX_JOB_CONTENT_READER_NAME);
classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
PluginType.READER, this.readerPluginName));
Reader.Job jobReader = (Reader.Job) LoadUtil.loadJobPlugin(
PluginType.READER, this.readerPluginName);
// Set the reader's own job configuration
jobReader.setPluginJobConf(this.configuration.getConfiguration(
CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER));
// Give the reader its peer's (the writer's) configuration
jobReader.setPeerPluginJobConf(this.configuration.getConfiguration(
CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER));
jobReader.setJobPluginCollector(jobPluginCollector);
// From here on each plugin performs its own initialization
jobReader.init();
classLoaderSwapper.restoreCurrentThreadClassLoader();
return jobReader;
}
private Writer.Job initJobWriter(
JobPluginCollector jobPluginCollector) {
this.writerPluginName = this.configuration.getString(
CoreConstant.DATAX_JOB_CONTENT_WRITER_NAME);
classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
PluginType.WRITER, this.writerPluginName));
Writer.Job jobWriter = (Writer.Job) LoadUtil.loadJobPlugin(
PluginType.WRITER, this.writerPluginName);
// Set the writer's own job configuration
jobWriter.setPluginJobConf(this.configuration.getConfiguration(
CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER));
// Give the writer its peer's (the reader's) configuration
jobWriter.setPeerPluginJobConf(this.configuration.getConfiguration(
CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER));
jobWriter.setPeerPluginName(this.readerPluginName);
jobWriter.setJobPluginCollector(jobPluginCollector);
jobWriter.init();
classLoaderSwapper.restoreCurrentThreadClassLoader();
return jobWriter;
}
}
Notes:
The Job's init() does two main things:
- 1. Creates the reader's Job object, with its classes loaded through a URLClassLoader.
- 2. Creates the writer's Job object, with its classes loaded through a URLClassLoader.
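The pattern used in initJobReader() and initJobWriter() above (switch the thread's context class loader to the plugin's loader, run plugin code, then restore it) can be sketched with the JDK alone. This is an illustration under assumptions, not the DataX classes: PluginLoaderSketch, loaderFor, and runWithLoader are hypothetical names, and the plugin directory in main is only an example path.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.nio.file.Paths;

public class PluginLoaderSketch {
    /** Builds a class loader scoped to one plugin directory (hypothetical layout). */
    public static URLClassLoader loaderFor(Path pluginDir) throws Exception {
        URL[] urls = { pluginDir.toUri().toURL() };
        // The application's loader is the parent, so shared framework classes stay visible
        return new URLClassLoader(urls, PluginLoaderSketch.class.getClassLoader());
    }

    /** Runs an action with pluginLoader as the context class loader, then restores the old one. */
    public static void runWithLoader(ClassLoader pluginLoader, Runnable action) {
        Thread current = Thread.currentThread();
        ClassLoader saved = current.getContextClassLoader();
        current.setContextClassLoader(pluginLoader);
        try {
            action.run();
        } finally {
            // Always restore, even if the plugin code throws
            current.setContextClassLoader(saved);
        }
    }

    public static void main(String[] args) throws Exception {
        try (URLClassLoader loader = loaderFor(Paths.get("plugin", "reader", "streamreader"))) {
            runWithLoader(loader, () ->
                System.out.println(Thread.currentThread().getContextClassLoader() == loader));
        }
    }
}
```

Giving each plugin its own loader means two plugins can ship different versions of the same dependency without clashing, which is presumably why the framework isolates them this way.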
The Job's split process
public class JobContainer extends AbstractContainer {
private int split() {
this.adjustChannelNumber();
if (this.needChannelNumber <= 0) {
this.needChannelNumber = 1;
}
List<Configuration> readerTaskConfigs = this
.doReaderSplit(this.needChannelNumber);
int taskNumber = readerTaskConfigs.size();
List<Configuration> writerTaskConfigs = this
.doWriterSplit(taskNumber);
List<Configuration> transformerList = this.configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT_TRANSFORMER);
LOG.debug("transformer configuration: "+ JSON.toJSONString(transformerList));
/**
* Input: the parameter lists of the reader and writer; output: the list of elements under content
*/
List<Configuration> contentConfig = mergeReaderAndWriterTaskConfigs(
readerTaskConfigs, writerTaskConfigs, transformerList);
LOG.debug("contentConfig configuration: "+ JSON.toJSONString(contentConfig));
this.configuration.set(CoreConstant.DATAX_JOB_CONTENT, contentConfig);
return contentConfig.size();
}
private void adjustChannelNumber() {
int needChannelNumberByByte = Integer.MAX_VALUE;
int needChannelNumberByRecord = Integer.MAX_VALUE;
boolean isByteLimit = (this.configuration.getInt(
CoreConstant.DATAX_JOB_SETTING_SPEED_BYTE, 0) > 0);
if (isByteLimit) {
long globalLimitedByteSpeed = this.configuration.getInt(
CoreConstant.DATAX_JOB_SETTING_SPEED_BYTE, 10 * 1024 * 1024);
Long channelLimitedByteSpeed = this.configuration
.getLong(CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_BYTE);
needChannelNumberByByte =
(int) (globalLimitedByteSpeed / channelLimitedByteSpeed);
needChannelNumberByByte =
needChannelNumberByByte > 0 ? needChannelNumberByByte : 1;
LOG.info("Job set Max-Byte-Speed to " + globalLimitedByteSpeed + " bytes.");
}
boolean isRecordLimit = (this.configuration.getInt(
CoreConstant.DATAX_JOB_SETTING_SPEED_RECORD, 0)) > 0;
if (isRecordLimit) {
long globalLimitedRecordSpeed = this.configuration.getInt(
CoreConstant.DATAX_JOB_SETTING_SPEED_RECORD, 100000);
Long channelLimitedRecordSpeed = this.configuration.getLong(
CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_RECORD);
needChannelNumberByRecord =
(int) (globalLimitedRecordSpeed / channelLimitedRecordSpeed);
needChannelNumberByRecord =
needChannelNumberByRecord > 0 ? needChannelNumberByRecord : 1;
}
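// Take the smaller of the two
this.needChannelNumber = needChannelNumberByByte < needChannelNumberByRecord ?
needChannelNumberByByte : needChannelNumberByRecord;
// If a byte or record limit produced a channel count, we are done
if (this.needChannelNumber < Integer.MAX_VALUE) {
return;
}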
boolean isChannelLimit = (this.configuration.getInt(
CoreConstant.DATAX_JOB_SETTING_SPEED_CHANNEL, 0) > 0);
if (isChannelLimit) {
this.needChannelNumber = this.configuration.getInt(
CoreConstant.DATAX_JOB_SETTING_SPEED_CHANNEL);
LOG.info("Job set Channel-Number to " + this.needChannelNumber
+ " channels.");
return;
}
throw DataXException.asDataXException(
FrameworkErrorCode.CONFIG_ERROR,
"Job運行速度必須設置");
}
}
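Notes:
The job's split process derives the number of channels from the rate-limit settings and from that the number of tasks. The main steps are:
- 1. adjustChannelNumber() computes the number of channels from the byte rate limit and the record rate limit.
- 2. The number of reader slices is computed from the number of channels.
- 3. The number of writer slices is computed from the number of reader slices, so that writers and readers are bound 1:1.
- 4. mergeReaderAndWriterTaskConfigs() generates the combined reader + writer task configurations. At this point the task configurations exist.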
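The arithmetic in adjustChannelNumber() can be tried in isolation. The sketch below mirrors the logic of the excerpt (integer division, a floor of one channel, then the smaller of the two results); the class ChannelCountSketch and the per-channel limits used in main are assumptions chosen only to make the numbers easy to follow.

```java
public class ChannelCountSketch {
    /**
     * Channels implied by one rate limit. A non-positive global limit means
     * "not configured"; perChannelLimit is assumed to be positive.
     */
    public static int channelsFor(long globalLimit, long perChannelLimit) {
        if (globalLimit <= 0) {
            return Integer.MAX_VALUE;
        }
        int channels = (int) (globalLimit / perChannelLimit);
        // Never go below one channel
        return Math.max(channels, 1);
    }

    /** The smaller of the byte-based and record-based channel counts. */
    public static int needChannelNumber(long globalByte, long channelByte,
                                        long globalRecord, long channelRecord) {
        return Math.min(channelsFor(globalByte, channelByte),
                        channelsFor(globalRecord, channelRecord));
    }

    public static void main(String[] args) {
        // Hypothetical limits: 10485760 bytes/s overall at 1048576 bytes/s per channel -> 10 channels;
        // 1000 records/s overall at 200 records/s per channel -> 5 channels.
        System.out.println(needChannelNumber(10485760L, 1048576L, 1000L, 200L)); // prints 5
    }
}
```

Taking the minimum makes the stricter limit win: with these numbers the record limit allows only 5 channels, so 5 is used even though the byte limit would allow 10.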
The Job's schedule process
public class JobContainer extends AbstractContainer {
private void schedule() {
/**
* Work out from the configuration which tasks each taskGroup should run
*/
List<Configuration> taskGroupConfigs = JobAssignUtil.assignFairly(this.configuration,
this.needChannelNumber, channelsPerTaskGroup);
ExecuteMode executeMode = null;
AbstractScheduler scheduler;
try {
executeMode = ExecuteMode.STANDALONE;
scheduler = initStandaloneScheduler(this.configuration);
// Set executeMode
for (Configuration taskGroupConfig : taskGroupConfigs) {
taskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, executeMode.getValue());
}
// Start scheduling all taskGroups
scheduler.schedule(taskGroupConfigs);
} catch (Exception e) {
// Code omitted
}
}
}
Notes:
The Job's schedule process does two main things:
- 1. Distributes the tasks across taskGroups, producing List<Configuration> taskGroupConfigs.
- 2. Starts the taskGroups through scheduler.schedule(taskGroupConfigs).
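To make the grouping step concrete, here is a simplified sketch of how tasks could be spread across taskGroups. It is not JobAssignUtil.assignFairly() itself: the class TaskGroupAssignSketch is hypothetical, and plain round-robin assignment is an assumption standing in for whatever fairness rules the real method applies. Only the idea of deriving the group count from the channel count and the channels per taskGroup comes from the text above.

```java
import java.util.ArrayList;
import java.util.List;

public class TaskGroupAssignSketch {
    /** Number of taskGroups: channel count divided by channels per group, rounded up. */
    public static int taskGroupCount(int channelNumber, int channelsPerTaskGroup) {
        return (channelNumber + channelsPerTaskGroup - 1) / channelsPerTaskGroup;
    }

    /** Assigns task ids 0..taskCount-1 to groups in round-robin order (simplification). */
    public static List<List<Integer>> assignRoundRobin(int taskCount, int groupCount) {
        List<List<Integer>> groups = new ArrayList<>();
        for (int g = 0; g < groupCount; g++) {
            groups.add(new ArrayList<>());
        }
        for (int taskId = 0; taskId < taskCount; taskId++) {
            groups.get(taskId % groupCount).add(taskId);
        }
        return groups;
    }

    public static void main(String[] args) {
        // 12 channels with 5 channels per taskGroup -> 3 taskGroups
        int groupCount = taskGroupCount(12, 5);
        // 7 tasks over 3 groups -> [[0, 3, 6], [1, 4], [2, 5]]
        System.out.println(assignRoundRobin(7, groupCount));
    }
}
```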
The TaskGroup's schedule process
public abstract class AbstractScheduler {
public void schedule(List<Configuration> configurations) {
int totalTasks = calculateTaskCount(configurations);
// Start all TaskGroups
startAllTaskGroup(configurations);
try {
while (true) {
// Code omitted
}
} catch (InterruptedException e) {
// Code omitted
}
}
}
public abstract class ProcessInnerScheduler extends AbstractScheduler {
private ExecutorService taskGroupContainerExecutorService;
@Override
public void startAllTaskGroup(List<Configuration> configurations) {
// Start a fixed number of threads, one per taskGroup
this.taskGroupContainerExecutorService = Executors
.newFixedThreadPool(configurations.size());
// Start one TaskGroupContainerRunner per TaskGroup
for (Configuration taskGroupConfiguration : configurations) {
// Create the TaskGroupContainerRunner and submit it to the thread pool
TaskGroupContainerRunner taskGroupContainerRunner = newTaskGroupContainerRunner(taskGroupConfiguration);
this.taskGroupContainerExecutorService.execute(taskGroupContainerRunner);
}
// After shutdown() the pool accepts no new tasks; already submitted tasks still run to completion
this.taskGroupContainerExecutorService.shutdown();
}
}
public class TaskGroupContainerRunner implements Runnable {
private TaskGroupContainer taskGroupContainer;
private State state;
public TaskGroupContainerRunner(TaskGroupContainer taskGroup) {
this.taskGroupContainer = taskGroup;
this.state = State.SUCCEEDED;
}
@Override
public void run() {
try {
Thread.currentThread().setName(
String.format("taskGroup-%d", this.taskGroupContainer.getTaskGroupId()));
this.taskGroupContainer.start();
this.state = State.SUCCEEDED;
} catch (Throwable e) {
// Record the failure instead of silently swallowing it
this.state = State.FAILED;
}
}
}
Notes:
The TaskGroup schedule step does the following:
- 1. Creates a TaskGroupContainerRunner for every TaskGroup.
- 2. Submits each TaskGroupContainerRunner to the thread pool, which executes its run() method.
- 3. Inside run(), calls this.taskGroupContainer.start().
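The thread-pool pattern in startAllTaskGroup() can be reproduced with plain Runnables. The sketch below is an assumption-laden stand-in: TaskGroupPoolSketch and runAll are hypothetical names, each Runnable plays the role of a TaskGroupContainerRunner, and waiting with awaitTermination() is a choice made for the sketch (the excerpt only shows shutdown(), with the scheduler's own loop elided).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class TaskGroupPoolSketch {
    /**
     * Runs every runner on its own pool thread and returns how many finished.
     * The list must not be empty, because a fixed pool needs at least one thread.
     */
    public static int runAll(List<Runnable> runners) throws InterruptedException {
        AtomicInteger finished = new AtomicInteger();
        // One thread per runner, as in the excerpt (one per taskGroup)
        ExecutorService pool = Executors.newFixedThreadPool(runners.size());
        for (Runnable runner : runners) {
            pool.execute(() -> {
                runner.run();
                finished.incrementAndGet();
            });
        }
        // No new work is accepted; submitted runners still complete
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return finished.get();
    }

    public static void main(String[] args) throws InterruptedException {
        List<Runnable> runners = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            int groupId = i;
            runners.add(() -> System.out.println("taskGroup-" + groupId + " running"));
        }
        System.out.println("finished: " + runAll(runners));
    }
}
```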
Starting the TaskGroupContainer
public class TaskGroupContainer extends AbstractContainer {
@Override
public void start() {
try {
// Code omitted
int taskCountInThisTaskGroup = taskConfigs.size();
Map<Integer, Configuration> taskConfigMap = buildTaskConfigMap(taskConfigs); // taskId -> task configuration
List<Configuration> taskQueue = buildRemainTasks(taskConfigs); // tasks waiting to run
Map<Integer, TaskExecutor> taskFailedExecutorMap = new HashMap<Integer, TaskExecutor>(); // taskId -> last failed executor
List<TaskExecutor> runTasks = new ArrayList<TaskExecutor>(channelNumber); // tasks currently running
Map<Integer, Long> taskStartTimeMap = new HashMap<Integer, Long>(); // task start times
while (true) {
// Code omitted
// New tasks are started here
Iterator<Configuration> iterator = taskQueue.iterator();
while(iterator.hasNext() && runTasks.size() < channelNumber){
Configuration taskConfig = iterator.next();
Integer taskId = taskConfig.getInt(CoreConstant.TASK_ID);
int attemptCount = 1;
TaskExecutor lastExecutor = taskFailedExecutorMap.get(taskId);
// Configuration for the task about to run (cloned when retries are enabled)
Configuration taskConfigForRun = taskMaxRetryTimes > 1 ? taskConfig.clone() : taskConfig;
// The TaskExecutor that will run this task
TaskExecutor taskExecutor = new TaskExecutor(taskConfigForRun, attemptCount);
taskStartTimeMap.put(taskId, System.currentTimeMillis());
taskExecutor.doStart();
iterator.remove();
runTasks.add(taskExecutor);
}
// Code omitted (the rest of the loop body)
}
} catch (Throwable e) {
// Code omitted
}finally {
// Code omitted
}
}
}
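Notes:
Internally, the TaskGroupContainer mainly does the following:
- 1. Walks the list of Tasks assigned to this TaskGroupContainer and creates a TaskExecutor for each one.
- 2. Uses each TaskExecutor to start the task assigned to this TaskGroup.
- 3. At this point the Tasks of the Job have been started.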
Starting a Task
class TaskExecutor {
private Channel channel;
private Thread readerThread;
private Thread writerThread;
private ReaderRunner readerRunner;
private WriterRunner writerRunner;
/**
* taskCommunication is used in several places:
* 1. the channel
* 2. readerRunner and writerRunner
* 3. the taskPluginCollector of the reader and writer
*/
public TaskExecutor(Configuration taskConf, int attemptCount) {
// The configuration of this taskExecutor
this.taskConfig = taskConf;
// Get the taskId
this.taskId = this.taskConfig.getInt(CoreConstant.TASK_ID);
this.attemptCount = attemptCount;
/**
* The Communication of this taskExecutor is looked up by taskId.
* It is passed to readerRunner and writerRunner, and to the channel for statistics
*/
this.channel = ClassUtil.instantiate(channelClazz,
Channel.class, configuration);
// The channel is created here, one per TaskExecutor; generateRunner() creates the writer or reader and injects the channel
this.channel.setCommunication(this.taskCommunication);
/**
* Get the transformer parameters
*/
List<TransformerExecution> transformerInfoExecs = TransformerUtil.buildTransformerInfo(taskConfig);
/**
* Create writerThread
*/
writerRunner = (WriterRunner) generateRunner(PluginType.WRITER);
this.writerThread = new Thread(writerRunner,
String.format("%d-%d-%d-writer",
jobId, taskGroupId, this.taskId));
// Setting the thread's contextClassLoader lets the task use a class loader different from the main program's
this.writerThread.setContextClassLoader(LoadUtil.getJarLoader(
PluginType.WRITER, this.taskConfig.getString(
CoreConstant.JOB_WRITER_NAME)));
/**
* Create readerThread
*/
readerRunner = (ReaderRunner) generateRunner(PluginType.READER,transformerInfoExecs);
this.readerThread = new Thread(readerRunner,
String.format("%d-%d-%d-reader",
jobId, taskGroupId, this.taskId));
/**
* Setting the thread's contextClassLoader lets the task use a class loader different from the main program's
*/
this.readerThread.setContextClassLoader(LoadUtil.getJarLoader(
PluginType.READER, this.taskConfig.getString(
CoreConstant.JOB_READER_NAME)));
}
public void doStart() {
this.writerThread.start();
this.readerThread.start();
}
}
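Notes:
Starting a TaskExecutor mainly does the following:
- 1. Creates the reader and writer thread tasks; the reader and writer share one channel.
- 2. Starts the writer thread first, then the reader thread.
- 3. At this point the data synchronization Task is running.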
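The reader/writer pairing described above is a producer-consumer arrangement over a shared channel. The sketch below models it with a bounded BlockingQueue; it is an illustration, not DataX's Channel: the class ChannelSketch is hypothetical, records are plain strings, and using an empty Optional as the end-of-stream marker is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ChannelSketch {
    /** Moves every record from source to a new list through a bounded in-memory channel. */
    public static List<String> transfer(List<String> source, int capacity) throws InterruptedException {
        BlockingQueue<Optional<String>> channel = new ArrayBlockingQueue<>(capacity);
        List<String> sink = new ArrayList<>();

        // Consumer side: drains the channel until the end marker arrives
        Thread writer = new Thread(() -> {
            try {
                Optional<String> record;
                while ((record = channel.take()).isPresent()) {
                    sink.add(record.get());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "writer");

        // Producer side: put() blocks when the channel is full, which throttles the reader
        Thread reader = new Thread(() -> {
            try {
                for (String record : source) {
                    channel.put(Optional.of(record));
                }
                channel.put(Optional.empty()); // end-of-stream marker (sketch assumption)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "reader");

        // Same order as doStart(): writer first, then reader
        writer.start();
        reader.start();
        reader.join();
        writer.join();
        return sink;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(transfer(Arrays.asList("r1", "r2", "r3"), 2)); // prints [r1, r2, r3]
    }
}
```

Starting the consumer first means it is already waiting on the channel when the first record arrives, so the bounded queue never fills up merely because nobody is reading yet.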
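Data transfer in DataX
As in any producer-consumer setup, a Reader plugin and a Writer plugin exchange data through a channel. The channel may be in memory or persistent; the plugins do not need to care. A plugin writes data into the channel through RecordSender and reads data from it through RecordReceiver.
Each item in the channel is a Record object, and a Record can hold multiple Column objects. You can think of these simply as a row and its columns in a database.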
public class DefaultRecord implements Record {
private static final int RECORD_AVERGAE_COLUMN_NUMBER = 16;
private List<Column> columns;
private int byteSize;
// Start with the memory required by the Record object itself
private int memorySize = ClassSize.DefaultRecordHead;
public DefaultRecord() {
this.columns = new ArrayList<Column>(RECORD_AVERGAE_COLUMN_NUMBER);
}
@Override
public void addColumn(Column column) {
columns.add(column);
incrByteSize(column);
}
@Override
public Column getColumn(int i) {
if (i < 0 || i >= columns.size()) {
return null;
}
return columns.get(i);
}
@Override
public void setColumn(int i, final Column column) {
if (i < 0) {
throw DataXException.asDataXException(FrameworkErrorCode.ARGUMENT_ERROR,
"不能給index小于0的column設置值");
}
if (i >= columns.size()) {
expandCapacity(i + 1);
}
decrByteSize(getColumn(i));
this.columns.set(i, column);
incrByteSize(getColumn(i));
}
}