5. sharding-jdbc源碼之結(jié)果合并

阿飛Javaer,轉(zhuǎn)載請注明原創(chuàng)出處,謝謝!

單表查詢之結(jié)果合并

接下來以執(zhí)行SELECT o.* FROM t_order o where o.user_id=10 order by o.order_id desc limit 2,3分析下面這段Java代碼是如何對結(jié)果進行合并的:

result = new ShardingResultSet(resultSets, new MergeEngine(resultSets, (SelectStatement) routeResult.getSqlStatement()).merge());

MergeEngine.merge()方法的源碼如下:

public ResultSetMerger merge() throws SQLException {
    selectStatement.setIndexForItems(columnLabelIndexMap);
    return decorate(build());
}

build()方法源碼如下:

private ResultSetMerger build() throws SQLException {
    // 說明:GroupBy***ResultSetMerger在第六篇文章單獨講解,所以此次分析的SQL條件中沒有g(shù)roup by
    if (!selectStatement.getGroupByItems().isEmpty() || !selectStatement.getAggregationSelectItems().isEmpty()) {
        if (selectStatement.isSameGroupByAndOrderByItems()) {
            return new GroupByStreamResultSetMerger(columnLabelIndexMap, resultSets, selectStatement);
        } else {
            return new GroupByMemoryResultSetMerger(columnLabelIndexMap, resultSets, selectStatement);
        }
    }
    // 如果select語句中有order by字段,那么需要OrderByStreamResultSetMerger對結(jié)果處理
    if (!selectStatement.getOrderByItems().isEmpty()) {
        return new OrderByStreamResultSetMerger(resultSets, selectStatement.getOrderByItems());
    }
    return new IteratorStreamResultSetMerger(resultSets);
}

根據(jù)這段代碼可知,其作用是根據(jù)sql語句選擇多個不同的ResultSetMerger對結(jié)果進行合并處理,ResultSetMerger實現(xiàn)有這幾種:GroupByStreamResultSetMerger,GroupByMemoryResultSetMerger,OrderByStreamResultSetMerger,IteratorStreamResultSetMerger,LimitDecoratorResultSetMerger;以測試SQLSELECT o.* FROM t_order o where o.user_id=10 order by o.order_id desc limit 2,3為例,沒有g(shù)roup by,但是有order by,所以使用到了OrderByStreamResultSetMerger和LimitDecoratorResultSetMerger對結(jié)果進行合并(GroupByStreamResultSetMerger&GroupByMemoryResultSetMerger后面單獨講解)

decorate()源碼如下:

private ResultSetMerger decorate(final ResultSetMerger resultSetMerger) throws SQLException {
    ResultSetMerger result = resultSetMerger;
    // 如果SQL語句中有l(wèi)imist,還需要LimitDecoratorResultSetMerger配合進行結(jié)果歸并;
    if (null != selectStatement.getLimit()) {
        result = new LimitDecoratorResultSetMerger(result, selectStatement.getLimit());
    }
    return result;
}

接下來將以執(zhí)行SQL:SELECT o.* FROM t_order o where o.user_id=10 order by o.order_id desc limit 2,3(該SQL會被改寫成SELECT o.* , o.order_id AS ORDER_BY_DERIVED_0 FROM t_order_0 o where o.user_id=? order by o.order_id desc limit 2,3)為例,一一講解OrderByStreamResultSetMerger,LimitDecoratorResultSetMerger和IteratorStreamResultSetMerger,了解這幾個ResultSetMerger的原理;

OrderByStreamResultSetMerger

OrderByStreamResultSetMerger的核心源碼如下:

private final Queue<OrderByValue> orderByValuesQueue;

public OrderByStreamResultSetMerger(final List<ResultSet> resultSets, final List<OrderItem> orderByItems) throws SQLException {
    // sql中order by列的信息,實例sql是order by order_id desc,即此處就是order_id
    this.orderByItems = orderByItems;
    // 初始化一個優(yōu)先級隊列,優(yōu)先級隊列中的元素會根據(jù)OrderByValue中compareTo()方法排序,并且SQL重寫后發(fā)送到多少個目標實際表,List<ResultSet>的size就有多大,Queue的capacity就有多大;
    this.orderByValuesQueue = new PriorityQueue<>(resultSets.size());
    // 將結(jié)果壓入隊列中
    orderResultSetsToQueue(resultSets);
    isFirstNext = true;
}

private void orderResultSetsToQueue(final List<ResultSet> resultSets) throws SQLException {
    // 遍歷resultSets--在多少個目標實際表上執(zhí)行SQL,該集合的size就有多大
    for (ResultSet each : resultSets) {
        // 將ResultSet和排序列信息封裝成一個OrderByValue類型
        OrderByValue orderByValue = new OrderByValue(each, orderByItems);
        // 如果值存在,那么壓入隊列中
        if (orderByValue.next()) {
            orderByValuesQueue.offer(orderByValue);
        }
    }
    // 重置currentResultSet的位置:如果隊列不為空,那么將隊列的頂部(peek)位置設(shè)置為currentResultSet的位置
    setCurrentResultSet(orderByValuesQueue.isEmpty() ? resultSets.get(0) : orderByValuesQueue.peek().getResultSet());
}

@Override
public boolean next() throws SQLException {
    // 調(diào)用next()判斷是否還有值, 如果隊列為空, 表示沒有任何值, 那么直接返回false
    if (orderByValuesQueue.isEmpty()) {
        return false;
    }
    // 如果隊列不為空, 那么第一次一定返回true;即有結(jié)果可?。ㄇ覍sFirstNext置為false,表示接下來的請求都不是第一次請求next()方法)
    if (isFirstNext) {
        isFirstNext = false;
        return true;
    }
    // 從隊列中彈出第一個元素(因為是優(yōu)先級隊列,所以poll()返回的值,就是此次要取的值)
    OrderByValue firstOrderByValue = orderByValuesQueue.poll();
    // 如果它的next()存在,那么將它的next()再添加到隊列中
    if (firstOrderByValue.next()) {
        orderByValuesQueue.offer(firstOrderByValue);
    }
    // 隊列中所有元素全部處理完后就返回false
    if (orderByValuesQueue.isEmpty()) {
        return false;
    }
    // 再次重置currentResultSet的位置為隊列的頂部位置;
    setCurrentResultSet(orderByValuesQueue.peek().getResultSet());
    return true;
}

繼續(xù)深入剖析:這段代碼初看可能有點繞,假設(shè)運行SQLSELECT o.* FROM t_order o where o.user_id=10 order by o.order_id desc limit 3會分發(fā)到兩個目標實際表,且第一個實際表返回的結(jié)果是1,3,5,7,9;第二個實際表返回的結(jié)果是2,4,6,8,10;那么,經(jīng)過OrderByStreamResultSetMerger的構(gòu)造方法中的orderResultSetsToQueue()方法后,Queue<OrderByValue> orderByValuesQueue中包含兩個OrderByValue,一個是10,一個是9;接下來取值運行過程如下:

  1. 取得10,并且10的next()是8,然后執(zhí)行orderByValuesQueue.offer(8);,這時候orderByValuesQueue中包含8和9;
  2. 取得9,并且9的next()是7,然后執(zhí)行orderByValuesQueue.offer(7);,這時候orderByValuesQueue中包含7和8;
  3. 取得8,并且8的next()是6,然后執(zhí)行orderByValuesQueue.offer(6);,這時候orderByValuesQueue中包含7和6;
    取值數(shù)量已經(jīng)達到limit 3的限制(源碼在LimitDecoratorResultSetMerger中的next()方法中),退出;

這段代碼運行示意圖如下所示:


image-1.png

image-2.png

LimitDecoratorResultSetMerger

LimitDecoratorResultSetMerger核心源碼如下:

public LimitDecoratorResultSetMerger(final ResultSetMerger resultSetMerger, final Limit limit) throws SQLException {
    super(resultSetMerger);
    // limit賦值(Limit對象包括limit m,n中的m和n兩個值)
    this.limit = limit;
    // 判斷是否會跳過所有的結(jié)果項,即判斷是否有符合條件的結(jié)果
    skipAll = skipOffset();
}

private boolean skipOffset() throws SQLException {
    // 假定limit.getOffsetValue()就是offset,實例sql中為limit 2,3,所以offset=2
    for (int i = 0; i < limit.getOffsetValue(); i++) {
        // 嘗試從OrderByStreamResultSetMerger生成的優(yōu)先級隊列中跳過offset個元素,如果.next()一直為true,表示有足夠符合條件的結(jié)果,那么返回false;否則沒有足夠符合條件的結(jié)果,那么返回true;即skilAll=true就表示跳過了所有沒有符合條件的結(jié)果;
        if (!getResultSetMerger().next()) {
            return true;
        }
    }
    // limit m,n的sql會被重寫為limit 0, m+n,所以limit.isRowCountRewriteFlag()為true,rowNumber的值為0;
    rowNumber = limit.isRowCountRewriteFlag() ? 0 : limit.getOffsetValue();
    return false;
}

@Override
public boolean next() throws SQLException {
    // 如果skipAll為true,即跳過所有,表示沒有任何符合條件的值,那么返回false
    if (skipAll) {
        return false;
    }
    if (limit.getRowCountValue() > -1) {
        // 每次next()獲取值后,rowNumber自增,當自增rowCountValue次后,就不能再往下繼續(xù)取值了,因為條件limit 2,3(rowCountValue=3)限制了
        return ++rowNumber <= limit.getRowCountValue() && getResultSetMerger().next();
    }
    return getResultSetMerger().next();
}

IteratorStreamResultSetMerger

構(gòu)造方法核心源碼:

private final Iterator<ResultSet> resultSets;

public IteratorStreamResultSetMerger(final List<ResultSet> resultSets) {
    // 將List<ResultSet>改成Iterator<ResultSet>,方便接下來迭代取得結(jié)果;
    this.resultSets = resultSets.iterator();
    // 重置currentResultSet
    setCurrentResultSet(this.resultSets.next());
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

  • 1. Java基礎(chǔ)部分 基礎(chǔ)部分的順序:基本語法,類相關(guān)的語法,內(nèi)部類的語法,繼承相關(guān)的語法,異常的語法,線程的語...
    子非魚_t_閱讀 34,854評論 18 399
  • 一. Java基礎(chǔ)部分.................................................
    wy_sure閱讀 4,036評論 0 11
  • 50個常用的sql語句Student(S#,Sname,Sage,Ssex) 學生表Course(C#,Cname...
    哈哈海閱讀 1,339評論 0 7
  • 木犀漸濃桂影深 可憐清香最先聞 浮云略日何所至 一樹秋風落九塵
    迷失的指南針閱讀 247評論 0 4
  • 現(xiàn)實生活中,我們天天掙錢花錢,錢可以說是離我們最近的東西。 可很多人對金錢連沒一個正確的認識都沒有,說什么“金錢是...
    化蛹為碟閱讀 852評論 0 0

友情鏈接更多精彩內(nèi)容