Presto技術源碼解析總結-一個SQL的奇幻之旅 上

Presto技術總結 因為內容過長,分為了上下兩集

1.環境準備

Hadoop環境,Hive環境,mysql環境,ssh環境,presto本機debug環境

推薦hadoop2.2.0、hive1.2.1、mysql5.7、openssh-server&client、presto最新版本

presto本地debug環境搭建參考presto in idea

2.查詢入口&流程

所有的查詢首先打到StatementResource,對應的路徑為@Path("/v1/statement")

// Build the Query object for the incoming statement
Query query = Query.create(
                sessionContext,
                statement,         // the actual SQL text
                queryManager,
                sessionPropertyManager,
                exchangeClient,
                responseExecutor,
                timeoutExecutor,
                blockEncodingSerde);
        queries.put(query.getQueryId(), query);  // register the newly created query under its query id

Query類中執行

Query result = new Query(sessionContext, query, queryManager, sessionPropertyManager, exchangeClient, dataProcessorExecutor, timeoutExecutor, blockEncodingSerde);

Query類中的queryManager

QueryInfo queryInfo = queryManager.createQuery(sessionContext, query);  // sessionContext carries the user's session information; query is the user's SQL text

2.1詞法語法分析生成AST

queryManager是一個接口,目前只有SqlQueryManager這一個實現類,下面是其createQuery方法

// Query executions known to this manager, keyed by query id
private final ConcurrentMap<QueryId, QueryExecution> queries = new ConcurrentHashMap<>();
// QueryQueueManager is an interface; the SQL implementation is SqlQueryQueueManager
private final QueryQueueManager queueManager;

// Core logic: lexical and syntactic analysis of the SQL text, producing the AST node
Statement wrappedStatement = sqlParser.createStatement(query, createParsingOptions(session));
statement = unwrapExecuteStatement(wrappedStatement, sqlParser, session);
List<Expression> parameters = wrappedStatement instanceof Execute ? ((Execute) wrappedStatement).getParameters() : emptyList();

// Validate the parameters against the statement
validateParameters(statement, parameters);
// Look up the query execution factory registered for this statement class
QueryExecutionFactory<?> queryExecutionFactory = executionFactories.get(statement.getClass());
// Let the factory create the query execution
queryExecution = queryExecutionFactory.createQueryExecution(queryId, query, session, statement, parameters);
// Record the queryId -> query execution mapping
queries.put(queryId, queryExecution);
// Submit the query execution to the queue manager
queueManager.submit(statement, queryExecution, queryExecutor);
// Return the query info
return queryInfo;

SqlQueryQueueManager的submit方法

// Body of submit: pick the queues for the query, reserve a slot in each, then enqueue.
List<QueryQueue> queues;
        try {
            // Select the queues according to the configured rules
            queues = selectQueues(queryExecution.getSession(), executor);
        }
        catch (PrestoException e) {
            // Queue selection failed: fail the query and stop
            queryExecution.fail(e);
            return;
        }
        for (QueryQueue queue : queues) {
            // Every selected queue must accept the reservation, otherwise the query is failed
            if (!queue.reserve(queryExecution)) {
                queryExecution.fail(new PrestoException(QUERY_QUEUE_FULL, "Too many queued queries"));
                return;
            }
        }
        // All reservations succeeded: enqueue on the first queue; the remaining queues are passed along as the next queues
        queues.get(0).enqueue(createQueuedExecution(queryExecution, queues.subList(1, queues.size()), executor));

    /**
     * Chooses the queues for a query by evaluating the configured rules in order;
     * the first rule whose match yields queue definitions decides the result.
     *
     * @param session the session of the query being queued
     * @param executor the executor handed to any queue that has to be created
     * @return the queues resolved from the first matching rule
     * @throws PrestoException with QUERY_REJECTED when no rule matches
     */
    private List<QueryQueue> selectQueues(Session session, Executor executor)
    {
        for (QueryQueueRule candidate : rules) {
            Optional<List<QueryQueueDefinition>> matched = candidate.match(session.toSessionRepresentation());
            if (!matched.isPresent()) {
                continue;
            }
            // Resolve the matched definitions to queues, creating any that do not exist yet
            return getOrCreateQueues(session, executor, matched.get());
        }
        throw new PrestoException(QUERY_REJECTED, "Query did not match any queuing rule");
    }

    /**
     * Resolves each queue definition to its QueryQueue. A queue that is not yet
     * registered is created, registered, and exported as an MBean.
     *
     * @param session used to expand each definition's template
     * @param executor passed to newly created queues
     * @param definitions the queue definitions to resolve, in order
     * @return the queues corresponding to {@code definitions}, in the same order
     */
    private List<QueryQueue> getOrCreateQueues(Session session, Executor executor, List<QueryQueueDefinition> definitions)
    {
        ImmutableList.Builder<QueryQueue> resolved = ImmutableList.builder();
        for (QueryQueueDefinition definition : definitions) {
            String expansion = definition.getExpandedTemplate(session);
            QueueKey queueKey = new QueueKey(definition, expansion);
            if (!queryQueues.containsKey(queueKey)) {
                QueryQueue candidate = new QueryQueue(executor, definition.getMaxQueued(), definition.getMaxConcurrent());
                boolean registered = queryQueues.putIfAbsent(queueKey, candidate) == null;
                if (registered) {
                    // Export the mbean only when this call won the putIfAbsent race
                    String mbeanName = ObjectNames.builder(QueryQueue.class, definition.getTemplate())
                            .withProperty("expansion", expansion)
                            .build();
                    mbeanExporter.export(mbeanName, candidate);
                }
            }
            // Always read back from the map so the registered instance is the one returned
            resolved.add(queryQueues.get(queueKey));
        }
        return resolved.build();
    }

/**
 * Creates a queue whose permit count is {@code maxQueuedQueries + maxConcurrentQueries}
 * and whose semaphore is created with {@code maxConcurrentQueries}.
 *
 * @param queryExecutor executor handed to the semaphore; must not be null
 * @param maxQueuedQueries must be greater than zero
 * @param maxConcurrentQueries must be greater than zero
 */
QueryQueue(Executor queryExecutor, int maxQueuedQueries, int maxConcurrentQueries)
    {
        requireNonNull(queryExecutor, "queryExecutor is null");
        checkArgument(maxQueuedQueries > 0, "maxQueuedQueries must be greater than zero");
        checkArgument(maxConcurrentQueries > 0, "maxConcurrentQueries must be greater than zero");

        // Both operands are positive, so a non-positive sum can only come from int overflow
        int totalPermits = maxQueuedQueries + maxConcurrentQueries;
        checkArgument(totalPermits > 0, "maxQueuedQueries + maxConcurrentQueries must be less than or equal to %s", Integer.MAX_VALUE);

        this.queuePermits = new AtomicInteger(totalPermits);
        this.asyncSemaphore = new AsyncSemaphore<>(
                maxConcurrentQueries,
                queryExecutor,
                queueEntry -> {
                    QueuedExecution next = queueEntry.dequeue();
                    if (next == null) {
                        // Nothing was dequeued: hand back an already-completed future
                        return Futures.immediateFuture(null);
                    }
                    next.start();
                    return next.getCompletionFuture();
                });
    }

/**
 * Starts this queued execution: if more queues remain, the query is enqueued on the
 * next one; otherwise the query execution is started on the executor.
 * Does nothing when the completion future is already done.
 */
public void start()
    {
        // Only execute if the query is not already completed (e.g. cancelled)
        if (listenableFuture.isDone()) {
            return;
        }

        if (!nextQueues.isEmpty()) {
            // Pass the query on to the next queue, forwarding the queues after it
            QueryQueue following = nextQueues.get(0);
            following.enqueue(new QueuedExecution(queryExecution, nextQueues.subList(1, nextQueues.size()), executor, listenableFuture));
            return;
        }

        executor.execute(() -> {
            try (SetThreadName ignored = new SetThreadName("Query-%s", queryExecution.getQueryId())) {
                // Hands over to the query execution, which turns the statement into an analysis (plan)
                queryExecution.start();
            }
        });
    }

2.2語義分析&生成邏輯執行計劃

2.2.1語義分析

先看看SqlQueryExecution類的構成

// Fields of SqlQueryExecution
private final QueryStateMachine stateMachine;

private final Statement statement;                               // AST node produced by lexical/syntactic analysis
private final Metadata metadata;
private final AccessControl accessControl;
private final SqlParser sqlParser;                               // SQL parser
private final SplitManager splitManager;
private final NodePartitioningManager nodePartitioningManager;
private final NodeScheduler nodeScheduler; // core module that assigns tasks to nodes; covered in detail with stage scheduling
private final List<PlanOptimizer> planOptimizers;
private final RemoteTaskFactory remoteTaskFactory;
private final LocationFactory locationFactory;
private final int scheduleSplitBatchSize;
private final ExecutorService queryExecutor;
private final ScheduledExecutorService schedulerExecutor;
private final FailureDetector failureDetector;

private final QueryExplainer queryExplainer;
private final PlanFlattener planFlattener;
private final CostCalculator costCalculator;
private final AtomicReference<SqlQueryScheduler> queryScheduler = new AtomicReference<>();
private final AtomicReference<Plan> queryPlan = new AtomicReference<>();
private final NodeTaskMap nodeTaskMap;
private final ExecutionPolicy executionPolicy;
private final List<Expression> parameters;
private final SplitSchedulerStats schedulerStats;

SqlQueryExecution類的start方法

PlanRoot plan = analyzeQuery();  // produce the logical execution plan

// Call stack:
analyzeQuery() -> doAnalyzeQuery()
  
doAnalyzeQuery()
{
    // Create the semantic analyzer
    Analyzer analyzer = new Analyzer(stateMachine.getSession(), metadata, sqlParser, accessControl, Optional.of(queryExplainer), parameters);
    // Run semantic analysis on the statement
    Analysis analysis = analyzer.analyze(statement);
    // Create the logical planner
    LogicalPlanner logicalPlanner = new LogicalPlanner(stateMachine.getSession(), planOptimizers, idAllocator, metadata, sqlParser, costCalculator);
    // Have the logical planner build the logical plan; this step also covers optimizing it
    Plan plan = logicalPlanner.plan(analysis);
    queryPlan.set(plan);
    // Split the logical plan into fragments in preparation for the distributed plan
    SubPlan fragmentedPlan = PlanFragmenter.createSubPlans(stateMachine.getSession(), metadata, nodePartitioningManager, plan, false);
  
    return new PlanRoot(fragmentedPlan, !explainAnalyze, extractConnectors(analysis));
}

Analyzer類的analyze方法

// Rewrite the SQL statement
Statement rewrittenStatement = StatementRewrite.rewrite(session, metadata, sqlParser, queryExplainer, statement, parameters, accessControl);
// Initialize the Analysis for the rewritten statement
Analysis analysis = new Analysis(rewrittenStatement, parameters, isDescribe);
// Create the statement analyzer
StatementAnalyzer analyzer = new StatementAnalyzer(analysis, metadata, sqlParser, accessControl, session);
// Run the statement analyzer on the rewritten statement
analyzer.analyze(rewrittenStatement, Optional.empty());

analyze裡面的具體實現就是遍歷ASTNode,對每種類型的Node作分析,主要是獲取meta和校驗元信息

2.2.2生成邏輯執行計劃

LogicalPlanner類的plan方法

// Build the initial plan tree from the analyzed statement
PlanNode root = planStatement(analysis, analysis.getStatement());
// Check the validity of the intermediate plan
PlanSanityChecker.validateIntermediatePlan(root, session, metadata, sqlParser, symbolAllocator.getTypes());
// Optimize the generated logical plan
root = optimizer.optimize(root, session, symbolAllocator.getTypes(), symbolAllocator, idAllocator);

LogicalPlanner類的planStatement方法

// For an ordinary SQL query, only the following is executed
createOutputPlan(planStatementWithoutOutput(analysis, statement), analysis);

LogicalPlanner類的planStatementWithoutOutput方法

/**
 * Dispatches on the concrete statement type and builds the corresponding relation plan.
 *
 * @throws PrestoException with NOT_SUPPORTED for a no-op CREATE TABLE AS SELECT
 *         and for any statement type not handled here
 */
private RelationPlan planStatementWithoutOutput(Analysis analysis, Statement statement)
    {
        if (statement instanceof CreateTableAsSelect) {
            if (analysis.isCreateTableAsSelectNoOp()) {
                throw new PrestoException(NOT_SUPPORTED, "CREATE TABLE IF NOT EXISTS is not supported in this context " + statement.getClass().getSimpleName());
            }
            CreateTableAsSelect createTableAsSelect = (CreateTableAsSelect) statement;
            return createTableCreationPlan(analysis, createTableAsSelect.getQuery());
        }

        if (statement instanceof Insert) {
            checkState(analysis.getInsert().isPresent(), "Insert handle is missing");
            return createInsertPlan(analysis, (Insert) statement);
        }

        if (statement instanceof Delete) {
            return createDeletePlan(analysis, (Delete) statement);
        }

        if (statement instanceof Query) {
            return createRelationPlan(analysis, (Query) statement);
        }

        // Only EXPLAIN ANALYZE is planned here; any other Explain falls through to the error below
        boolean explainAnalyze = statement instanceof Explain && ((Explain) statement).isAnalyze();
        if (explainAnalyze) {
            return createExplainAnalyzePlan(analysis, (Explain) statement);
        }

        throw new PrestoException(NOT_SUPPORTED, "Unsupported statement type " + statement.getClass().getSimpleName());
    }

LogicalPlanner類的createRelationPlan方法

return new RelationPlanner(analysis, symbolAllocator, idAllocator, buildLambdaDeclarationToSymbolMap(analysis, symbolAllocator), metadata, session)
        .process(query, null);

RelationPlanner類具體的實現就是:遍歷ASTNode,生成邏輯執行計劃裡面對應的Node

邏輯執行計劃中常見的Node和Visit操作如下面所示:

AggregationNode            聚合操作的節點,有Final、Partial、Single三種,分別表示最終聚合、局部聚合和單點聚合;在執行計劃優化前,聚合類型都是單點聚合,在優化器中會拆成局部聚合和最終聚合,類似於MR任務中map端的局部reduce和reduce端的最終reduce

DeleteNode                 Delete操作的節點
ExchangeNode               邏輯執行計劃中,不同Stage之間交換數據的節點
FilterNode                 進行Filter過濾操作的節點
JoinNode                   執行Join操作的節點
LimitNode                  執行limit操作的節點
MarkDistinctNode           處理count distinct
OutputNode                 輸出Node

ProjectNode                將下層節點的輸出列映射到上層節點,例如:select a + 1 from b 會將TableScanNode的a列 + 1 映射到OutputNode

RemoteSourceNode           類似於ExchangeNode,在分布式執行計劃中,不同Stage之間交換數據的節點
SampleNode                 抽樣函數Node
RowNumberNode              處理窗口函數RowNumber
SortNode                   排序Node
TableScanNode              讀取表的數據
TableWriterNode            寫入表的數據
TopNNode                   order by ... limit 會使用效率更高的TopNNode
UnionNode                  處理Union操作
WindowNode                 處理窗口函數

RelationPlanner類的visit操作

visitTable                 生成TableScanNode
visitAliasedRelation       處理有別名的Relation
visitSampledRelation       添加一個SampleNode,主要處理抽樣函數

visitJoin                  根據不同的join類型,生成不同的節點結構;一般來說是將左右兩邊生成對應的queryPlan,然後左右各添加一個ProjectNode,中間添加一個JoinNode相連,再在上層添加一個FilterNode,FilterNode為join條件

visitQuery                 使用QueryPlanner處理Query,並返回生成的執行計劃
visitQuerySpecification    使用QueryPlanner處理QueryBody,並返回生成的執行計劃

QueryPlanner類的plan操作(queryBody的定義就是指一個完整的sql,可以嵌套,例如select a from QueryBody b,通常來說裡面的這個QueryBody會被當做AliasedRelation繼續plan)

Query和QuerySpecification相比,QuerySpecification代表完整的QueryBody,而Query則包含了QueryBody--QueryBody是一個抽象類,QuerySpecification繼承了QueryBody

plan(Query query)                 首先取出Query中的queryBody,然後調用RelationPlanner進行分析,調用其visitQuerySpecification,然後RelationPlanner調用QueryPlanner的plan方法
plan(QuerySpecification query)    生成一個queryBody中所有的組件Node

下面是最主要的 plan(QuerySpecification query)的plan過程

// Planning steps for a QuerySpecification; each step wraps the builder produced by the previous one.
PlanBuilder builder = planFrom(node);                      // the builder's root is the node tree generated so far
RelationPlan fromRelationPlan = builder.getRelationPlan(); // relation plan obtained from planFrom (the article notes this is where the TableScanNode comes from)
builder = filter(builder, analysis.getWhere(node), node);  // add a FilterNode for the WHERE predicate
builder = aggregate(builder, node);                        // add the aggregation node
builder = filter(builder, analysis.getHaving(node), node); // add a FilterNode when a HAVING predicate exists
builder = window(builder, node);                           // add the window node
List<Expression> outputs = analysis.getOutputExpressions(node);
builder = handleSubqueries(builder, node, outputs);
// ORDER BY handling used only when the legacy ORDER BY session property is not enabled
if (node.getOrderBy().isPresent() && !SystemSessionProperties.isLegacyOrderByEnabled(session)) {
    if (analysis.getGroupingSets(node).isEmpty()) {
        builder = project(builder, outputs, fromRelationPlan);
        outputs = toSymbolReferences(computeOutputs(builder, outputs));
        builder = planBuilderFor(builder, analysis.getScope(node.getOrderBy().get()));
    }
    else {
        // With grouping sets, aggregates referenced by ORDER BY are projected together with the outputs
        List<Expression> orderByAggregates = analysis.getOrderByAggregates(node.getOrderBy().get());
        builder = project(builder, Iterables.concat(outputs, orderByAggregates));
        outputs = toSymbolReferences(computeOutputs(builder, outputs));
        // Keep only the ORDER BY aggregates that are not plain column references
        List<Expression> complexOrderByAggregatesToRemap = orderByAggregates.stream()
                .filter(expression -> !analysis.isColumnReference(expression))
                .collect(toImmutableList());
        builder = planBuilderFor(builder, analysis.getScope(node.getOrderBy().get()), complexOrderByAggregatesToRemap);
    }
    builder = window(builder, node.getOrderBy().get());
}
List<Expression> orderBy = analysis.getOrderByExpressions(node);
builder = handleSubqueries(builder, node, orderBy);
// Project the ORDER BY expressions together with the outputs before distinct and sort
builder = project(builder, Iterables.concat(orderBy, outputs));
builder = distinct(builder, node);
builder = sort(builder, node);
// After sorting, project down to the outputs only
builder = project(builder, outputs);
builder = limit(builder, node);
return new RelationPlan(
        builder.getRoot(),
        analysis.getScope(node),
        computeOutputs(builder, outputs));

2.2.3邏輯執行計劃優化

LogicalPlanner類的plan方法

root = optimizer.optimize(root, session, symbolAllocator.getTypes(), symbolAllocator, idAllocator);

optimizer優化器的具體實現就是:依次調用各個具體的優化器實現,對上一步生成的NodeTree(邏輯執行計劃)進行逐個優化

具體的優化方法

AddExchanges                   //生成分布式執行計劃,例如添加局部聚合和最終聚合
AddLocalExchanges
BeginTableWrite
CanonicalizeExpressions        //將執行計劃中的表達式標準化,比如將is not null 改寫為not(is null),將if語句改寫為case when
CheckSubqueryNodesAreRewritten
CountConstantOptimizer         //將count(常量)(例如count(1))改寫為count(*),提高不同數據源的兼容性
DesugaringOptimizer
DetermineJoinDistributionType
EliminateCrossJoins
EmptyDeleteOptimizer
HashGenerationOptimizer     //提前進行hash計算
ImplementIntersectAndExceptAsUnion
IndexJoinOptimizer          //將Join優化為IndexJoin,獲取Join表的索引,提升速度
IterativeOptimizer
LimitPushDown               //limit條件下推,減小下層節點的數據量
MetadataDeleteOptimizer
MetadataQueryOptimizer      //將對表的分區字段進行的聚合操作,改寫為針對表元數據的查詢,減少讀取表的操作
OptimizeMixedDistinctAggregations
PickLayout
PredicatePushDown           //謂詞(過濾條件)下推,減小下層節點的數據量
ProjectionPushDown          //ProjectNode下推,減少Union節點的數據量
PruneUnreferencedOutputs    //去除ProjectNode中不在最終輸出中的列,減小計算量
PruneRedundantProjections   //去除多餘的projectNode,如果上下節點全都直接映射,則去掉該層projectNode
PushTableWriteThroughUnion
RemoveUnreferencedScalarLateralNodes
SetFlatteningOptimizer         //合併能夠合併的Union語句
SimplifyExpressions            //對執行計劃中涉及到的表達式進行化簡和優化
TransformCorrelatedNoAggregationSubqueryToJoin
TransformCorrelatedNoAggregationSubqueryToJoin
TransformCorrelatedScalarAggregationToJoin
TransformCorrelatedSingleRowSubqueryToProject
TransformQuantifiedComparisonApplyToLateralJoin
TransformUncorrelatedInPredicateSubqueryToSemiJoin
TransformUncorrelatedLateralToJoin
UnaliasSymbolReferences        //去除執行計劃中projectNode無意義的映射,如果列直接相對而沒有帶表達式,則直接映射到上層節點
WindowFilterPushDown

2.3生成分布式執行計劃

2.3.1邏輯執行計劃分段

這個階段會將上面生成的邏輯執行計劃切分為多個Stage,其中Stage分為四種類型:

Source、Fixed、Single、Coordinator_only

Source:一般是TableScanNode、ProjectNode、FilterNode,一般是最下游的取數的Stage

Fixed:一般在Source之後,將Source階段獲取的數據分散到多個節點上處理,類似於Map端reduce操作,包括局部聚合、局部Join、局部數據寫入

Single:一般在Fixed之後,只在一臺機器上進行,匯總所有的結果、做最終聚合、全局排序,並將結果傳輸給Coordinator

Coordinator_only:只在Coordinator上執行

SqlQueryExecution類doAnalyzeQuery方法

SubPlan fragmentedPlan = PlanFragmenter.createSubPlans(stateMachine.getSession(), metadata, nodePartitioningManager, plan, false);

// Fields of the SubPlan class: one plan fragment plus the child sub-plans beneath it
private final PlanFragment fragment;
private final List<SubPlan> children;

可以看出來,內部是類似於B樹的樹形結構,這樣就將邏輯執行計劃切分為了若干個段

2.3.2生成分布式執行計劃

2.3.2.1獲取SplitSource分片

SqlQueryExecution類start方法

// Produce the fragmented execution plan
PlanRoot plan = analyzeQuery();
// Produce the distributed execution plan
planDistribution(plan);   

SqlQueryExecution類planDistribution方法

// Obtain the stage execution plan for the root of the fragmented plan
StageExecutionPlan outputStageExecutionPlan = distributedPlanner.plan(plan.getRoot(), stateMachine.getSession());
// Create the SQL query scheduler
SqlQueryScheduler scheduler = new SqlQueryScheduler(
                // query state machine (state listener)
                stateMachine,
                locationFactory,
                outputStageExecutionPlan,
                nodePartitioningManager,
                // core module that assigns tasks to nodes
                nodeScheduler,
                remoteTaskFactory,
                stateMachine.getSession(),
                plan.isSummarizeTaskInfos(),
                scheduleSplitBatchSize,
                queryExecutor,
                schedulerExecutor,
                failureDetector,
                rootOutputBuffers,
                // holds the mapping between the tasks assigned for the current stage and the nodes
                nodeTaskMap,
                executionPolicy,
                schedulerStats);

DistributedExecutionPlanner類plan方法

調用棧為plan -> doPlan

/**
 * Recursively converts a fragmented sub-plan into a stage execution plan.
 *
 * @param root the sub-plan to convert
 * @param session the query session
 * @param allSplitSources builder passed to the visitor for this fragment and to every recursive call
 * @return the stage plan for {@code root}, with one child stage plan per child sub-plan
 */
private StageExecutionPlan doPlan(SubPlan root, Session session, ImmutableList.Builder<SplitSource> allSplitSources)
{
    PlanFragment fragment = root.getFragment();

    // Walk the fragment with a visitor; for table scan nodes it calls splitManager.getSplits() to obtain the splits.
    // NOTE(review): the original author wondered whether this implies a single table scan per stage;
    // the visitor returns a map keyed by plan node id, so that restriction is not established here.
    Map<PlanNodeId, SplitSource> fragmentSplitSources = fragment.getRoot()
            .accept(new Visitor(session, fragment.getPipelineExecutionStrategy(), allSplitSources), null);

    // Recurse into the children so the whole sub-plan tree becomes a hierarchy of stage execution plans
    ImmutableList.Builder<StageExecutionPlan> childStagePlans = ImmutableList.builder();
    root.getChildren().forEach(child -> childStagePlans.add(doPlan(child, session, allSplitSources)));

    return new StageExecutionPlan(fragment, fragmentSplitSources, childStagePlans.build());
}

對TableScanNode進行splitManager.getSplits()操作來獲取分片,並將結果保存在Map<PlanNodeId, SplitSource> splitSources中(其實就是每個Node對應的SplitSource)

/**
 * Obtains the split source for a table scan, records it in {@code splitSources},
 * and returns it keyed by the scan node's id.
 */
public Map<PlanNodeId, SplitSource> visitTableScan(TableScanNode node, Void context)
{
    // Grouped pipeline execution asks for grouped scheduling; anything else is ungrouped
    SplitSource tableSplits;
    if (pipelineExecutionStrategy == GROUPED_EXECUTION) {
        tableSplits = splitManager.getSplits(session, node.getLayout().get(), GROUPED_SCHEDULING);
    }
    else {
        tableSplits = splitManager.getSplits(session, node.getLayout().get(), UNGROUPED_SCHEDULING);
    }

    splitSources.add(tableSplits);
    return ImmutableMap.of(node.getId(), tableSplits);
}

HiveSplitManager實現ConnectorSplitManager(Presto SPI接口)

/**
 * Builds the split source for a Hive table layout: loads the table metadata, checks that the
 * table is readable, resolves the partitions, creates a background split loader and starts it
 * against a split source chosen by the scheduling strategy.
 *
 * @return an empty FixedSplitSource when the layout has no partitions, otherwise the HiveSplitSource
 *         the loader was started with
 * @throws TableNotFoundException when the metastore does not return the table
 * @throws HiveNotReadableException when the table carries a non-empty OBJECT_NOT_READABLE parameter
 * @throws PrestoException when the layout has no partition list, or grouped scheduling is requested
 *         without a bucket handle
 * @throws IllegalArgumentException for a scheduling strategy not handled by the switch
 */
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableLayoutHandle layoutHandle, SplitSchedulingStrategy splitSchedulingStrategy)
{
    HiveTableLayoutHandle layout = (HiveTableLayoutHandle) layoutHandle;
    SchemaTableName tableName = layout.getSchemaTableName();

    // get table metadata
    SemiTransactionalHiveMetastore metastore = metastoreProvider.apply((HiveTransactionHandle) transaction);
    Table table = metastore.getTable(tableName.getSchemaName(), tableName.getTableName())
            .orElseThrow(() -> new TableNotFoundException(tableName));

    // verify table is not marked as non-readable
    String tableNotReadable = table.getParameters().get(OBJECT_NOT_READABLE);
    if (!isNullOrEmpty(tableNotReadable)) {
        throw new HiveNotReadableException(tableName, Optional.empty(), tableNotReadable);
    }

    // get the Hive partitions from the layout
    List<HivePartition> partitions = layout.getPartitions()
            .orElseThrow(() -> new PrestoException(GENERIC_INTERNAL_ERROR, "Layout does not contain partitions"));

    // short circuit if we don't have any partitions
    HivePartition partition = Iterables.getFirst(partitions, null);
    if (partition == null) {
        return new FixedSplitSource(ImmutableList.of());
    }

    // get buckets from first partition (arbitrary)
    List<HiveBucket> buckets = partition.getBuckets();

    // validate bucket bucketed execution
    Optional<HiveBucketHandle> bucketHandle = layout.getBucketHandle();
    if ((splitSchedulingStrategy == GROUPED_SCHEDULING) && !bucketHandle.isPresent()) {
        throw new PrestoException(GENERIC_INTERNAL_ERROR, "SchedulingPolicy is bucketed, but BucketHandle is not present");
    }

    // sort partitions (reverse natural order of the partition id)
    partitions = Ordering.natural().onResultOf(HivePartition::getPartitionId).reverse().sortedCopy(partitions);

    Iterable<HivePartitionMetadata> hivePartitions = getPartitionMetadata(metastore, table, tableName, partitions, bucketHandle.map(HiveBucketHandle::toBucketProperty));

    HiveSplitLoader hiveSplitLoader = new BackgroundHiveSplitLoader(
            table,
            hivePartitions,
            layout.getCompactEffectivePredicate(),
            createBucketSplitInfo(bucketHandle, buckets),
            session,
            hdfsEnvironment,
            namenodeStats,
            directoryLister,
            executor,
            splitLoaderConcurrency,
            recursiveDfsWalkerEnabled);

    // choose the split source flavour from the scheduling strategy
    HiveSplitSource splitSource;
    switch (splitSchedulingStrategy) {
        case UNGROUPED_SCHEDULING:
            splitSource = HiveSplitSource.allAtOnce(
                    session,
                    table.getDatabaseName(),
                    table.getTableName(),
                    layout.getCompactEffectivePredicate(),
                    maxInitialSplits,
                    maxOutstandingSplits,
                    maxOutstandingSplitsSize,
                    hiveSplitLoader,
                    executor,
                    new CounterStat());
            break;
        case GROUPED_SCHEDULING:
            // NOTE(review): this branch passes a fixed 32 MB DataSize where the ungrouped branch passes maxOutstandingSplitsSize
            splitSource = HiveSplitSource.bucketed(
                    session,
                    table.getDatabaseName(),
                    table.getTableName(),
                    layout.getCompactEffectivePredicate(),
                    maxInitialSplits,
                    maxOutstandingSplits,
                    new DataSize(32, MEGABYTE),
                    hiveSplitLoader,
                    executor,
                    new CounterStat());
            break;
        default:
            throw new IllegalArgumentException("Unknown splitSchedulingStrategy: " + splitSchedulingStrategy);
    }
    // start the loader with the split source it should work against
    hiveSplitLoader.start(splitSource);

    return splitSource;
}
2.3.2.2產生stage執行計劃

上面產生了一個StageExecutionPlan(stage執行計劃),下面看看StageExecutionPlan的結構

private final PlanFragment fragment;                       // the plan fragment of this stage
private final Map<PlanNodeId, SplitSource> splitSources;   // split sources obtained from the split manager (HiveSplitManager for Hive), keyed by plan node id
private final List<StageExecutionPlan> subStages;          // stage plans of the child fragments
private final Optional<List<String>> fieldNames;           // field names

經過planDistribution方法之後,分段的邏輯執行計劃就轉化成了stage執行計劃,而presto對task的調度都是基於stage來調度的,緊接著SqlQueryScheduler會構造SqlStage執行器

SqlQueryScheduler類的構造方法

// Build the stage executions for the whole plan; the first argument is the callback that
// updates the query output locations for the root stage.
List<SqlStageExecution> stages = createStages(
                (fragmentId, tasks, noMoreExchangeLocations) -> updateQueryOutputLocations(queryStateMachine, rootBufferId, tasks, noMoreExchangeLocations),
                new AtomicInteger(),
                locationFactory,
                plan.withBucketToPartition(Optional.of(new int[1])),
                nodeScheduler,
                remoteTaskFactory,
                session,
                splitBatchSize,
                partitioningHandle -> partitioningCache.computeIfAbsent(partitioningHandle, handle -> nodePartitioningManager.getNodePartitioningMap(session, handle)),
                nodePartitioningManager,
                queryExecutor,
                schedulerExecutor,
                failureDetector,
                nodeTaskMap,
                stageSchedulers,
                stageLinkages);

// The first element of the returned list is used as the root stage
SqlStageExecution rootStage = stages.get(0);
2.3.2.3產生stage執行器

SqlQueryScheduler類的createStages方法

// Collects the stage for this plan followed by the stages of all of its sub-plans
ImmutableList.Builder<SqlStageExecution> stages = ImmutableList.builder();

StageId stageId = new StageId(queryStateMachine.getQueryId(), nextStageId.getAndIncrement());
SqlStageExecution stage = new SqlStageExecution(  // create the SqlStageExecution for the current plan
        stageId,
        locationFactory.createStageLocation(stageId),
        plan.getFragment(),
        remoteTaskFactory,
        session,
        summarizeTaskInfo,
        nodeTaskMap,
        queryExecutor,
        failureDetector,
        schedulerStats);
stages.add(stage);

...
...
// The steps that create the stage scheduler and the assignment policy are omitted here; see section 2.4.3

ImmutableSet.Builder<SqlStageExecution> childStagesBuilder = ImmutableSet.builder();
        for (StageExecutionPlan subStagePlan : plan.getSubStages()) {
            List<SqlStageExecution> subTree = createStages( // recursively create all child SqlStageExecutions
                    stage::addExchangeLocations,
                    nextStageId,
                    locationFactory,
                    subStagePlan.withBucketToPartition(bucketToPartition),
                    nodeScheduler,
                    remoteTaskFactory,
                    session,
                    splitBatchSize,
                    partitioningCache,
                    nodePartitioningManager,
                    queryExecutor,
                    schedulerExecutor,
                    failureDetector,
                    nodeTaskMap,
                    stageSchedulers,
                    stageLinkages);
            stages.addAll(subTree);

            // The first element of each sub-tree is that sub-tree's own stage, i.e. a direct child of this stage
            SqlStageExecution childStage = subTree.get(0);
            childStagesBuilder.add(childStage);
        }
Set<SqlStageExecution> childStages = childStagesBuilder.build();

至此所有SqlStageExecution生成完畢,下面看一下SqlStageExecution的簡化構成

private final StageStateMachine stateMachine;       // stage state machine (state listener)
private final RemoteTaskFactory remoteTaskFactory;  // factory that creates the tasks
private final NodeTaskMap nodeTaskMap;              // holds the mapping between the tasks assigned for this stage and the nodes
private final Map<Node, Set<RemoteTask>> tasks = new ConcurrentHashMap<>();
private final AtomicInteger nextTaskId = new AtomicInteger();
private final Set<TaskId> allTasks = newConcurrentHashSet();
private final Set<TaskId> finishedTasks = newConcurrentHashSet();
private final Multimap<PlanNodeId, RemoteTask> sourceTasks = HashMultimap.create();

2.4生成分布式執行計劃調度

2.4.1調度相關的服務類

先介紹一下上文提到的SqlQueryExecution中的NodeScheduler類

主要包括成員
InternalNodeManager nodeManager       //獲取存活的節點列表,保存在NodeMap裡面,定時更新內容,默認5秒

主要包括方法
List<Node> selectNodes                //選取存活的Node列表
NodeSelector createNodeSelector       //提供了NodeSelector,其中包括各個stage中task分配的算法
ResettableRandomizedIterator<Node> randomizedNodes  //打亂給定的NodeMap

InternalNodeManager接口的定義為

/**
 * Node lookup service: exposes the nodes known to the cluster by state, by connector and by role.
 */
public interface InternalNodeManager
{
    Set<Node> getNodes(NodeState state);                         // nodes in the given state
    Set<Node> getActiveConnectorNodes(ConnectorId connectorId);  // nodes for the given connector id
    Node getCurrentNode();       // information about the current node
    Set<Node> getCoordinators(); // the coordinators
    AllNodes getAllNodes();      // all nodes
    void refreshNodes();         // refresh the node information
}
//有DiscoveryNodeManager的實現類

NodeSelector接口定義為

public interface NodeSelector
{
    void lockDownNodes();
    List<Node> allNodes();                   //選擇所有的節(jié)點(diǎn)
    Node selectCurrentNode();                //選擇當(dāng)前節(jié)點(diǎn)
    List<Node> selectRandomNodes(int limit)  //選擇limit個(gè)隨機(jī)的節(jié)點(diǎn)
    List<Node> selectRandomNodes(int limit, Set<Node> excludedNodes);  //選擇limit個(gè)隨機(jī)的節(jié)點(diǎn)排除給定的節(jié)點(diǎn)
    SplitPlacementResult computeAssignments(Set<Split> splits, List<RemoteTask> existingTasks);
    SplitPlacementResult computeAssignments(Set<Split> splits, List<RemoteTask> existingTasks, NodePartitionMap partitioning);
}
//有SimpleNodeSelector和TopologyAwareNodeSelector兩個實現類,Presto會根據不同的網絡拓撲配置來選擇不同的NodeSelector

// In the NodeScheduler constructor: topology-aware selection is enabled whenever the configured
// network topology type is anything other than LEGACY.
// NOTE(review): the original note described LEGACY as a historical, non-TCP/IP network; nothing in
// this code establishes that - confirm the meaning of LEGACY against the node scheduler configuration.
this.useNetworkTopology = !config.getNetworkTopology().equals(NetworkTopologyType.LEGACY);

// In createNodeSelector: the NodeSelector is instantiated here
if (useNetworkTopology) {
    // Any non-LEGACY topology setting yields a TopologyAwareNodeSelector
    return new TopologyAwareNodeSelector(
            nodeManager,
            nodeTaskMap,
            includeCoordinator,
            nodeMap,
            minCandidates,
            maxSplitsPerNode,
            maxPendingSplitsPerTask,
            topologicalSplitCounters,
            networkLocationSegmentNames,
            networkLocationCache);
}
else {
    return new SimpleNodeSelector(nodeManager, nodeTaskMap, includeCoordinator, nodeMap, minCandidates, maxSplitsPerNode, maxPendingSplitsPerTask);
}

Node的定義為

// Describes a single node: its address, HTTP endpoint, identity, version and role.
public interface Node
{
    HostAddress getHostAndPort();   // host and port of the node
    URI getHttpUri();               // HTTP URI of the node
    String getNodeIdentifier();     // identifier of the node (used elsewhere to match coordinators)
    String getVersion();            // version
    boolean isCoordinator();        // whether this node is a coordinator
}

createNodeSelector的創建過程

/**
 * Creates a {@link NodeSelector} for the given connector.
 *
 * <p>The selector is backed by a lazily built node map that is cached through Guava's
 * {@code Suppliers.memoizeWithExpiration} with a 5 second expiration. The map indexes nodes by
 * host-and-port, by resolved host address and, when network topology is in use, by every
 * prefix of the node's network location.
 *
 * @param connectorId connector whose active nodes are considered; when {@code null},
 *        all nodes in the ACTIVE state are used instead
 * @return a {@code TopologyAwareNodeSelector} when network topology is in use,
 *         otherwise a {@code SimpleNodeSelector}
 */
public NodeSelector createNodeSelector(ConnectorId connectorId)
{
    Supplier<NodeMap> nodeMap = Suppliers.memoizeWithExpiration(() -> {
        ImmutableSetMultimap.Builder<HostAddress, Node> nodesByHostAndPort = ImmutableSetMultimap.builder();
        ImmutableSetMultimap.Builder<InetAddress, Node> nodesByHost = ImmutableSetMultimap.builder();
        ImmutableSetMultimap.Builder<NetworkLocation, Node> nodesByNetworkPath = ImmutableSetMultimap.builder();

        Set<Node> candidates = (connectorId == null)
                ? nodeManager.getNodes(ACTIVE)
                : nodeManager.getActiveConnectorNodes(connectorId);

        Set<String> coordinatorIds = nodeManager.getCoordinators().stream()
                .map(Node::getNodeIdentifier)
                .collect(toImmutableSet());

        for (Node candidate : candidates) {
            if (useNetworkTopology) {
                // Coordinators are only indexed by network path when they may run work.
                if (includeCoordinator || !coordinatorIds.contains(candidate.getNodeIdentifier())) {
                    NetworkLocation where = networkLocationCache.get(candidate.getHostAndPort());
                    // Register the node under every prefix of its location, from the
                    // empty prefix up to the full location.
                    int prefixLength = 0;
                    while (prefixLength <= where.getSegments().size()) {
                        nodesByNetworkPath.put(where.subLocation(0, prefixLength), candidate);
                        prefixLength++;
                    }
                }
            }
            try {
                nodesByHostAndPort.put(candidate.getHostAndPort(), candidate);

                InetAddress resolved = InetAddress.getByName(candidate.getHttpUri().getHost());
                nodesByHost.put(resolved, candidate);
            }
            catch (UnknownHostException ignored) {
                // Unresolvable host: the node stays in the host-and-port index only.
            }
        }

        return new NodeMap(nodesByHostAndPort.build(), nodesByHost.build(), nodesByNetworkPath.build(), coordinatorIds);
    }, 5, TimeUnit.SECONDS);

    if (!useNetworkTopology) {
        return new SimpleNodeSelector(nodeManager, nodeTaskMap, includeCoordinator, nodeMap, minCandidates, maxSplitsPerNode, maxPendingSplitsPerTask);
    }
    return new TopologyAwareNodeSelector(
            nodeManager,
            nodeTaskMap,
            includeCoordinator,
            nodeMap,
            minCandidates,
            maxSplitsPerNode,
            maxPendingSplitsPerTask,
            topologicalSplitCounters,
            networkLocationSegmentNames,
            networkLocationCache);
}

2.4.2 調度選擇策略

Single和Fixed Stage的策略比較簡單,均為調用selectRandomNodes

2.4.3 生成stage調度器和分配策略

承接2.3.2.3中間的代碼

// Chooses the stage scheduler (and, where applicable, the split placement policy) for one
// stage, based on the partitioning handle of the stage's plan fragment.
Optional<int[]> bucketToPartition;
PartitioningHandle partitioningHandle = plan.getFragment().getPartitioning();

// A different stage scheduler is created depending on the kind of stage.
if (partitioningHandle.equals(SOURCE_DISTRIBUTION)) {
    // nodes are selected dynamically based on the constraints of the splits and the system load
    Entry<PlanNodeId, SplitSource> entry = Iterables.getOnlyElement(plan.getSplitSources().entrySet());
    PlanNodeId planNodeId = entry.getKey();
    SplitSource splitSource = entry.getValue();
    ConnectorId connectorId = splitSource.getConnectorId();
    // For internal system connectors the connector id is dropped, so the node selector is
    // created with a null connector id.
    if (isInternalSystemConnector(connectorId)) {
        connectorId = null;
    }
    // Create the NodeSelector used to pick the nodes that will execute the splits.
    NodeSelector nodeSelector = nodeScheduler.createNodeSelector(connectorId);
    // Dynamic split placement policy over the stage's current tasks.
    SplitPlacementPolicy placementPolicy = new DynamicSplitPlacementPolicy(nodeSelector, stage::getAllTasks);

    // Source-distributed stages are required to use ungrouped execution here.
    checkArgument(plan.getFragment().getPipelineExecutionStrategy() == UNGROUPED_EXECUTION);
  
    // Source stages are scheduled through simpleSourcePartitionedScheduler.
    stageSchedulers.put(stageId, simpleSourcePartitionedScheduler(stage, planNodeId, splitSource, placementPolicy, splitBatchSize));
    bucketToPartition = Optional.of(new int[1]);
}
else if (partitioningHandle.equals(SCALED_WRITER_DISTRIBUTION)) {
    // NOTE(review): no scheduler is registered for this branch in this excerpt; only the
    // bucket-to-partition mapping is set — confirm against the full method.
    bucketToPartition = Optional.of(new int[1]);
}
else {
    // nodes are pre determined by the nodePartitionMap
    NodePartitionMap nodePartitionMap = partitioningCache.apply(plan.getFragment().getPartitioning());
    // Number of distinct nodes that partitions are mapped to.
    long nodeCount = nodePartitionMap.getPartitionToNode().values().stream().distinct().count();
    OptionalInt concurrentLifespansPerTask = getConcurrentLifespansPerNode(session);

    Map<PlanNodeId, SplitSource> splitSources = plan.getSplitSources();
    // A fixed stage that has split sources is scheduled by FixedSourcePartitionedScheduler.
    // Per the original author's note, that scheduler builds its own FixedSplitPlacementPolicy
    // internally (not visible in this excerpt).
    if (!splitSources.isEmpty()) {
        List<PlanNodeId> schedulingOrder = plan.getFragment().getPartitionedSources();
        List<ConnectorPartitionHandle> connectorPartitionHandles;
        switch (plan.getFragment().getPipelineExecutionStrategy()) {
            case GROUPED_EXECUTION:
                // Grouped execution: list the real partition handles; the single
                // NOT_PARTITIONED placeholder is rejected.
                connectorPartitionHandles = nodePartitioningManager.listPartitionHandles(session, partitioningHandle);
                checkState(!ImmutableList.of(NOT_PARTITIONED).equals(connectorPartitionHandles));
                break;
            case UNGROUPED_EXECUTION:
                // Ungrouped execution: a single NOT_PARTITIONED handle.
                connectorPartitionHandles = ImmutableList.of(NOT_PARTITIONED);
                break;
            default:
                throw new UnsupportedOperationException();
        }
        // When a per-node lifespan concurrency is configured it is multiplied by the node
        // count; toIntExact throws if the product does not fit in an int.
        stageSchedulers.put(stageId, new FixedSourcePartitionedScheduler(
                stage,
                splitSources,
                plan.getFragment().getPipelineExecutionStrategy(),
                schedulingOrder,
                nodePartitionMap,
                splitBatchSize,
                concurrentLifespansPerTask.isPresent() ? OptionalInt.of(toIntExact(concurrentLifespansPerTask.getAsInt() * nodeCount)) : OptionalInt.empty(),
                nodeScheduler.createNodeSelector(null),
                connectorPartitionHandles));
        bucketToPartition = Optional.of(nodePartitionMap.getBucketToPartition());
    }
    else {
        // Partition-to-node mapping taken from the node partition map.
        Map<Integer, Node> partitionToNode = nodePartitionMap.getPartitionToNode();
        // todo this should asynchronously wait a standard timeout period before failing
        checkCondition(!partitionToNode.isEmpty(), NO_NODES_AVAILABLE, "No worker nodes available");

        // A fixed stage without any split source is scheduled by FixedCountScheduler.
        stageSchedulers.put(stageId, new FixedCountScheduler(stage, partitionToNode));
        bucketToPartition = Optional.of(nodePartitionMap.getBucketToPartition());
    }
}

2.4.4 sqlQuery調度器開始調度

scheduler.start()啟動sqlQueryScheduler的調度,其中涉及到Task的調度

/**
 * Starts scheduling. Only the call that flips {@code started} from {@code false} to
 * {@code true} submits {@link #schedule()} to the executor; every other call is a no-op.
 */
public void start()
{
    boolean firstCall = started.compareAndSet(false, true);
    if (!firstCall) {
        return;
    }
    executor.submit(this::schedule);
}

通過方法引用(this::schedule)調用schedule()

 
/**
 * Main scheduling loop of the query scheduler.
 *
 * <p>Repeatedly asks the execution schedule which stages to schedule, runs each stage's
 * scheduler, propagates the results to the stage linkages, records scheduler statistics and,
 * when stages are blocked, waits up to one second for any of them to unblock. After the
 * schedule reports it is finished, every stage must be SCHEDULED, RUNNING or done; otherwise a
 * {@code PrestoException} with {@code GENERIC_INTERNAL_ERROR} is thrown.
 */
private void schedule()
{
    // NOTE(review): SetThreadName presumably renames the current thread to "Query-<queryId>"
    // for the duration of the try block — confirm against its implementation.
    try (SetThreadName ignored = new SetThreadName("Query-%s", queryStateMachine.getQueryId())) {
        // Stages whose terminal state has already been pushed to their linkage.
        Set<StageId> completedStages = new HashSet<>();
        ExecutionSchedule executionSchedule = executionPolicy.createExecutionSchedule(stages.values());
        while (!executionSchedule.isFinished()) {
            // Futures of the stages that reported being blocked in this iteration.
            List<ListenableFuture<?>> blockedStages = new ArrayList<>();
            for (SqlStageExecution stage : executionSchedule.getStagesToSchedule()) {
                stage.beginScheduling();

                // Invoke the stage scheduler registered for this stage to schedule its tasks.
                // perform some scheduling work
                ScheduleResult result = stageSchedulers.get(stage.getStageId())
                        .schedule();

                // modify parent and children based on the results of the scheduling
                if (result.isFinished()) {
                    stage.schedulingComplete();
                }
                else if (!result.getBlocked().isDone()) {
                    blockedStages.add(result.getBlocked());
                }
                stageLinkages.get(stage.getStageId())
                        .processScheduleResults(stage.getState(), result.getNewTasks());
                schedulerStats.getSplitsScheduledPerIteration().add(result.getSplitsScheduled());
                // Update the per-reason counters; reasons without a counter are ignored, and
                // an unrecognised reason is treated as a programming error.
                if (result.getBlockedReason().isPresent()) {
                    switch (result.getBlockedReason().get()) {
                        case WRITER_SCALING:
                            // no-op
                            break;
                        case WAITING_FOR_SOURCE:
                            schedulerStats.getWaitingForSource().update(1);
                            break;
                        case SPLIT_QUEUES_FULL:
                            schedulerStats.getSplitQueuesFull().update(1);
                            break;
                        case MIXED_SPLIT_QUEUES_FULL_AND_WAITING_FOR_SOURCE:
                        case NO_ACTIVE_DRIVER_GROUP:
                            break;
                        default:
                            throw new UnsupportedOperationException("Unknown blocked reason: " + result.getBlockedReason().get());
                    }
                }
            }

            // make sure to update stage linkage at least once per loop to catch async state changes (e.g., partial cancel)
            for (SqlStageExecution stage : stages.values()) {
                if (!completedStages.contains(stage.getStageId()) && stage.getState().isDone()) {
                    stageLinkages.get(stage.getStageId())
                            .processScheduleResults(stage.getState(), ImmutableSet.of());
                    completedStages.add(stage.getStageId());
                }
            }

            // wait for a state change and then schedule again
            if (!blockedStages.isEmpty()) {
                // The time spent waiting is recorded in the scheduler's sleep-time stat.
                try (TimeStat.BlockTimer timer = schedulerStats.getSleepTime().time()) {
                    tryGetFutureValue(whenAnyComplete(blockedStages), 1, SECONDS);
                }
                // The blocked futures are cancelled before the next iteration, which
                // collects fresh ones.
                for (ListenableFuture<?> blockedStage : blockedStages) {
                    blockedStage.cancel(true);
                }
            }
        }

        // Sanity check once scheduling is finished: no stage may still be in a
        // pre-scheduled state.
        for (SqlStageExecution stage : stages.values()) {
            StageState state = stage.getState();
            if (state != SCHEDULED && state != RUNNING && !state.isDone()) {
                throw new PrestoException(GENERIC_INTERNAL_ERROR, format("Scheduling is complete, but stage %s is in state %s", stage.getStageId(), state));
            }
        }
    }
}

尚未寫完,待續…

如有錯誤請及時指出,共同進步~

每天晚上更新~

如需轉載請附上本文鏈接,原創不易,謝謝~

©著作權歸作者所有,轉載或內容合作請聯繫作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳並發布,文章內容僅代表作者本人觀點,簡書係信息發布平臺,僅提供信息存儲服務。

相關閱讀:更多精彩內容

友情鏈接:更多精彩內容