Java開源框架中的設(shè)計(jì)模式以及應(yīng)用場(chǎng)景

前言

設(shè)計(jì)模式是軟件設(shè)計(jì)中常見(jiàn)問(wèn)題的典型解決方案,你可以通過(guò)對(duì)其進(jìn)行定制來(lái)解決代碼中的特定設(shè)計(jì)問(wèn)題。

關(guān)于設(shè)計(jì)模式,網(wǎng)上有很多講解。但大部分都是Demo示例,看完有可能還是不知道怎么用。

本文筆者將從設(shè)計(jì)模式入手,看一看在優(yōu)秀的Java框架/中間件產(chǎn)品中,不同的設(shè)計(jì)模式應(yīng)用場(chǎng)景在哪里。

一、單例模式

單例模式是 Java 中最簡(jiǎn)單的設(shè)計(jì)模式之一,它提供了一種創(chuàng)建對(duì)象的最佳方式。這種模式涉及到一個(gè)單一的類,該類負(fù)責(zé)創(chuàng)建自己的對(duì)象,同時(shí)確保只有單個(gè)對(duì)象被創(chuàng)建。這個(gè)類提供了一種訪問(wèn)其唯一的對(duì)象的方式,可以直接訪問(wèn),不需要實(shí)例化該類的對(duì)象。

單例模式雖然很簡(jiǎn)單,但它的花樣一點(diǎn)都不少,我們一一來(lái)看。

1、餓漢式

餓漢式,顧名思義,就是我很餓,迫不及待。不管有沒(méi)有人用,我先創(chuàng)建了再說(shuō)。

比如在Dubbo中的這段代碼,創(chuàng)建一個(gè)配置管理器。

public class ConfigManager {
    private static final ConfigManager configManager = new ConfigManager(); 
    private ConfigManager() {}
    public static ConfigManager getInstance() {
        return configManager;
    }
}

又或者在RocketMQ中,創(chuàng)建一個(gè)MQ客戶端實(shí)例的時(shí)候。

public class MQClientManager {

    private static MQClientManager instance = new MQClientManager();
    private MQClientManager() {}
    public static MQClientManager getInstance() {
        return instance;
    }
}

2、懶漢式

懶漢式是對(duì)應(yīng)餓漢式而言的。它旨在第一次調(diào)用才初始化,避免內(nèi)存浪費(fèi)。但為了線程安全和性能,一般都會(huì)使用雙重檢查鎖的方式來(lái)創(chuàng)建。

我們來(lái)看Seata框架中,通過(guò)這種方式來(lái)創(chuàng)建一個(gè)配置類。

public class ConfigurationFactory{

    private static volatile Configuration CONFIG_INSTANCE = null;
    public static Configuration getInstance() {
        if (CONFIG_INSTANCE == null) {
            synchronized (Configuration.class) {
                if (CONFIG_INSTANCE == null) {
                    CONFIG_INSTANCE = buildConfiguration();
                }
            }
        }
        return CONFIG_INSTANCE;
    }
}

3、靜態(tài)內(nèi)部類

可以看到,通過(guò)雙重檢查鎖的方式來(lái)創(chuàng)建單例對(duì)象,還是比較復(fù)雜的。又是加鎖,又是判斷兩次,還需要加volatile修飾的。

使用靜態(tài)內(nèi)部類的方式,可以達(dá)到雙重檢查鎖相同的功效,但實(shí)現(xiàn)上簡(jiǎn)單了。

在Seata框架中,創(chuàng)建RM事件處理程序器的時(shí)候,就使用了靜態(tài)內(nèi)部類的方式來(lái)創(chuàng)建單例對(duì)象。

public class DefaultRMHandler extends AbstractRMHandler{

    protected DefaultRMHandler() {
        initRMHandlers();
    }
    private static class SingletonHolder {
        private static AbstractRMHandler INSTANCE = new DefaultRMHandler();
    }
    public static AbstractRMHandler get() {
        return DefaultRMHandler.SingletonHolder.INSTANCE;
    }
}

還有可以通過(guò)枚舉的方式來(lái)創(chuàng)建單例對(duì)象,但這種方式并沒(méi)有被廣泛采用,至少筆者在常見(jiàn)的開源框架中沒(méi)見(jiàn)過(guò),所以就不再列舉。

有人說(shuō),餓漢式的單例模式不好,不能做到延遲加載,浪費(fèi)內(nèi)存。但筆者認(rèn)為似乎過(guò)于吹毛求疵,事實(shí)上很多開源框架中,用的最多的就是這種方式。

如果明確希望實(shí)現(xiàn)懶加載效果時(shí),可以考慮用靜態(tài)內(nèi)部類的方式;如果還有其他特殊的需求,比如創(chuàng)建對(duì)象的過(guò)程比較繁瑣,可以用雙重檢查鎖的方式。

二、工廠模式

工廠模式是 Java 中最常用的設(shè)計(jì)模式之一。這種類型的設(shè)計(jì)模式屬于創(chuàng)建型模式,它提供了一種創(chuàng)建對(duì)象的最佳方式。

簡(jiǎn)單來(lái)說(shuō),在工廠模式中,就是代替new實(shí)例化具體類的一種模式。

1、簡(jiǎn)單工廠

簡(jiǎn)單工廠,的確比較簡(jiǎn)單,它的作用就是把對(duì)象的創(chuàng)建放到一個(gè)工廠類中,通過(guò)參數(shù)來(lái)創(chuàng)建不同的對(duì)象。

在分布式事務(wù)框架Seata中,如果發(fā)生異常,則需要進(jìn)行二階段回滾。

它的過(guò)程是,通過(guò)事務(wù)id找到undoLog記錄,然后解析里面的數(shù)據(jù)生成SQL,將一階段執(zhí)行的SQL給撤銷掉。

問(wèn)題是SQL的種類包含了比如INSERT、UPDATE、DELETE,所以它們反解析的過(guò)程也不一樣,就需要不同的執(zhí)行器去解析。

在Seata中,有一個(gè)抽象的撤銷執(zhí)行器,可以生成一條SQL。

public abstract class AbstractUndoExecutor{
    //生成撤銷SQL
    protected abstract String buildUndoSQL();
}

然后有一個(gè)獲取撤銷執(zhí)行器的工廠,根據(jù)SQL的類型,創(chuàng)建不同類型的執(zhí)行器并返回。

public class UndoExecutorFactory {

    public static AbstractUndoExecutor getUndoExecutor(String dbType, SQLUndoLog sqlUndoLog) {
        switch (sqlUndoLog.getSqlType()) {
            case INSERT:
                return new MySQLUndoInsertExecutor(sqlUndoLog);
            case UPDATE:
                return new MySQLUndoUpdateExecutor(sqlUndoLog);
            case DELETE:
                return new MySQLUndoDeleteExecutor(sqlUndoLog);
            default:
                throw new ShouldNeverHappenException();
        }
    }
}   

使用的時(shí)候,直接通過(guò)工廠類獲取執(zhí)行器。

AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(dataSourceProxy.getDbType(),sqlUndoLog);
undoExecutor.executeOn(conn);

簡(jiǎn)單工廠模式的優(yōu)點(diǎn),想必各位都能領(lǐng)會(huì),我們不再贅述。但它還有個(gè)小小的缺點(diǎn):

一旦有了新的實(shí)現(xiàn)類,就需要修改工廠實(shí)現(xiàn),有可能造成工廠邏輯過(guò)于復(fù)雜,不利于系統(tǒng)的擴(kuò)展和維護(hù)。

2、工廠方法

工廠方法模式解決了上面那個(gè)問(wèn)題。它可以創(chuàng)建一個(gè)工廠接口和多個(gè)工廠實(shí)現(xiàn)類,這樣如果增加新的功能,只需要添加新的工廠類就可以,不需要修改之前的代碼。

另外,工廠方法模式還可以和模板方法模式結(jié)合一起,將他們共同的基礎(chǔ)邏輯抽取到父類中,其它的交給子類去實(shí)現(xiàn)。

在Dubbo中,有一個(gè)關(guān)于緩存的設(shè)計(jì)完美的體現(xiàn)了工廠方法模式+模板方法模式。

首先,有一個(gè)緩存的接口,它提供了設(shè)置緩存和獲取緩存兩個(gè)方法。

public interface Cache {
    void put(Object key, Object value);
    Object get(Object key);
}

然后呢,還有一個(gè)緩存工廠,它返回一個(gè)緩存的實(shí)現(xiàn)。

public interface CacheFactory {
    Cache getCache(URL url, Invocation invocation);
}

由于結(jié)合了模板方法模式,所以Dubbo又搞了個(gè)抽象的緩存工廠類,它實(shí)現(xiàn)了緩存工廠的接口。

public abstract class AbstractCacheFactory implements CacheFactory {
    
    //具體的緩存實(shí)現(xiàn)類
    private final ConcurrentMap<String, Cache> caches = new ConcurrentHashMap<String, Cache>();
    
    @Override
    public Cache getCache(URL url, Invocation invocation) {
        url = url.addParameter(Constants.METHOD_KEY, invocation.getMethodName());
        String key = url.toFullString();
        Cache cache = caches.get(key);
        if (cache == null) {
            //創(chuàng)建緩存實(shí)現(xiàn)類,交給子類實(shí)現(xiàn)
            caches.put(key, createCache(url));
            cache = caches.get(key);
        }
        return cache;
    }
    //抽象方法,交給子類實(shí)現(xiàn)
    protected abstract Cache createCache(URL url);
}

在這里,公共的邏輯就是通過(guò)getCahce()創(chuàng)建緩存實(shí)現(xiàn)類,那具體創(chuàng)建什么樣的緩存實(shí)現(xiàn)類,就由子類去決定。

所以,每個(gè)子類都是一個(gè)個(gè)具體的緩存工廠類,比如包括:

ExpiringCacheFactory、JCacheFactory、LruCacheFactory、ThreadLocalCacheFactory。

這些工廠類,只有一個(gè)方法,就是創(chuàng)建具體的緩存實(shí)現(xiàn)類。

public class ThreadLocalCacheFactory extends AbstractCacheFactory {
    @Override
    protected Cache createCache(URL url) {
        return new ThreadLocalCache(url);
    }
}

這里的ThreadLocalCache就是具體的緩存實(shí)現(xiàn)類,比如它是通過(guò)ThreadLocal來(lái)實(shí)現(xiàn)緩存功能。

public class ThreadLocalCache implements Cache {

    private final ThreadLocal<Map<Object, Object>> store;

    public ThreadLocalCache(URL url) {
        this.store = new ThreadLocal<Map<Object, Object>>() {
            @Override
            protected Map<Object, Object> initialValue() {
                return new HashMap<Object, Object>();
            }
        };
    }
    @Override
    public void put(Object key, Object value) {
        store.get().put(key, value);
    }
    @Override
    public Object get(Object key) {
        return store.get().get(key);
    }
}

那在客戶端使用的時(shí)候,還是通過(guò)工廠來(lái)獲取緩存對(duì)象。

public static void main(String[] args) {
    URL url = URL.valueOf("http://localhost:8080/cache=jacache&.cache.write.expire=1");
    Invocation invocation = new RpcInvocation();
    CacheFactory cacheFactory = new ThreadLocalCacheFactory();
    Cache cache = cacheFactory.getCache(url, invocation);
    cache.put("java","java");
    System.out.println(cache.get("java"));
}

這樣做的好處有兩點(diǎn)。

第一,如果增加新的緩存實(shí)現(xiàn),只要添加一個(gè)新的緩存工廠類就可以,別的都無(wú)需改動(dòng)。

第二,通過(guò)模板方法模式,封裝不變部分,擴(kuò)展可變部分。 提取公共代碼,便于維護(hù)。

另外,在Dubbo中,注冊(cè)中心的獲取也是通過(guò)工廠方法來(lái)實(shí)現(xiàn)的。

3、抽象工廠

抽象工廠模式,它能創(chuàng)建一系列相關(guān)的對(duì)象, 而無(wú)需指定其具體類。

工廠方法模式和抽象工廠模式,它們之間最大的區(qū)別在于:

  • 工廠方法模式只有一個(gè)抽象產(chǎn)品類,具體工廠類只能創(chuàng)建一個(gè)具體產(chǎn)品類的實(shí)例;
  • 抽象工廠模式有多個(gè)抽象產(chǎn)品類,具體工廠類可以創(chuàng)建多個(gè)具體產(chǎn)品類的實(shí)例。

我們拿上面緩存的例子來(lái)繼續(xù)往下說(shuō)。

如果我們現(xiàn)在有一個(gè)數(shù)據(jù)訪問(wèn)程序,需要同時(shí)操作緩存和數(shù)據(jù)庫(kù),那就需要多個(gè)抽象產(chǎn)品和多個(gè)具體產(chǎn)品實(shí)現(xiàn)。

緩存相關(guān)的產(chǎn)品類都已經(jīng)有了,我們接著來(lái)創(chuàng)建數(shù)據(jù)庫(kù)相關(guān)的產(chǎn)品實(shí)現(xiàn)。

首先,有一個(gè)數(shù)據(jù)庫(kù)接口,它是抽象產(chǎn)品類。

public interface DataBase {
    void insert(Object tableName, Object record);
    Object select(Object tableName);
}

然后,我們創(chuàng)建兩個(gè)具體產(chǎn)品類MysqlDataBase和OracleDataBase。

public class MysqlDataBase implements DataBase{
    Map<Object,Object> mysqlDb = new HashMap<>();
    @Override
    public void insert(Object tableName, Object record) {
        mysqlDb.put(tableName,record);
    }
    @Override
    public Object select(Object tableName) {
        return mysqlDb.get(tableName);
    }
}

public class OracleDataBase implements DataBase {
    Map<Object,Object> oracleDb = new HashMap<>();
    @Override
    public void insert(Object tableName, Object record) {
        oracleDb.put(tableName,record);
    }
    @Override
    public Object select(Object tableName) {
        return oracleDb.get(tableName);
    }
}

其次,創(chuàng)建抽象的工廠類,它可以返回一個(gè)緩存對(duì)象和數(shù)據(jù)庫(kù)對(duì)象。

public interface DataAccessFactory {
    Cache getCache(URL url);
    DataBase getDb();
}

最后是具體的工廠類,可以根據(jù)實(shí)際的需求,任意組合每一個(gè)具體的產(chǎn)品。

比如我們需要一個(gè)基于ThreadLocal的緩存實(shí)現(xiàn)和基于Mysql的數(shù)據(jù)庫(kù)對(duì)象。

public class DataAccessFactory1 implements DataAccessFactory {
    @Override
    public Cache getCache(URL url) {
        return new ThreadLocalCache(url);
    }
    @Override
    public DataBase getDb() {
        return new MysqlDataBase();
    }
}

如果需要一個(gè)基于Lru的緩存實(shí)現(xiàn)和基于Oracle的數(shù)據(jù)庫(kù)對(duì)象。


public class DataAccessFactory2 implements DataAccessFactory {
    @Override
    public Cache getCache(URL url) {
        return new LruCache(url);
    }
    @Override
    public DataBase getDb() {
        return new OracleDataBase();
    }
}

可以看到,抽象工廠模式隔離了具體類的生成,使得客戶并不需要知道什么被創(chuàng)建。由于這種隔離,更換一個(gè)具體工廠就變得相對(duì)容易,所有的具體工廠都實(shí)現(xiàn)了抽象工廠中定義的那些公共接口,因此只需改變具體工廠的實(shí)例,就可以在某種程度上改變整個(gè)軟件系統(tǒng)的行為。

三、模板方法模式

在模板模式中,一個(gè)抽象類公開定義了執(zhí)行它的方法的方式/模板。它的子類可以按需要重寫方法實(shí)現(xiàn),但調(diào)用將以抽象類中定義的方式進(jìn)行。

簡(jiǎn)單來(lái)說(shuō),有多個(gè)子類共有的方法,且邏輯相同,可以考慮作為模板方法。

在上面Dubbo緩存的例子中,我們已經(jīng)看到了模板方法模式的應(yīng)用。但那個(gè)是和工廠方法模式結(jié)合在一塊的,我們?cè)賳为?dú)找找模板方法模式的應(yīng)用。

我們知道,當(dāng)我們的Dubbo應(yīng)用出現(xiàn)多個(gè)服務(wù)提供者時(shí),服務(wù)消費(fèi)者需要通過(guò)負(fù)載均衡算法,選擇其中一個(gè)服務(wù)來(lái)進(jìn)行調(diào)用。

首先,有一個(gè)LoadBalance接口,返回一個(gè)Invoker。

public interface LoadBalance {
    <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
}

然后定義一個(gè)抽象類,AbstractLoadBalance,實(shí)現(xiàn)LoadBalance接口。

public abstract class AbstractLoadBalance implements LoadBalance {

    @Override
    public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        if (invokers == null || invokers.isEmpty()) {
            return null;
        }
        if (invokers.size() == 1) {
            return invokers.get(0);
        }
        return doSelect(invokers, url, invocation);
    }
    //抽象方法,由子類選擇一個(gè)Invoker
    protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);
}

這里的公共邏輯就是兩個(gè)判斷,判斷invokers集合是否為空或者是否只有一個(gè)實(shí)例。如果是的話,就無(wú)需調(diào)用子類,直接返回就好了。

具體的負(fù)載均衡實(shí)現(xiàn)有四個(gè):

  • 基于權(quán)重隨機(jī)算法的 RandomLoadBalance
  • 基于最少活躍調(diào)用數(shù)算法的 LeastActiveLoadBalance
  • 基于 hash 一致性的 ConsistentHashLoadBalance
  • 基于加權(quán)輪詢算法的 RoundRobinLoadBalance
public class RandomLoadBalance extends AbstractLoadBalance {
    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        //省略邏輯....
        return invokers.get(ThreadLocalRandom.current().nextInt(length));
    }
}

它們根據(jù)不同的算法實(shí)現(xiàn),來(lái)返回一個(gè)具體的Invoker對(duì)象。

四、構(gòu)造器模式

構(gòu)造器模式使用多個(gè)簡(jiǎn)單的對(duì)象一步一步構(gòu)建成一個(gè)復(fù)雜的對(duì)象。這種類型的設(shè)計(jì)模式屬于創(chuàng)建型模式,它提供了一種創(chuàng)建對(duì)象的最佳方式。

這種模式,常見(jiàn)于在構(gòu)建一個(gè)復(fù)雜的對(duì)象,里面可能包含一些業(yè)務(wù)邏輯,比如檢查,屬性轉(zhuǎn)換等。如果都在客戶端手動(dòng)去設(shè)置,那么會(huì)產(chǎn)生大量的冗余代碼。那么這時(shí)候,我們就可以考慮使用構(gòu)造器模式。

比如,在Mybatis中,MappedStatement的創(chuàng)建過(guò)程就使用了構(gòu)造器模式。

我們知道,XML文件中的每一個(gè)SQL標(biāo)簽就要生成一個(gè)MappedStatement對(duì)象,它里面包含很多個(gè)屬性,我們要構(gòu)造的對(duì)象也是它。

public final class MappedStatement {
    private String resource;
    private Configuration configuration;
    private String id;
    private SqlSource sqlSource;
    private ParameterMap parameterMap;
    private List<ResultMap> resultMaps;
    //.....省略大部分屬性
}

然后有一個(gè)內(nèi)部類Builder,它負(fù)責(zé)完成MappedStatement對(duì)象的構(gòu)造。

首先,這個(gè)Builder類,通過(guò)默認(rèn)的構(gòu)造函數(shù),先完成對(duì)MappedStatement對(duì)象,部分的構(gòu)造。

public static class Builder {

    private MappedStatement mappedStatement = new MappedStatement();
    
    public Builder(Configuration configuration, String id, SqlSource sqlSource, SqlCommandType sqlCommandType) {
        mappedStatement.configuration = configuration;
        mappedStatement.id = id;
        mappedStatement.sqlSource = sqlSource;
        mappedStatement.statementType = StatementType.PREPARED;
        mappedStatement.resultSetType = ResultSetType.DEFAULT;
        //.....省略大部分過(guò)程
    }
}

然后,通過(guò)一系列方法,可以設(shè)置特定的屬性,并返回這個(gè)Builder類,這里的方法適合處理一些業(yè)務(wù)邏輯。

public static class Builder {

    public Builder parameterMap(ParameterMap parameterMap) {
      mappedStatement.parameterMap = parameterMap;
      return this;
    }
    
    public Builder resultMaps(List<ResultMap> resultMaps) {
      mappedStatement.resultMaps = resultMaps;
      for (ResultMap resultMap : resultMaps) {
        mappedStatement.hasNestedResultMaps = mappedStatement.hasNestedResultMaps || resultMap.hasNestedResultMaps();
      }
      return this;
    }
    
    public Builder statementType(StatementType statementType) {
      mappedStatement.statementType = statementType;
      return this;
    }

    public Builder resultSetType(ResultSetType resultSetType) {
      mappedStatement.resultSetType = resultSetType == null ? ResultSetType.DEFAULT : resultSetType;
      return this;
    }
}

最后呢,就是提供一個(gè)build方法,返回構(gòu)建完成的對(duì)象就好了。

public MappedStatement build() {
    assert mappedStatement.configuration != null;
    assert mappedStatement.id != null;
    assert mappedStatement.sqlSource != null;
    assert mappedStatement.lang != null;
    mappedStatement.resultMaps = Collections.unmodifiableList(mappedStatement.resultMaps);
    return mappedStatement;
}

在客戶端使用的時(shí)候,先創(chuàng)建一個(gè) Builder,然后鏈?zhǔn)降恼{(diào)用一堆方法,最后再調(diào)用一次 build() 方法,我們需要的對(duì)象就有了,這就是構(gòu)造器模式的應(yīng)用。

MappedStatement.Builder statementBuilder = new MappedStatement.Builder(configuration, id, sqlSource, sqlCommandType)
    .resource(resource)
    .fetchSize(fetchSize)
    .timeout(timeout)
    .statementType(statementType)
    .keyGenerator(keyGenerator)
    .keyProperty(keyProperty)
    .keyColumn(keyColumn)
    .databaseId(databaseId)
    .lang(lang)
    .resultOrdered(resultOrdered)
    .resultSets(resultSets)
    .resultMaps(getStatementResultMaps(resultMap, resultType, id))
    .resultSetType(resultSetType)
    .flushCacheRequired(valueOrDefault(flushCache, !isSelect))
    .useCache(valueOrDefault(useCache, isSelect))
    .cache(currentCache);

ParameterMap statementParameterMap = getStatementParameterMap(parameterMap, parameterType, id);
MappedStatement statement = statementBuilder.build();
configuration.addMappedStatement(statement);
return statement;

五、適配器模式

適配器模式是作為兩個(gè)不兼容的接口之間的橋梁。這種類型的設(shè)計(jì)模式屬于結(jié)構(gòu)型模式,它結(jié)合了兩個(gè)獨(dú)立接口的功能。

適配器模式一般用于屏蔽業(yè)務(wù)邏輯與第三方服務(wù)的交互,或者是新老接口之間的差異。

我們知道,在Dubbo中,所有的數(shù)據(jù)都是通過(guò)Netty來(lái)負(fù)責(zé)傳輸?shù)?,然后這就涉及了消息編解碼的問(wèn)題。

所以,首先它有一個(gè)編解碼器的接口,負(fù)責(zé)編碼和解碼。

@SPI
public interface Codec2 {

    @Adaptive({Constants.CODEC_KEY})
    void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException;
    
    @Adaptive({Constants.CODEC_KEY})
    Object decode(Channel channel, ChannelBuffer buffer) throws IOException;
    
    enum DecodeResult {
        NEED_MORE_INPUT, SKIP_SOME_INPUT
    }
}

然后,有幾個(gè)實(shí)現(xiàn)類,比如DubboCountCodec、DubboCodec、ExchangeCodec等。

但是,當(dāng)我們打開這些類的時(shí)候,就會(huì)發(fā)現(xiàn),他們都是Dubbo中普通的類,只是實(shí)現(xiàn)了Codec2接口,其實(shí)不能直接作用于Netty編解碼。

這是因?yàn)?,Netty編解碼需要實(shí)現(xiàn)ChannelHandler接口,這樣才會(huì)被聲明成Netty的處理組件。比如像MessageToByteEncoder、ByteToMessageDecoder那樣。

鑒于此,Dubbo搞了一個(gè)適配器,專門來(lái)適配編解碼器接口。

final public class NettyCodecAdapter {

    private final ChannelHandler encoder = new InternalEncoder();
    private final ChannelHandler decoder = new InternalDecoder();
    private final Codec2 codec;
    private final URL url;
    private final org.apache.dubbo.remoting.ChannelHandler handler;
    
    public NettyCodecAdapter(Codec2 codec, URL url, org.apache.dubbo.remoting.ChannelHandler handler) {
        this.codec = codec;
        this.url = url;
        this.handler = handler;
    }
    public ChannelHandler getEncoder() {
        return encoder;
    }
    public ChannelHandler getDecoder() {
        return decoder;
    }
    
    private class InternalEncoder extends MessageToByteEncoder {
        @Override
        protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
            org.apache.dubbo.remoting.buffer.ChannelBuffer buffer = new NettyBackedChannelBuffer(out);
            Channel ch = ctx.channel();
            NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
            codec.encode(channel, buffer, msg);
        }
    }
    private class InternalDecoder extends ByteToMessageDecoder {
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {
            ChannelBuffer message = new NettyBackedChannelBuffer(input);
            NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
            //解碼對(duì)象
            codec.decode(channel, message);
            //省略部分代碼...
        }
    }
}

上面的代碼中,我們看到,NettyCodecAdapter類適配的是Codec2接口,通過(guò)構(gòu)造函數(shù)傳遞實(shí)現(xiàn)類,然后定義了內(nèi)部的編碼器實(shí)現(xiàn)和解碼器實(shí)現(xiàn),同時(shí)它們都是ChannelHandler。

這樣的話,在內(nèi)部類里面的編碼和解碼邏輯,真正調(diào)用的還是Codec2接口。

最后我們?cè)賮?lái)看看,該適配器的調(diào)用方式。

//通過(guò)SPI方式獲取編解碼器的實(shí)現(xiàn)類,比如這里是DubboCountCodec
Codec2 codec = ExtensionLoader.getExtensionLoader(Codec2.class).getExtension("dubbo");
URL url = new URL("dubbo", "localhost", 22226);
//創(chuàng)建適配器
NettyCodecAdapter adapter = new NettyCodecAdapter(codec, url, NettyClient.this);
//向ChannelPipeline中添加編解碼處理器
ch.pipeline()
    .addLast("decoder", adapter.getDecoder())
    .addLast("encoder", adapter.getEncoder())

以上,就是Dubbo中關(guān)于編解碼器對(duì)于適配器模式的應(yīng)用。

六、責(zé)任鏈模式

責(zé)任鏈模式為請(qǐng)求創(chuàng)建了一個(gè)接收者對(duì)象的鏈。允許你將請(qǐng)求沿著處理者鏈進(jìn)行發(fā)送。收到請(qǐng)求后,每個(gè)處理者均可對(duì)請(qǐng)求進(jìn)行處理, 或?qū)⑵鋫鬟f給鏈上的下個(gè)處理者。

我們來(lái)看一個(gè)Netty中的例子。我們知道,在Netty中服務(wù)端處理消息,就要添加一個(gè)或多個(gè)ChannelHandler。那么,承載這些ChannelHandler的就是ChannelPipeline,它的實(shí)現(xiàn)過(guò)程就體現(xiàn)了責(zé)任鏈模式的應(yīng)用。

ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
    protected void initChannel(NioSocketChannel channel) {
        channel.pipeline()
            .addLast(new ChannelHandler1())
            .addLast(new ChannelHandler2())
            .addLast(new ChannelHandler3());
    }
});

需要知道的是,在 Netty 整個(gè)框架里面,一條連接對(duì)應(yīng)著一個(gè) Channel,每一個(gè)新創(chuàng)建的 Channel 都將會(huì)被分配一個(gè)新的 ChannelPipeline。

ChannelPipeline里面保存的是ChannelHandlerContext對(duì)象,它是Channel相關(guān)的上下文對(duì)象,里面包著我們定義的處理器ChannelHandler。

根據(jù)事件的起源,IO事件將會(huì)被 ChannelInboundHandler或者ChannelOutboundHandler處理。隨后,通過(guò)調(diào)用ChannelHandlerContext 實(shí)現(xiàn),它將被轉(zhuǎn)發(fā)給同一超類型的下一個(gè)ChannelHandler。

1、ChannelHandler

首先,我們來(lái)看責(zé)任處理器接口,Netty中的ChannelHandler,它充當(dāng)了所有處理入站和出站數(shù)據(jù)的應(yīng)用程序邏輯的容器。

public interface ChannelHandler {
    //當(dāng)把 ChannelHandler 添加到 ChannelPipeline 中時(shí)被調(diào)用
    void handlerAdded(ChannelHandlerContext ctx) throws Exception;
    //當(dāng)從 ChannelPipeline 中移除 ChannelHandler 時(shí)被調(diào)用
    void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
    //當(dāng)處理過(guò)程中在 ChannelPipeline 中有錯(cuò)誤產(chǎn)生時(shí)被調(diào)用
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}

然后 Netty 定義了下面兩個(gè)重要的 ChannelHandler 子接口:

  • ChannelInboundHandler,處理入站數(shù)據(jù)以及各種狀態(tài)變化;
public interface ChannelInboundHandler extends ChannelHandler {
    //當(dāng) Channel 已經(jīng)注冊(cè)到它的 EventLoop 并且能夠處理 I/O 時(shí)被調(diào)用
    void channelRegistered(ChannelHandlerContext ctx) throws Exception;
    //當(dāng) Channel 從它的 EventLoop 注銷并且無(wú)法處理任何 I/O 時(shí)被調(diào)用
    void channelUnregistered(ChannelHandlerContext ctx) throws Exception; 
    //當(dāng) Channel 處于活動(dòng)狀態(tài)時(shí)被調(diào)用;Channel 已經(jīng)連接/綁定并且已經(jīng)就緒
    void channelActive(ChannelHandlerContext ctx) throws Exception;   
    //當(dāng) Channel 離開活動(dòng)狀態(tài)并且不再連接它的遠(yuǎn)程節(jié)點(diǎn)時(shí)被調(diào)用
    void channelInactive(ChannelHandlerContext ctx) throws Exception;  
    當(dāng)從 Channel 讀取數(shù)據(jù)時(shí)被調(diào)用
    void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;   
    //當(dāng) Channel上的一個(gè)讀操作完成時(shí)被調(diào)用
    void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
    void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception; 
    void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
  • ChannelOutboundHandler,處理出站數(shù)據(jù)并且允許攔截所有的操作;
public interface ChannelOutboundHandler extends ChannelHandler {
    
    //當(dāng)請(qǐng)求將 Channel 綁定到本地地址時(shí)被調(diào)用
    void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
    //當(dāng)請(qǐng)求將 Channel 連接到遠(yuǎn)程節(jié)點(diǎn)時(shí)被調(diào)用
    void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,SocketAddress localAddress, 
        ChannelPromise promise) throws Exception;
    //當(dāng)請(qǐng)求將 Channel 從遠(yuǎn)程節(jié)點(diǎn)斷開時(shí)被調(diào)用
    void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
    //當(dāng)請(qǐng)求關(guān)閉 Channel 時(shí)被調(diào)用
    void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
    //當(dāng)請(qǐng)求將 Channel 從它的 EventLoop 注銷時(shí)被調(diào)用
    void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
    //當(dāng)請(qǐng)求從 Channel 讀取更多的數(shù)據(jù)時(shí)被調(diào)用
    void read(ChannelHandlerContext ctx) throws Exception;
    //當(dāng)請(qǐng)求通過(guò) Channel 將數(shù)據(jù)寫到遠(yuǎn)程節(jié)點(diǎn)時(shí)被調(diào)用
    void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
    //當(dāng)請(qǐng)求通過(guò) Channel 將入隊(duì)數(shù)據(jù)沖刷到遠(yuǎn)程節(jié)點(diǎn)時(shí)被調(diào)用
    void flush(ChannelHandlerContext ctx) throws Exception;
}

2、ChannelPipeline

既然叫做責(zé)任鏈模式,那就需要有一個(gè)“鏈”,在Netty中就是ChannelPipeline。

ChannelPipeline 提供了 ChannelHandler 鏈的容器,并定義了用于在該鏈上傳播入站和出站事件流的方法,另外它還具有添加刪除責(zé)任處理器接口的功能。

public interface ChannelPipeline{
    ChannelPipeline addFirst(String name, ChannelHandler handler);
    ChannelPipeline addLast(String name, ChannelHandler handler);
    ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);
    ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);
    ChannelPipeline remove(ChannelHandler handler);
    ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler);
    @Override
    ChannelPipeline fireChannelRegistered();
    @Override
    ChannelPipeline fireChannelActive();
    @Override
    ChannelPipeline fireExceptionCaught(Throwable cause);
    @Override
    ChannelPipeline fireUserEventTriggered(Object event);
    @Override
    ChannelPipeline fireChannelRead(Object msg);
    @Override
    ChannelPipeline flush();
    //省略部分方法.....
}

然后我們看它的實(shí)現(xiàn),默認(rèn)有兩個(gè)節(jié)點(diǎn),頭結(jié)點(diǎn)和尾結(jié)點(diǎn)。并在構(gòu)造函數(shù)中,使它們首尾相連。這就是標(biāo)準(zhǔn)的鏈?zhǔn)浇Y(jié)構(gòu)。

public class DefaultChannelPipeline implements ChannelPipeline {

    final AbstractChannelHandlerContext head;
    final AbstractChannelHandlerContext tail;
    private final Channel channel;
    
    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        tail = new TailContext(this);
        head = new HeadContext(this);
        head.next = tail;
        tail.prev = head;
    }
}

當(dāng)有新的ChannelHandler被添加時(shí),則將其封裝為ChannelHandlerContext對(duì)象,然后插入到鏈表中。

private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;
}

3、ChannelHandlerContext

ChannelHandlerContext 代表了 ChannelHandler 和 ChannelPipeline 之間的關(guān)聯(lián),每當(dāng)有 ChannelHandler 添加到 ChannelPipeline 中時(shí),都會(huì)創(chuàng)建 ChannelHandlerContext。

ChannelHandlerContext 的主要功能是管理它所關(guān)聯(lián)的 ChannelHandler 和在同一個(gè) ChannelPipeline 中的其他 ChannelHandler 之間的交互。

public interface ChannelHandlerContext{
    Channel channel();
    EventExecutor executor();
    ChannelHandler handler();
    ChannelPipeline pipeline();
    @Override
    ChannelHandlerContext fireChannelRegistered();
    @Override
    ChannelHandlerContext fireChannelUnregistered();
    @Override
    ChannelHandlerContext fireChannelActive();
    @Override
    ChannelHandlerContext fireChannelRead(Object msg);
    @Override
    ChannelHandlerContext read();
    @Override
    ChannelHandlerContext flush();
    //省略部分方法...
}

ChannelHandlerContext負(fù)責(zé)在鏈上傳播責(zé)任處理器接口的事件。

它有兩個(gè)重要的方法,查找Inbound類型和Outbound類型的處理器。

值得注意的是,如果一個(gè)入站事件被觸發(fā),它將被從ChannelPipeline的頭部開始一直被傳播到ChannelPipeline的尾端;一個(gè)出站事件將從ChannelPipeline的最右邊開始,然后向左傳播。

abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
    
    volatile AbstractChannelHandlerContext next;
    volatile AbstractChannelHandlerContext prev;
    
    //查找下一個(gè)Inbound類型的處理器,左 > 右
    private AbstractChannelHandlerContext findContextInbound(int mask) {
        AbstractChannelHandlerContext ctx = this;
        EventExecutor currentExecutor = executor();
        do {
            ctx = ctx.next;
        } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
        return ctx;
    }
    //查找下一個(gè)Outbound類型的處理器,右 > 左
    private AbstractChannelHandlerContext findContextOutbound(int mask) {
        AbstractChannelHandlerContext ctx = this;
        EventExecutor currentExecutor = executor();
        do {
            ctx = ctx.prev;
        } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
        return ctx;
    }
}

4、處理流程

當(dāng)我們向服務(wù)端發(fā)送消息的時(shí)候,將會(huì)觸發(fā)read方法。

public abstract class AbstractNioByteChannel extends AbstractNioChannel {
    public final void read() {  
        //從Channel中獲取對(duì)應(yīng)的ChannelPipeline
        final ChannelPipeline pipeline = pipeline();
        //數(shù)據(jù)載體
        ByteBuf byteBuf = allocHandle.allocate(allocator);
        //傳遞數(shù)據(jù)
        pipeline.fireChannelRead(byteBuf);
    }
}

上面的代碼中,就會(huì)調(diào)用到ChannelPipeline,它會(huì)從Head節(jié)點(diǎn)開始,根據(jù)上下文對(duì)象依次調(diào)用處理器。

public class DefaultChannelPipeline implements ChannelPipeline {
    public final ChannelPipeline fireChannelRead(Object msg) {
        AbstractChannelHandlerContext.invokeChannelRead(head, msg);
        return this;
    }
}

因?yàn)榈谝粋€(gè)節(jié)點(diǎn)是默認(rèn)的頭結(jié)點(diǎn)HeadContext,所以它是從ChannelHandlerContext開始的。

abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
    //找到下一個(gè)ChannelHandler并執(zhí)行
    public ChannelHandlerContext fireChannelRead(final Object msg) {
        invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
        return this;
    }
    
}

然后在我們自定義的ChannelHandler中,就會(huì)被調(diào)用到。

public class ChannelHandler1 extends ChannelInboundHandlerAdapter {
    public void channelRead(ChannelHandlerContext ctx, Object msg){
        System.out.println("ChannelHandler1:"+msg);
        ctx.fireChannelRead(msg);
    }
}

如果消息有多個(gè)ChannelHandler,你可以自由選擇是否繼續(xù)往下傳遞請(qǐng)求。

比如,如果你認(rèn)為消息已經(jīng)被處理且不應(yīng)該繼續(xù)往下調(diào)用,把上面的ctx.fireChannelRead(msg);注釋掉就終止了整個(gè)責(zé)任鏈。

七、策略模式

該模式定義了一系列算法,并將每個(gè)算法封裝起來(lái),使它們可以相互替換,且算法的變化不會(huì)影響使用算法的客戶。

策略模式是一個(gè)很常見(jiàn),而且也很好用的設(shè)計(jì)模式,如果你的業(yè)務(wù)代碼中有大量的if...else,那么就可以考慮是否可以使用策略模式改造一下。

RocketMQ我們大家都熟悉,是一款優(yōu)秀的分布式消息中間件。消息中間件,簡(jiǎn)單來(lái)說(shuō),就是客戶端發(fā)送一條消息,服務(wù)端存儲(chǔ)起來(lái)并提供給消費(fèi)者去消費(fèi)。

請(qǐng)求消息的類型多種多樣,處理過(guò)程肯定也不一樣,每次都判斷一下再處理就落了下乘。在RocketMQ里,它會(huì)把所有處理器注冊(cè)起來(lái),然后根據(jù)請(qǐng)求消息的 code ,讓對(duì)應(yīng)的處理器處理請(qǐng)求,這就是策略模式的應(yīng)用。

首先,它們需要實(shí)現(xiàn)同一個(gè)接口,在這里就是請(qǐng)求處理器接口。

public interface NettyRequestProcessor {
    RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)throws Exception;
    boolean rejectRequest();
}

這個(gè)接口只做一件事,就是處理來(lái)自客戶端的請(qǐng)求。不同類型的請(qǐng)求封裝成不同的RemotingCommand對(duì)象。

RocketMQ大概有90多種請(qǐng)求類型,都在RequestCode里以 code 來(lái)區(qū)分。

然后,定義一系列策略類。我們來(lái)看幾個(gè)。

//默認(rèn)的消息處理器
public class DefaultRequestProcessor implements NettyRequestProcessor {}
//發(fā)送消息的處理器
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {}
//拉取消息的處理器
public class PullMessageProcessor implements NettyRequestProcessor {}
//查詢消息的處理器
public class QueryMessageProcessor implements NettyRequestProcessor {}
//消費(fèi)者端管理的處理器
public class ConsumerManageProcessor implements NettyRequestProcessor {}

接著,將這些策略類封裝起來(lái)。在RocketMQ中,在啟動(dòng)Broker服務(wù)器的時(shí)候,注冊(cè)這些處理器。

public class BrokerController {

    public void registerProcessor() {
    
        SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE,this.pullMessageProcessor,this.pullMessageExecutor);
        
        this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);
        
        NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
        this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
        
        ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
        this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
        this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
        //省略部分注冊(cè)過(guò)程.....
    }
}

最后,在Netty接收到客戶端的請(qǐng)求之后,就會(huì)根據(jù)消息的類型,找到對(duì)應(yīng)的策略類,去處理消息。

public abstract class NettyRemotingAbstract {

    public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
        //根據(jù)請(qǐng)求類型找到對(duì)應(yīng)的策略類
        final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
        //如果沒(méi)有找到就使用默認(rèn)的
        final Pair<NettyRequestProcessor, ExecutorService> pair = 
                    null == matched ? this.defaultRequestProcessor : matched;
        //執(zhí)行策略
        final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
        //省略大部分代碼......
    }
}

如果有了新的請(qǐng)求消息類型,RocketMQ也無(wú)需修改業(yè)務(wù)代碼,新增策略類并將其注冊(cè)進(jìn)來(lái)就好了。

八、代理模式

代理模式,為其他對(duì)象提供一種代理以控制對(duì)這個(gè)對(duì)象的訪問(wèn)。

在一些開源框架或中間件產(chǎn)品中,代理模式會(huì)非常常見(jiàn)。我們使用的時(shí)候越簡(jiǎn)便,框架在背后幫我們做的事就可能越復(fù)雜。這里面往往都體現(xiàn)著代理模式的應(yīng)用,頗有移花接木的味道。

1、Dubbo

Dubbo作為一個(gè)RPC框架,其中有一個(gè)很重要的功能就是:

提供高性能的基于代理的遠(yuǎn)程調(diào)用能力,服務(wù)以接口為粒度,為開發(fā)者屏蔽遠(yuǎn)程調(diào)用底層細(xì)節(jié)。

這里我們關(guān)注兩個(gè)重點(diǎn):

  • 面向接口代理;
  • 屏蔽調(diào)用底層細(xì)節(jié)。

比如我們有一個(gè)庫(kù)存服務(wù),它提供一個(gè)扣減庫(kù)存的接口。

public interface StorageDubboService {
    int decreaseStorage(StorageDTO storage);
}

在別的服務(wù)里,需要扣減庫(kù)存的時(shí)候,就會(huì)通過(guò)Dubbo引用這個(gè)接口,也比較簡(jiǎn)單。

@Reference
StorageDubboService storageDubboService;

我們使用起來(lái)很簡(jiǎn)單,可 StorageDubboService 只是一個(gè)普通的服務(wù)類,并不具備遠(yuǎn)程調(diào)用的能力。

Dubbo就是給這些服務(wù)類,創(chuàng)建了代理類。通過(guò)ReferenceBean來(lái)創(chuàng)建并返回一個(gè)代理對(duì)象。

public class ReferenceBean<T>{
    @Override
    public Object getObject() {
        return get();
    }
    public synchronized T get() {
        if (ref == null) {
            init();
        }
        return ref;
    }
}

在我們使用的時(shí)候,實(shí)則調(diào)用的是代理對(duì)象,代理對(duì)象完成復(fù)雜的遠(yuǎn)程調(diào)用。比如連接注冊(cè)中心、負(fù)載均衡、集群容錯(cuò)、連接服務(wù)器發(fā)送消息等功能。

2、Mybatis

還有一個(gè)典型的應(yīng)用,就是我們經(jīng)常在用的Mybatis。我們?cè)谑褂玫臅r(shí)候,一般只操作Mapper接口,然后Mybatis會(huì)找到對(duì)應(yīng)的SQL語(yǔ)句來(lái)執(zhí)行。

public interface UserMapper {   
    List<User> getUserList();
}

如上代碼,UserMapper也只是一個(gè)普通的接口,它是怎樣最終執(zhí)行到我們的SQL語(yǔ)句的呢?

答案也是代理。當(dāng)Mybatis掃描到我們定義的Mapper接口時(shí),會(huì)將其設(shè)置為MapperFactoryBean,并創(chuàng)建返回一個(gè)代理對(duì)象。

protected T newInstance(SqlSession sqlSession) {
    final MapperProxy<T> mapperProxy = new MapperProxy<>(sqlSession, mapperInterface, methodCache);
    return (T) Proxy.newProxyInstance(mapperInterface.getClassLoader(), new Class[] { mapperInterface }, mapperProxy);
}

代理對(duì)象去通過(guò)請(qǐng)求的方法名找到MappedStatement對(duì)象,調(diào)用執(zhí)行器,解析SqlSource對(duì)象來(lái)生成SQL,執(zhí)行并解析返回結(jié)果等。

以上案例具體的實(shí)現(xiàn)過(guò)程,在這里就不再深入細(xì)聊。有興趣可能翻閱筆者其他文章~

九、裝飾器模式

裝飾器模式,在不改變現(xiàn)有對(duì)象結(jié)構(gòu)的情況下,動(dòng)態(tài)地給該對(duì)象增加一些職責(zé)(即增加其額外功能)的模式,它屬于對(duì)象結(jié)構(gòu)型模式。

Mybatis里的緩存設(shè)計(jì),就是裝飾器模式的典型應(yīng)用。

首先,我們知道,MyBatis 執(zhí)行器是 MyBatis 調(diào)度的核心,它負(fù)責(zé)SQL語(yǔ)句的生成和執(zhí)行。

在創(chuàng)建SqlSession的時(shí)候,會(huì)創(chuàng)建這個(gè)執(zhí)行器,默認(rèn)的執(zhí)行器是SimpleExecutor。

但是為了給執(zhí)行器增加緩存的職責(zé),就變成了在SimpleExecutor上一層添加了CachingExecutor。

在CachingExecutor中的實(shí)際操作還是委托給SimpleExecutor去執(zhí)行,只是在執(zhí)行前后增加了緩存的操作。

首先,我們來(lái)看看它的裝飾過(guò)程。

public Executor newExecutor(Transaction transaction, ExecutorType executorType) {
    //默認(rèn)的執(zhí)行器
    executorType = executorType == null ? defaultExecutorType : executorType;
    executorType = executorType == null ? ExecutorType.SIMPLE : executorType;
    Executor executor;
    if (ExecutorType.BATCH == executorType) {
        executor = new BatchExecutor(this, transaction);
    } else if (ExecutorType.REUSE == executorType) {
        executor = new ReuseExecutor(this, transaction);
    } else {
        executor = new SimpleExecutor(this, transaction);
    }
    //使用緩存執(zhí)行器來(lái)裝飾
    if (cacheEnabled) {
        executor = new CachingExecutor(executor);
    }
    executor = (Executor) interceptorChain.pluginAll(executor);
    return executor;
}

當(dāng)SqlSession執(zhí)行方法的時(shí)候,則會(huì)先調(diào)用到CachingExecutor,我們來(lái)看查詢方法。

public class CachingExecutor implements Executor {
    @Override
    public <E> List<E> query()throws SQLException {
        Cache cache = ms.getCache();
        if (cache != null) {
            flushCacheIfRequired(ms);
            if (ms.isUseCache() && resultHandler == null) {
                List<E> list = (List<E>) tcm.getObject(cache, key);
                if (list == null) {
                    list = delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
                    tcm.putObject(cache, key, list); // issue #578 and #116
                }
                return list;
            }
        }
        return delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
    }
}

這里的代碼,如果開啟了緩存,則先從緩存中獲取結(jié)果。如果沒(méi)有開啟緩存或者緩存中沒(méi)有結(jié)果,則再調(diào)用SimpleExecutor執(zhí)行器去數(shù)據(jù)庫(kù)中查詢。

十、觀察者模式

觀察者模式,定義對(duì)象間的一種一對(duì)多的依賴關(guān)系,當(dāng)一個(gè)對(duì)象的狀態(tài)發(fā)生改變時(shí),所有依賴于它的對(duì)象都得到通知并被自動(dòng)更新。

在Spring或者SpringBoot項(xiàng)目中,有時(shí)候我們需要在Spring容器啟動(dòng)并加載完之后,做一些系統(tǒng)初始化的事情。這時(shí)候,我們可以配置一個(gè)觀察者ApplicationListener,來(lái)達(dá)到這一目的。這就是觀察者模式的實(shí)踐。

@Component
public class ApplicationStartup implements ApplicationListener<ContextRefreshedEvent> {
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        System.out.println("干一些系統(tǒng)初始化的事情....");
        ApplicationContext context = event.getApplicationContext();
        String[] names = context.getBeanDefinitionNames();
        for (String beanName:names){
            System.out.println("----------"+beanName+"---------");
        }
    }
}

首先,我們知道,ApplicationContext是 Spring 中的核心容器。

public abstract class AbstractApplicationContext extends DefaultResourceLoader implements ConfigurableApplicationContext {
    
    //觀察者容器
    private final Set<ApplicationListener<?>> applicationListeners = new LinkedHashSet<>();
    //被觀察者
    private ApplicationEventMulticaster applicationEventMulticaster;
}

在ApplicationContext容器刷新的時(shí)候,會(huì)初始化一個(gè)被觀察者,并注冊(cè)到Spring容器中。

然后,注冊(cè)各種觀察者到被觀察者中,形成一對(duì)多的依賴。

public abstract class AbstractApplicationContext{
    
    protected void registerListeners() {
        for (ApplicationListener<?> listener : getApplicationListeners()) {
            getApplicationEventMulticaster().addApplicationListener(listener);
        }
        String[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false);
        for (String listenerBeanName : listenerBeanNames) {
            getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
        }
        Set<ApplicationEvent> earlyEventsToProcess = this.earlyApplicationEvents;
        this.earlyApplicationEvents = null;
        if (earlyEventsToProcess != null) {
            for (ApplicationEvent earlyEvent : earlyEventsToProcess) {
                getApplicationEventMulticaster().multicastEvent(earlyEvent);
            }
        }
    }
}

這時(shí)候,我們自定義的觀察者對(duì)象也被注冊(cè)到了applicationEventMulticaster里面。

最后,當(dāng)ApplicationContext完成刷新后,則發(fā)布ContextRefreshedEvent事件。

protected void finishRefresh() {
    publishEvent(new ContextRefreshedEvent(this));
}

通知觀察者,調(diào)用ApplicationListener.onApplicationEvent()。

private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
    listener.onApplicationEvent(event);
}

接下來(lái)我們?cè)倏纯丛贒ubbo是如何應(yīng)用這一機(jī)制的。

Dubbo服務(wù)導(dǎo)出過(guò)程始于 Spring 容器發(fā)布刷新事件,Dubbo 在接收到事件后,會(huì)立即執(zhí)行服務(wù)導(dǎo)出邏輯。

public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean,
        ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware,
        ApplicationEventPublisherAware {

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (!isExported() && !isUnexported()) {
            if (logger.isInfoEnabled()) {
                logger.info("The service ready on spring started. service: " + getInterface());
            }
            export();
        }
    }
}

我們看到,Dubbo中的ServiceBean也實(shí)現(xiàn)了 ApplicationListener 接口,在 Spring 容器發(fā)布刷新事件之后就會(huì)執(zhí)行導(dǎo)出方法。我們重點(diǎn)關(guān)注,在Dubbo執(zhí)行完導(dǎo)出之后,它也發(fā)布了一個(gè)事件。

public class ServiceBean<T>{
    
    public void export() {
        super.export();
        publishExportEvent();
    }
    private void publishExportEvent() {
        ServiceBeanExportedEvent exportEvent = new ServiceBeanExportedEvent(this);
        applicationEventPublisher.publishEvent(exportEvent);
    }
}

ServiceBeanExportedEvent,服務(wù)導(dǎo)出事件,需要繼承Spring中的事件對(duì)象ApplicationEvent。

public class ServiceBeanExportedEvent extends ApplicationEvent {
    public ServiceBeanExportedEvent(ServiceBean serviceBean) {
        super(serviceBean);
    }
    public ServiceBean getServiceBean() {
        return (ServiceBean) super.getSource();
    }
}

然后我們自定義一個(gè)ApplicationListener,也就是觀察者,就可以監(jiān)聽(tīng)到Dubbo服務(wù)接口導(dǎo)出事件了。

@Component
public class ServiceBeanListener implements ApplicationListener<ServiceBeanExportedEvent> {
    @Override
    public void onApplicationEvent(ServiceBeanExportedEvent event) {
        ServiceBean serviceBean = event.getServiceBean();
        String beanName = serviceBean.getBeanName();
        Service service = serviceBean.getService();
        System.out.println(beanName+":"+service);
    }
}

十一、命令模式

命令模式是一種行為設(shè)計(jì)模式,它可將請(qǐng)求轉(zhuǎn)換為一個(gè)包含與請(qǐng)求相關(guān)的所有信息的獨(dú)立對(duì)象。該轉(zhuǎn)換讓你能根據(jù)不同的請(qǐng)求將方法參數(shù)化、延遲請(qǐng)求執(zhí)行或?qū)⑵浞湃腙?duì)列中,且能實(shí)現(xiàn)可撤銷操作。

Hystrix是Netflix開源的一款容錯(cuò)框架,具有自我保護(hù)能力??梢宰柚构收系倪B鎖反應(yīng),快速失敗和優(yōu)雅降級(jí)。

它用一個(gè)HystrixCommand或者HystrixObservableCommand包裝所有對(duì)外部系統(tǒng)/依賴的調(diào)用,每個(gè)命令在單獨(dú)線程中/信號(hào)授權(quán)下執(zhí)行。這正是命令模式的典型應(yīng)用。

我們來(lái)看一個(gè)Hystrix應(yīng)用的例子。

首先,我們需要?jiǎng)?chuàng)建一個(gè)具體的命令類,通過(guò)構(gòu)造函數(shù)傳遞接收者對(duì)象。

public class OrderServiceHystrixCommand extends HystrixCommand<Object> {
    
    //接收者,處理業(yè)務(wù)邏輯
    private OrderService orderService;

    public OrderServiceHystrixCommand(OrderService orderService) {
        super(setter());
        this.orderService = orderService;
    }
    //設(shè)置Hystrix相關(guān)參數(shù)
    public static Setter setter() {
        HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("orderGroup");
        HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey("orderService");
        HystrixThreadPoolProperties.Setter threadPoolProperties = HystrixThreadPoolProperties.Setter().withCoreSize(1)
                .withQueueSizeRejectionThreshold(1);
        HystrixCommandProperties.Setter commandProperties = HystrixCommandProperties.Setter();
        return Setter.withGroupKey(groupKey)
                .andCommandKey(commandKey)
                .andThreadPoolPropertiesDefaults(threadPoolProperties)
                .andCommandPropertiesDefaults(commandProperties);

    }
    @Override
    protected Object run() throws InterruptedException {
        Thread.sleep(500);
        return orderService.orders();
    }
    @Override
    protected Object getFallback() {
        System.out.println("-------------------------------");
        return new ArrayList();
    }
}

然后,在客戶端調(diào)用的時(shí)候,創(chuàng)建這個(gè)命令類并執(zhí)行即可。

@RestController
public class OrderController {

    @Autowired
    OrderService orderService;

    @RequestMapping("/orders")
    public Object orders(){
        OrderServiceHystrixCommand command = new OrderServiceHystrixCommand(orderService);
        return command.execute();
    }
}

看上去,命令模式和策略模式有些相像,它們都可以通過(guò)某些行為來(lái)參數(shù)化對(duì)象。但它們的思想有很大區(qū)別。

比如說(shuō)我們可以使用命令來(lái)將任何操作轉(zhuǎn)換為對(duì)象,操作的參數(shù)將成為對(duì)象的成員變量。同樣的,我們也可以對(duì)請(qǐng)求做任何操作,比如延遲執(zhí)行,記錄日志,保存歷史命令等。

而策略模式側(cè)重點(diǎn)在于描述完成某件事的不同方式, 讓你能夠在同一個(gè)上下文類中切換算法。

總結(jié)

本文重點(diǎn)介紹了設(shè)計(jì)模式在不同框架中的實(shí)現(xiàn),以期讓大家更好地理解模式背后的思想和應(yīng)用場(chǎng)景。歡迎有不同想法的朋友,留言探討~

原創(chuàng)不易,客官們點(diǎn)個(gè)贊再走嘛,這將是筆者持續(xù)寫作的動(dòng)力~

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容