Core functionality of the MySqlSourceReader class
Requesting data splits
The start method sends a split request to MySqlSourceEnumerator. This triggers the handleSplitRequest method of MySqlSourceEnumerator, which assigns a split to the reader.
public void start() {
    if (getNumberOfCurrentlyAssignedSplits() <= 1) {
        context.sendSplitRequest();
    }
}
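The request/assign handshake can be sketched in isolation. This is a minimal, synchronous sketch, not the connector's implementation: the Enumerator and Reader classes, the string split ids, and the direct method call (the real exchange goes through the reader context and is asynchronous) are all assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

public class SplitRequestSketch {

    /** Hypothetical stand-in for the enumerator: hands out one pending split per request. */
    static class Enumerator {
        private final Queue<String> pendingSplits = new ArrayDeque<>();

        Enumerator(List<String> splits) {
            pendingSplits.addAll(splits);
        }

        void handleSplitRequest(Reader reader) {
            String next = pendingSplits.poll();
            if (next != null) {
                reader.addSplit(next);
            }
        }
    }

    /** Hypothetical stand-in for the reader. */
    static class Reader {
        final List<String> assignedSplits = new ArrayList<>();
        private final Enumerator enumerator;

        Reader(Enumerator enumerator) {
            this.enumerator = enumerator;
        }

        // Same guard as start(): only ask for work when at most one split is assigned.
        void start() {
            if (assignedSplits.size() <= 1) {
                enumerator.handleSplitRequest(this);
            }
        }

        void addSplit(String split) {
            assignedSplits.add(split);
        }
    }

    public static void main(String[] args) {
        Enumerator enumerator = new Enumerator(Arrays.asList("chunk-0", "chunk-1"));
        Reader reader = new Reader(enumerator);
        reader.start();
        System.out.println(reader.assignedSplits);
    }
}
```

Each call to start() in this sketch pulls at most one split, so the reader never holds more work than it asked for.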
Processing splits
The addSplits method processes the chunk splits assigned by MySqlSourceEnumerator.
private void addSplits(List<MySqlSplit> splits, boolean checkTableChangeForBinlogSplit) {
    // restore for finishedUnackedSplits
    // holds the splits that have not been processed yet
    List<MySqlSplit> unfinishedSplits = new ArrayList<>();
    for (MySqlSplit split : splits) {
        LOG.info("Source reader {} adds split {}", subtaskId, split);
        // snapshot (full load) phase
        if (split.isSnapshotSplit()) {
            MySqlSnapshotSplit snapshotSplit = split.asSnapshotSplit();
            // if this split has already been read, store it in the finishedUnackedSplits map
            if (snapshotSplit.isSnapshotReadFinished()) {
                finishedUnackedSplits.put(snapshotSplit.splitId(), snapshotSplit);
            } else if (sourceConfig
                    .getTableFilters()
                    .dataCollectionFilter()
                    .isIncluded(split.asSnapshotSplit().getTableId())) {
                // the table matches the configured tables to sync, so add the split
                // to the list of unfinished splits
                unfinishedSplits.add(split);
            } else {
                LOG.debug(
                        "The subtask {} is skipping split {} because it does not match new table filter.",
                        subtaskId,
                        split.splitId());
            }
        } else {
            // binlog phase
            MySqlBinlogSplit binlogSplit = split.asBinlogSplit();
            // When restore from a checkpoint, the finished split infos may contain some splits
            // for the deleted tables.
            // We need to remove these splits for the deleted tables at the finished split
            // infos.
            if (checkTableChangeForBinlogSplit) {
                binlogSplit =
                        filterOutdatedSplitInfos(
                                binlogSplit,
                                sourceConfig
                                        .getMySqlConnectorConfig()
                                        .getTableFilters()
                                        .dataCollectionFilter());
            }
            // Try to discovery table schema once for newly added tables when source reader
            // start or restore
            // whether the schema of newly added tables needs to be checked
            boolean checkNewlyAddedTableSchema =
                    !mySqlSourceReaderContext.isHasAssignedBinlogSplit()
                            && sourceConfig.isScanNewlyAddedTableEnabled();
            mySqlSourceReaderContext.setHasAssignedBinlogSplit(true);
            // the binlog split is suspended
            if (binlogSplit.isSuspended()) {
                suspendedBinlogSplit = binlogSplit;
            } else if (!binlogSplit.isCompletedSplit()) {
                uncompletedBinlogSplits.put(binlogSplit.splitId(), binlogSplit);
                requestBinlogSplitMetaIfNeeded(binlogSplit);
            } else {
                uncompletedBinlogSplits.remove(binlogSplit.splitId());
                MySqlBinlogSplit mySqlBinlogSplit =
                        discoverTableSchemasForBinlogSplit(
                                binlogSplit, sourceConfig, checkNewlyAddedTableSchema);
                unfinishedSplits.add(mySqlBinlogSplit);
            }
            LOG.info(
                    "Source reader {} received the binlog split : {}.", subtaskId, binlogSplit);
            context.sendSourceEventToCoordinator(new BinlogSplitAssignedEvent());
        }
    }
    // notify split enumerator again about the finished unacked snapshot splits
    reportFinishedSnapshotSplitsIfNeed();
    // add all un-finished splits (including binlog split) to SourceReaderBase
    // by calling the parent method SourceReaderBase.addSplits
    if (!unfinishedSplits.isEmpty()) {
        super.addSplits(unfinishedSplits);
    } else if (suspendedBinlogSplit
            != null) { // only request new snapshot split if the binlog split is suspended
        context.sendSplitRequest();
    }
}
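The three-way decision for snapshot splits (already finished, still to be read, or filtered out) is easier to see without the surrounding connector code. The following is a rough sketch under simplifying assumptions: the Split and Result classes and the use of a plain Set of table names as the table filter are hypothetical, and the binlog branch is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class AddSplitsSketch {

    /** Hypothetical, simplified snapshot split. */
    static final class Split {
        final String id;
        final String table;
        final boolean readFinished;

        Split(String id, String table, boolean readFinished) {
            this.id = id;
            this.table = table;
            this.readFinished = readFinished;
        }
    }

    /** Hypothetical holder for the three outcomes. */
    static final class Result {
        final Map<String, Split> finishedUnacked = new HashMap<>();
        final List<Split> unfinished = new ArrayList<>();
        final List<Split> skipped = new ArrayList<>();
    }

    static Result classify(List<Split> splits, Set<String> includedTables) {
        Result result = new Result();
        for (Split split : splits) {
            if (split.readFinished) {
                // already read: keep it so it can be reported to the enumerator again
                result.finishedUnacked.put(split.id, split);
            } else if (includedTables.contains(split.table)) {
                // still needs reading and its table is still configured
                result.unfinished.add(split);
            } else {
                // the table no longer matches the filter
                result.skipped.add(split);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Set<String> included = new HashSet<>(Arrays.asList("db.orders"));
        Result result =
                classify(
                        Arrays.asList(
                                new Split("orders:0", "db.orders", true),
                                new Split("orders:1", "db.orders", false),
                                new Split("users:0", "db.users", false)),
                        included);
        System.out.println(result.finishedUnacked.keySet());
        System.out.println(result.unfinished.size() + " unfinished, " + result.skipped.size() + " skipped");
    }
}
```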
Next, the parent method SourceReaderBase.addSplits is called
@Override
public void addSplits(List<SplitT> splits) {
    LOG.info("Adding split(s) to reader: {}", splits);
    // Initialize the state for each split.
    splits.forEach(
            s ->
                    splitStates.put(
                            s.splitId(), new SplitContext<>(s.splitId(), initializedState(s))));
    // Hand over the splits to the split fetcher to start fetch.
    splitFetcherManager.addSplits(splits);
}
This in turn calls splitFetcherManager.addSplits. Here splitFetcherManager is a SingleThreadFetcherManager object, initialized in the MySqlSourceReader constructor.
@Override
public void addSplits(List<SplitT> splitsToAdd) {
    // get the fetcher that is currently running
    SplitFetcher<E, SplitT> fetcher = getRunningFetcher();
    if (fetcher == null) {
        // create a SplitFetcher and add it to the fetchers collection
        fetcher = createSplitFetcher();
        // Add the splits to the fetchers.
        // create an AddSplitsTask and put it on the taskQueue for execution
        fetcher.addSplits(splitsToAdd);
        // start the fetcher
        startFetcher(fetcher);
    } else {
        // create an AddSplitsTask and put it on the taskQueue for execution
        fetcher.addSplits(splitsToAdd);
    }
}
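The "reuse the running fetcher, otherwise create and start one" pattern can be illustrated with a small sketch. It is only a sketch: the Fetcher and Manager classes, the started flag, and the createdFetchers counter are hypothetical stand-ins, and no thread is actually started.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

public class SingleFetcherSketch {

    /** Hypothetical fetcher: queued split batches play the role of AddSplitsTask. */
    static class Fetcher {
        final Queue<List<String>> taskQueue = new ArrayDeque<>();
        boolean started;

        void addSplits(List<String> splits) {
            taskQueue.add(splits);
        }
    }

    /** Hypothetical manager that keeps at most one fetcher alive. */
    static class Manager {
        private Fetcher runningFetcher;
        int createdFetchers;

        void addSplits(List<String> splitsToAdd) {
            Fetcher fetcher = runningFetcher;
            if (fetcher == null) {
                // no fetcher yet: create one, queue the splits, then start it
                fetcher = new Fetcher();
                createdFetchers++;
                fetcher.addSplits(splitsToAdd);
                fetcher.started = true;
                runningFetcher = fetcher;
            } else {
                // a fetcher is already running: just queue the new splits
                fetcher.addSplits(splitsToAdd);
            }
        }

        Fetcher runningFetcher() {
            return runningFetcher;
        }
    }

    public static void main(String[] args) {
        Manager manager = new Manager();
        manager.addSplits(Arrays.asList("chunk-0"));
        manager.addSplits(Arrays.asList("chunk-1"));
        System.out.println(manager.createdFetchers + " fetcher, "
                + manager.runningFetcher().taskQueue.size() + " queued tasks");
    }
}
```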
Next, look at the run method of SplitFetcher
@Override
public void run() {
    LOG.info("Starting split fetcher {}", id);
    try {
        while (runOnce()) {
            // nothing to do, everything is inside #runOnce.
        }
    } catch (Throwable t) {
        errorHandler.accept(t);
    } finally {
        try {
            splitReader.close();
        } catch (Exception e) {
            errorHandler.accept(e);
        } finally {
            LOG.info("Split fetcher {} exited.", id);
            // This executes after possible errorHandler.accept(t). If these operations bear
            // a happens-before relation, then we can checking side effect of
            // errorHandler.accept(t)
            // to know whether it happened after observing side effect of shutdownHook.run().
            shutdownHook.run();
        }
    }
}
The run method repeatedly calls runOnce()
boolean runOnce() {
    // first blocking call = get next task. blocks only if there are no active splits and queued
    // tasks.
    SplitFetcherTask task;
    lock.lock();
    try {
        if (closed) {
            return false;
        }
        // take the next pending task from the taskQueue
        task = getNextTaskUnsafe();
        if (task == null) {
            // (spurious) wakeup, so just repeat
            return true;
        }
        LOG.debug("Prepare to run {}", task);
        // store task for #wakeUp
        this.runningTask = task;
    } finally {
        lock.unlock();
    }
    // execute the task outside of lock, so that it can be woken up
    boolean taskFinished;
    try {
        // execute the run method of the SplitFetcherTask
        taskFinished = task.run();
    } catch (Exception e) {
        throw new RuntimeException(
                String.format(
                        "SplitFetcher thread %d received unexpected exception while polling the records",
                        id),
                e);
    }
    // re-acquire lock as all post-processing steps, need it
    lock.lock();
    try {
        this.runningTask = null;
        processTaskResultUnsafe(task, taskFinished);
    } finally {
        lock.unlock();
    }
    return true;
}
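The locking shape of runOnce (pick a task under the lock, run it outside the lock, re-acquire the lock for post-processing) can be sketched as follows. This is a simplified illustration with assumptions: the Task interface, the Fetcher class, and the results list are hypothetical, and an empty queue simply returns true here, whereas the method above may block while waiting for a task.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.ReentrantLock;

public class RunOnceSketch {

    /** Hypothetical task: returns true when it has finished. */
    interface Task {
        boolean run();
    }

    static class Fetcher {
        private final ReentrantLock lock = new ReentrantLock();
        private final Queue<Task> taskQueue = new ArrayDeque<>();
        private boolean closed;
        final List<Boolean> results = new ArrayList<>();

        void enqueue(Task task) {
            lock.lock();
            try {
                taskQueue.add(task);
            } finally {
                lock.unlock();
            }
        }

        void close() {
            lock.lock();
            try {
                closed = true;
            } finally {
                lock.unlock();
            }
        }

        boolean runOnce() {
            Task task;
            lock.lock();
            try {
                if (closed) {
                    return false;
                }
                task = taskQueue.poll();
                if (task == null) {
                    // nothing queued; this sketch does not block, it just lets the caller retry
                    return true;
                }
            } finally {
                lock.unlock();
            }
            // run outside the lock so other threads can enqueue or close meanwhile
            boolean finished = task.run();
            lock.lock();
            try {
                results.add(finished);
            } finally {
                lock.unlock();
            }
            return true;
        }
    }

    public static void main(String[] args) {
        Fetcher fetcher = new Fetcher();
        fetcher.enqueue(() -> true);
        // the last task closes the fetcher so the loop below terminates
        fetcher.enqueue(() -> {
            fetcher.close();
            return true;
        });
        while (fetcher.runOnce()) {
            // everything happens inside runOnce
        }
        System.out.println("tasks executed: " + fetcher.results.size());
    }
}
```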
runOnce then executes the run method of AddSplitsTask. The splitReader object here is a MySqlSplitReader, and its handleSplitsChanges method is invoked.
@Override
public boolean run() {
    for (SplitT s : splitsToAdd) {
        assignedSplits.put(s.splitId(), s);
    }
    splitReader.handleSplitsChanges(new SplitsAddition<>(splitsToAdd));
    return true;
}
The handleSplitsChanges method only adds each MySqlSplit to the corresponding snapshotSplits or binlogSplits collection
@Override
public void handleSplitsChanges(SplitsChange<MySqlSplit> splitsChanges) {
    if (!(splitsChanges instanceof SplitsAddition)) {
        throw new UnsupportedOperationException(
                String.format(
                        "The SplitChange type of %s is not supported.",
                        splitsChanges.getClass()));
    }
    LOG.info("Handling split change {}", splitsChanges);
    for (MySqlSplit mySqlSplit : splitsChanges.splits()) {
        if (mySqlSplit.isSnapshotSplit()) {
            snapshotSplits.add(mySqlSplit.asSnapshotSplit());
        } else {
            binlogSplits.add(mySqlSplit.asBinlogSplit());
        }
    }
}
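As a small illustration of this routing, the sketch below rejects any change that is not an addition and sorts the remaining splits into two queues. The class names, the OtherChange type, and the convention that binlog split ids start with "binlog" are assumptions made purely for the example.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

public class SplitsChangeSketch {

    /** Hypothetical base type for a change to the assigned splits. */
    static class SplitsChange {
        final List<String> splits;

        SplitsChange(List<String> splits) {
            this.splits = splits;
        }
    }

    static class SplitsAddition extends SplitsChange {
        SplitsAddition(List<String> splits) {
            super(splits);
        }
    }

    /** Hypothetical second change type, used only to show the rejection path. */
    static class OtherChange extends SplitsChange {
        OtherChange(List<String> splits) {
            super(splits);
        }
    }

    static class Reader {
        final Queue<String> snapshotSplits = new ArrayDeque<>();
        final Queue<String> binlogSplits = new ArrayDeque<>();

        void handleSplitsChanges(SplitsChange change) {
            if (!(change instanceof SplitsAddition)) {
                throw new UnsupportedOperationException(
                        String.format("The SplitChange type of %s is not supported.", change.getClass()));
            }
            for (String splitId : change.splits) {
                // assumption for this sketch: binlog split ids start with "binlog"
                if (splitId.startsWith("binlog")) {
                    binlogSplits.add(splitId);
                } else {
                    snapshotSplits.add(splitId);
                }
            }
        }
    }

    public static void main(String[] args) {
        Reader reader = new Reader();
        reader.handleSplitsChanges(new SplitsAddition(Arrays.asList("orders:0", "binlog-split")));
        System.out.println(reader.snapshotSplits + " " + reader.binlogSplits);
    }
}
```

Nothing is read at this point in the sketch; the queues are only filled so that a later fetch step can pick the next split.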
When the createSplitFetcher method shown earlier creates the SplitFetcher object, its constructor creates a FetchTask
SplitFetcher(
        int id,
        FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
        SplitReader<E, SplitT> splitReader,
        Consumer<Throwable> errorHandler,
        Runnable shutdownHook,
        Consumer<Collection<String>> splitFinishedHook,
        boolean allowUnalignedSourceSplits) {
    this.id = id;
    this.elementsQueue = checkNotNull(elementsQueue);
    this.splitReader = checkNotNull(splitReader);
    this.errorHandler = checkNotNull(errorHandler);
    this.shutdownHook = checkNotNull(shutdownHook);
    this.allowUnalignedSourceSplits = allowUnalignedSourceSplits;
    // create the fetchTask; its run method calls splitReader.fetch()
    this.fetchTask =
            new FetchTask<>(
                    splitReader,
                    elementsQueue,
                    ids -> {
                        ids.forEach(assignedSplits::remove);
                        splitFinishedHook.accept(ids);
                        LOG.info("Finished reading from splits {}", ids);
                    },
                    id);
}
Finally, MySqlSplitReader.fetch() is called to pull the data, which is read by a DebeziumReader
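How a fetch task ties these pieces together can be sketched roughly as: fetch one batch, hand it to the elements queue, then report any finished splits through the callback. The Batch, SplitReader, and FetchTask types below are simplified, hypothetical stand-ins, and the always-true return value is an assumption of the sketch.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

public class FetchTaskSketch {

    /** Hypothetical batch: records plus the ids of splits that finished with this batch. */
    static final class Batch {
        final List<String> records;
        final Set<String> finishedSplits;

        Batch(List<String> records, Set<String> finishedSplits) {
            this.records = records;
            this.finishedSplits = finishedSplits;
        }
    }

    /** Hypothetical reader abstraction with a single fetch call. */
    interface SplitReader {
        Batch fetch();
    }

    static final class FetchTask {
        private final SplitReader splitReader;
        private final Queue<Batch> elementsQueue;
        private final Consumer<Collection<String>> splitFinishedCallback;

        FetchTask(
                SplitReader splitReader,
                Queue<Batch> elementsQueue,
                Consumer<Collection<String>> splitFinishedCallback) {
            this.splitReader = splitReader;
            this.elementsQueue = elementsQueue;
            this.splitFinishedCallback = splitFinishedCallback;
        }

        boolean run() {
            Batch batch = splitReader.fetch();
            // hand the batch over to whoever consumes the queue
            elementsQueue.offer(batch);
            if (!batch.finishedSplits.isEmpty()) {
                splitFinishedCallback.accept(batch.finishedSplits);
            }
            return true;
        }
    }

    public static void main(String[] args) {
        Map<String, String> assignedSplits = new HashMap<>();
        assignedSplits.put("orders:0", "snapshot split");
        Queue<Batch> elementsQueue = new LinkedBlockingQueue<>();
        FetchTask task =
                new FetchTask(
                        () -> new Batch(Arrays.asList("row-1", "row-2"), Collections.singleton("orders:0")),
                        elementsQueue,
                        ids -> ids.forEach(assignedSplits::remove));
        task.run();
        System.out.println(elementsQueue.size() + " batch queued, assigned splits left: " + assignedSplits.size());
    }
}
```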