org.eclipse.paho.client.mqttv3 源碼解析(一) 發(fā)送

官網(wǎng)
這貨是什么,這貨能做消息推送,和其他手機(jī)端push一樣有自己的協(xié)議體和算法講究
首先看下例子使用:

public void testPubSub() throws Exception {
    String methodName = Utility.getMethodName();
    LoggingUtilities.banner(log, cclass, methodName);

    IMqttClient client = null;
    try {
      String topicStr = "topic" + "_02";
      String clientId = methodName;
      client = clientFactory.createMqttClient(serverURI, clientId);

      log.info("Assigning callback...");
      MessageListener listener = new MessageListener();
      client.setCallback(listener);

      log.info("Connecting...(serverURI:" + serverURI + ", ClientId:" + clientId);
      client.connect();

      log.info("Subscribing to..." + topicStr);
      client.subscribe(topicStr);

      log.info("Publishing to..." + topicStr);
      MqttTopic topic = client.getTopic(topicStr);
      MqttMessage message = new MqttMessage("foo".getBytes());
      topic.publish(message);

      log.info("Checking msg");
      MqttMessage msg = listener.getNextMessage();
      Assert.assertNotNull(msg);
      Assert.assertEquals("foo", msg.toString());

      log.info("getTopic name");
      String topicName = topic.getName();
      log.info("topicName = " + topicName);
      Assert.assertEquals(topicName, topicStr);

      log.info("Disconnecting...");
      client.disconnect();
    }
    finally {
      if (client != null) {
        log.info("Close...");
        client.close();
      }
    }
  }

然后看張圖:

![發(fā)送流程]](http://upload-images.jianshu.io/upload_images/4037823-3c30de0d64f0205f.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

先介紹外部接口IMqttClient和MqttMessage消息質(zhì)量,你所有的請(qǐng)求和接收都是從這個(gè)接口實(shí)現(xiàn)的。
Mqtt協(xié)議起初是IBM提出的,關(guān)于clinet端,有很多包,Eclipse只是其中之一

public interface IMqttClient { //extends IMqttAsyncClient {

   public void connect(MqttConnectOptions options) throws MqttSecurityException, MqttException;
  
    public IMqttToken connectWithResult(MqttConnectOptions options) throws MqttSecurityException, MqttException;

    public void disconnect(long quiesceTimeout) throws MqttException;
  
    // Mqtt基于Tcp協(xié)議,在關(guān)閉時(shí)候,有4次握手機(jī)會(huì),為了最后一次不會(huì)超時(shí),造成server資源泄露,強(qiáng)制關(guān)閉
    public void disconnectForcibly(long quiesceTimeout, long disconnectTimeout) throws MqttException;

    public void subscribe(String[] topicFilters, int[] qos, IMqttMessageListener[] messageListeners) throws MqttException; // 訂閱主題,有自己的長(zhǎng)度和規(guī)則限制

    public void unsubscribe(String topicFilter) throws MqttException;

    public void publish(String topic, byte[] payload, int qos, boolean retained) throws MqttException, MqttPersistenceException; // 發(fā)布消息,有QOS3種質(zhì)量

    public MqttTopic getTopic(String topic);

    public boolean isConnected();

    public String getClientId();

    public String getServerURI();

    public IMqttDeliveryToken[] getPendingDeliveryTokens();// 每一個(gè)Message都有個(gè)Token,用于追蹤,cleanSession會(huì)清空前面狀態(tài),推薦使用false
    
    public void setManualAcks(boolean manualAcks); // 為true表示收到ack需要自己手動(dòng)通知messageArrived
    
    public void messageArrivedComplete(int messageId, int qos) throws MqttException; // 接收到所有信息回掉的完成

    public void close() throws MqttException;
}

質(zhì)量有3種
Qos 0 : 這個(gè)消息最多發(fā)送一次,不能被持久化到磁盤(pán),不能通過(guò)網(wǎng)絡(luò)被傳遞,一般內(nèi)部消息轉(zhuǎn)換
Qos 1 : 這個(gè)消息至少發(fā)送一次,能被重傳,能持久化,能通過(guò)網(wǎng)絡(luò)傳遞,需要實(shí)現(xiàn)MqttConnectOptions中的持久化,否則,掛了以后,不能重傳。
Qos 2:這個(gè)消息精準(zhǔn)只發(fā)一次,能持久化,能通過(guò)網(wǎng)絡(luò)傳遞,客戶端和服務(wù)器都會(huì)收到消息確認(rèn)

上面接口方法的用途,基本你都能猜出來(lái),我們接著看實(shí)現(xiàn)類

public class MqttClient implements IMqttClient { //), DestinationProvider {
    //private static final String CLASS_NAME = MqttClient.class.getName();
    //private static final Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT,CLASS_NAME);

    protected MqttAsyncClient aClient = null;  // Delegate implementation to MqttAsyncClient
    protected long timeToWait = -1;             // How long each method should wait for action to complete

    /*
     * (non-Javadoc)
     * 
     * @see org.eclipse.paho.client.mqttv3.IMqttAsyncClient#disconnectForcibly(long, long)
     */
    public void disconnectForcibly(long quiesceTimeout, long disconnectTimeout) throws MqttException {
        aClient.disconnectForcibly(quiesceTimeout, disconnectTimeout);
    }

    /*
     * @see IMqttClient#subscribe(String[], int[])
     */
    public void subscribe(String[] topicFilters, int[] qos) throws MqttException {
        IMqttToken tok = aClient.subscribe(topicFilters, qos, null, null);
        tok.waitForCompletion(getTimeToWait());
        int[] grantedQos = tok.getGrantedQos();
        for (int i = 0; i < grantedQos.length; ++i) {
            qos[i] = grantedQos[i];
        }
        if (grantedQos.length == 1 && qos[0] == 0x80) {
            throw new MqttException(MqttException.REASON_CODE_SUBSCRIBE_FAILED);
        }
    }

    /*
     * @see IMqttClient#publishBlock(String, byte[], int, boolean)
     */
    public void publish(String topic, byte[] payload,int qos, boolean retained) throws MqttException,
            MqttPersistenceException {
        MqttMessage message = new MqttMessage(payload);
        message.setQos(qos);
        message.setRetained(retained);
        this.publish(topic, message);
    }

    public void setManualAcks(boolean manualAcks) {
        aClient.setManualAcks(manualAcks);
    }
    
    public void messageArrivedComplete(int messageId, int qos) throws MqttException {
        aClient.messageArrivedComplete(messageId, qos);
    }

       ........
}

這個(gè)類是代理類,其實(shí)是MqttAsyncClient干活

  • 1.MqttAsyncClient是使用無(wú)阻塞運(yùn)行在后臺(tái)的輕量級(jí)鏈接server?;赥cp,包含了ssl
  • 2.為了確保消息通過(guò)網(wǎng)絡(luò)被送達(dá)和重啟客戶端帶有質(zhì)量標(biāo)志的消息要求被保存直到重新被送達(dá)。mqtt提供了一種自己持久化機(jī)制來(lái)保存這些消息。MqttDefaultFilePersistence是默認(rèn)方式,如果為null則為瞬時(shí)消息保存在內(nèi)存中。MqttClientPersistence可以自己現(xiàn)實(shí)接口
  • 3.如果在connecting中MqttConnectOptions.setCleanSession(boolean)這個(gè)flag,為true也就說(shuō),如果client掉線disconnect,下次重連,將清空內(nèi)存persistence消息,如果為false,就會(huì)使用持久化機(jī)制去重傳
public class MqttAsyncClient implements IMqttAsyncClient { // DestinationProvider {

        .......

    public MqttAsyncClient(String serverURI, String clientId, MqttClientPersistence persistence, MqttPingSender pingSender) throws MqttException {
        final String methodName = "MqttAsyncClient";

        log.setResourceName(clientId);

        if (clientId == null) { //Support empty client Id, 3.1.1 standard
            throw new IllegalArgumentException("Null clientId");
        }
        // Count characters, surrogate pairs count as one character.
        int clientIdLength = 0;
        for (int i = 0; i < clientId.length() - 1; i++) {
            if (Character_isHighSurrogate(clientId.charAt(i)))
                i++;
            clientIdLength++;
        }
        if ( clientIdLength > 65535) {
            throw new IllegalArgumentException("ClientId longer than 65535 characters");
        }

        MqttConnectOptions.validateURI(serverURI);

        this.serverURI = serverURI;
        this.clientId = clientId; // 【: 1】

        this.persistence = persistence;
        if (this.persistence == null) {
            this.persistence = new MemoryPersistence();
        }

        // @TRACE 101=<init> ClientID={0} ServerURI={1} PersistenceType={2}
        log.fine(CLASS_NAME,methodName,"101",new Object[]{clientId,serverURI,persistence});

        this.persistence.open(clientId, serverURI); // 【: 2】
        this.comms = new ClientComms(this, this.persistence, pingSender); // 【: 3】
        this.persistence.close();
        this.topics = new Hashtable();

    }
    
    protected NetworkModule[] createNetworkModules(String address, MqttConnectOptions options) throws MqttException, MqttSecurityException { // 【: 4】
        final String methodName = "createNetworkModules";
        // @TRACE 116=URI={0}
        log.fine(CLASS_NAME, methodName, "116", new Object[]{address});

        NetworkModule[] networkModules = null;
        String[] serverURIs = options.getServerURIs();
        String[] array = null;
        if (serverURIs == null) {
            array = new String[]{address};
        } else if (serverURIs.length == 0) {
            array = new String[]{address};
        } else {
            array = serverURIs;
        }

        networkModules = new NetworkModule[array.length];
        for (int i = 0; i < array.length; i++) {
            networkModules[i] = createNetworkModule(array[i], options);
        }

        log.fine(CLASS_NAME, methodName, "108");
        return networkModules;
    }

    private NetworkModule createNetworkModule(String address, MqttConnectOptions options) throws MqttException, MqttSecurityException { 
        final String methodName = "createNetworkModule";
        // @TRACE 115=URI={0}
        log.fine(CLASS_NAME,methodName, "115", new Object[] {address});

        NetworkModule netModule;
        String shortAddress;
        String host;
        int port;
        SocketFactory factory = options.getSocketFactory();

        int serverURIType = MqttConnectOptions.validateURI(address); // 【: 5】

        switch (serverURIType) {
        case MqttConnectOptions.URI_TYPE_TCP :
            shortAddress = address.substring(6);
            host = getHostName(shortAddress);
            port = getPort(shortAddress, 1883);
            if (factory == null) {
                factory = SocketFactory.getDefault();
            }
            else if (factory instanceof SSLSocketFactory) {
                throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_SOCKET_FACTORY_MISMATCH);
            }
            netModule = new TCPNetworkModule(factory, host, port, clientId); // 【: 7】
            ((TCPNetworkModule)netModule).setConnectTimeout(options.getConnectionTimeout());
            break;
        case MqttConnectOptions.URI_TYPE_SSL:
            shortAddress = address.substring(6);
            host = getHostName(shortAddress);
            port = getPort(shortAddress, 8883);
            SSLSocketFactoryFactory factoryFactory = null;
            if (factory == null) {
//              try {
                    factoryFactory = new SSLSocketFactoryFactory();
                    Properties sslClientProps = options.getSSLProperties();
                    if (null != sslClientProps)
                        factoryFactory.initialize(sslClientProps, null);
                    factory = factoryFactory.createSocketFactory(null);
//              }
//              catch (MqttDirectException ex) {
//                  throw ExceptionHelper.createMqttException(ex.getCause());
//              }
            }
            else if ((factory instanceof SSLSocketFactory) == false) {
                throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_SOCKET_FACTORY_MISMATCH);
            }

            // Create the network module...
            netModule = new SSLNetworkModule((SSLSocketFactory) factory, host, port, clientId);
            ((SSLNetworkModule)netModule).setSSLhandshakeTimeout(options.getConnectionTimeout());
            // Ciphers suites need to be set, if they are available
            if (factoryFactory != null) {
                String[] enabledCiphers = factoryFactory.getEnabledCipherSuites(null);
                if (enabledCiphers != null) {
                    ((SSLNetworkModule) netModule).setEnabledCiphers(enabledCiphers);
                }
            }
            break;
        case MqttConnectOptions.URI_TYPE_WS:
            shortAddress = address.substring(5);
            host = getHostName(shortAddress);
            port = getPort(shortAddress, 80);
            if (factory == null) {
                factory = SocketFactory.getDefault();
            }
            else if (factory instanceof SSLSocketFactory) {
                throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_SOCKET_FACTORY_MISMATCH);
            }
            netModule = new WebSocketNetworkModule(factory, address, host, port, clientId);
            ((WebSocketNetworkModule)netModule).setConnectTimeout(options.getConnectionTimeout());
            break;
        case MqttConnectOptions.URI_TYPE_WSS:
            shortAddress = address.substring(6);
            host = getHostName(shortAddress);
            port = getPort(shortAddress, 443);
            SSLSocketFactoryFactory wSSFactoryFactory = null;
            if (factory == null) {
                wSSFactoryFactory = new SSLSocketFactoryFactory();
                    Properties sslClientProps = options.getSSLProperties();
                    if (null != sslClientProps)
                        wSSFactoryFactory.initialize(sslClientProps, null);
                    factory = wSSFactoryFactory.createSocketFactory(null);

            }
            else if ((factory instanceof SSLSocketFactory) == false) {
                throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_SOCKET_FACTORY_MISMATCH);
            }

            // Create the network module... 
            netModule = new WebSocketSecureNetworkModule((SSLSocketFactory) factory, address, host, port, clientId);
            ((WebSocketSecureNetworkModule)netModule).setSSLhandshakeTimeout(options.getConnectionTimeout());
            // Ciphers suites need to be set, if they are available
            if (wSSFactoryFactory != null) {
                String[] enabledCiphers = wSSFactoryFactory.getEnabledCipherSuites(null);
                if (enabledCiphers != null) {
                    ((SSLNetworkModule) netModule).setEnabledCiphers(enabledCiphers);
                }
            }
            break;
        case MqttConnectOptions.URI_TYPE_LOCAL :
            netModule = new LocalNetworkModule(address.substring(8));
            break;
        default:
            // This shouldn't happen, as long as validateURI() has been called.
            netModule = null;
        }
        return netModule;
    }

       .......  


    public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback)
            throws MqttException, MqttSecurityException {
        final String methodName = "connect";
        if (comms.isConnected()) {
            throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_CONNECTED);
        }
        if (comms.isConnecting()) {
            throw new MqttException(MqttException.REASON_CODE_CONNECT_IN_PROGRESS);
        }
        if (comms.isDisconnecting()) {
            throw new MqttException(MqttException.REASON_CODE_CLIENT_DISCONNECTING);
        }
        if (comms.isClosed()) {
            throw new MqttException(MqttException.REASON_CODE_CLIENT_CLOSED);
        }

        this.connOpts = options;
        this.userContext = userContext;
        final boolean automaticReconnect = options.isAutomaticReconnect();

        // @TRACE 103=cleanSession={0} connectionTimeout={1} TimekeepAlive={2} userName={3} password={4} will={5} userContext={6} callback={7}
        log.fine(CLASS_NAME,methodName, "103",
                new Object[]{
                Boolean.valueOf(options.isCleanSession()),
                new Integer(options.getConnectionTimeout()),
                new Integer(options.getKeepAliveInterval()),
                options.getUserName(),
                ((null == options.getPassword())?"[null]":"[notnull]"),
                ((null == options.getWillMessage())?"[null]":"[notnull]"),
                userContext,
                callback });
        comms.setNetworkModules(createNetworkModules(serverURI, options));
        comms.setReconnectCallback(new MqttCallbackExtended() {
            
            public void messageArrived(String topic, MqttMessage message) throws Exception {
            }
            public void deliveryComplete(IMqttDeliveryToken token) {
            }
            public void connectComplete(boolean reconnect, String serverURI) {
            }

            public void connectionLost(Throwable cause) {
                if(automaticReconnect){
                        // Automatic reconnect is set so make sure comms is in resting state
                        comms.setRestingState(true);
                        reconnecting = true;
                        startReconnectCycle();
                    }
            }
        });

        // Insert our own callback to iterate through the URIs till the connect succeeds
        MqttToken userToken = new MqttToken(getClientId()); // 【: 8】....
        ConnectActionListener connectActionListener = new ConnectActionListener(this, persistence, comms, options, userToken, userContext, callback, reconnecting);
        userToken.setActionCallback(connectActionListener);
        userToken.setUserContext(this);

        // If we are using the MqttCallbackExtended, set it on the connectActionListener
        if(this.mqttCallback instanceof MqttCallbackExtended){
            connectActionListener.setMqttCallbackExtended((MqttCallbackExtended)this.mqttCallback);
        }

        comms.setNetworkModuleIndex(0);
        connectActionListener.connect();

        return userToken;
    }

        ........

    public IMqttToken disconnect(long quiesceTimeout, Object userContext, IMqttActionListener callback) throws MqttException {
        final String methodName = "disconnect";

        MqttToken token = new MqttToken(getClientId());
        token.setActionCallback(callback);
        token.setUserContext(userContext);

        MqttDisconnect disconnect = new MqttDisconnect();
        try {
            comms.disconnect(disconnect, quiesceTimeout, token);
        } catch (MqttException ex) {
            throw ex;
        }
        return token;
    }
    
    
    public String getCurrentServerURI(){
        return comms.getNetworkModules()[comms.getNetworkModuleIndex()].getServerURI();
    }

    protected MqttTopic getTopic(String topic) {
        MqttTopic.validate(topic, false/*wildcards NOT allowed*/);

        MqttTopic result = (MqttTopic)topics.get(topic);
        if (result == null) {
            result = new MqttTopic(topic, comms);
            topics.put(topic,result);
        }
        return result;
    }
    
    public IMqttToken checkPing(Object userContext, IMqttActionListener callback) throws MqttException{  // 【: 9】
        final String methodName = "ping";
        MqttToken token;
        
        token = comms.checkForActivity();   
        return token;
    }
        
        .........   
   
    public IMqttToken subscribe(String[] topicFilters, int[] qos, Object userContext, IMqttActionListener callback) throws MqttException {
        final String methodName = "subscribe";

        if (topicFilters.length != qos.length) {
            throw new IllegalArgumentException();
        }
        
        // remove any message handlers for individual topics 
        for (int i = 0; i < topicFilters.length; ++i) {
            this.comms.removeMessageListener(topicFilters[i]);
        }
        
        String subs = "";
        for (int i=0;i<topicFilters.length;i++) {
            if (i>0) {
                subs+=", ";
            }
            subs+= "topic="+ topicFilters[i]+" qos="+qos[i];
            
            //Check if the topic filter is valid before subscribing
            MqttTopic.validate(topicFilters[i], true/*allow wildcards*/);
        }
        //@TRACE 106=Subscribe topicFilter={0} userContext={1} callback={2}
        log.fine(CLASS_NAME,methodName,"106",new Object[]{subs, userContext, callback});

        MqttToken token = new MqttToken(getClientId());
        token.setActionCallback(callback);
        token.setUserContext(userContext);
        token.internalTok.setTopics(topicFilters);

        MqttSubscribe register = new MqttSubscribe(topicFilters, qos);

        comms.sendNoWait(register, token);
        //@TRACE 109=<
        log.fine(CLASS_NAME,methodName,"109");

        return token;
    }
    
    /* (non-Javadoc)
     * @see org.eclipse.paho.client.mqttv3.IMqttAsyncClient#unsubscribe(java.lang.String, java.lang.Object, org.eclipse.paho.client.mqttv3.IMqttActionListener)
     */
    public IMqttToken unsubscribe(String topicFilter,  Object userContext, IMqttActionListener callback) throws MqttException {
        return unsubscribe(new String[] {topicFilter}, userContext, callback);
    }

    public IMqttToken unsubscribe(String[] topicFilters, Object userContext, IMqttActionListener callback) throws MqttException {
        final String methodName = "unsubscribe";
        String subs = "";
        for (int i=0;i<topicFilters.length;i++) {
            if (i>0) {
                subs+=", ";
            }
            subs+=topicFilters[i];
            
            // Check if the topic filter is valid before unsubscribing
            // Although we already checked when subscribing, but invalid
            // topic filter is meanless for unsubscribing, just prohibit it
            // to reduce unnecessary control packet send to broker.
            MqttTopic.validate(topicFilters[i], true/*allow wildcards*/);
        }
        
        // remove message handlers from the list for this client
        for (int i = 0; i < topicFilters.length; ++i) {
            this.comms.removeMessageListener(topicFilters[i]);
        }

        MqttToken token = new MqttToken(getClientId());
        token.setActionCallback(callback);
        token.setUserContext(userContext);
        token.internalTok.setTopics(topicFilters);

        MqttUnsubscribe unregister = new MqttUnsubscribe(topicFilters);

        comms.sendNoWait(unregister, token);

        return token;
    }

    public void setCallback(MqttCallback callback) {
        this.mqttCallback = callback;
        comms.setCallback(callback);
    }
    
    public void setManualAcks(boolean manualAcks) {
        comms.setManualAcks(manualAcks);
    }
    
    public void messageArrivedComplete(int messageId, int qos) throws MqttException {
        comms.messageArrivedComplete(messageId, qos);
    }

    public static String generateClientId() { // 【: 10】
        //length of nanoTime = 15, so total length = 19  < 65535(defined in spec) 
        return CLIENT_ID_PREFIX + System.nanoTime();
    }

    public IMqttDeliveryToken publish(String topic, MqttMessage message, Object userContext, IMqttActionListener callback) throws MqttException,
            MqttPersistenceException {
        final String methodName = "publish";

        //Checks if a topic is valid when publishing a message.
        MqttTopic.validate(topic, false/*wildcards NOT allowed*/);

        MqttDeliveryToken token = new MqttDeliveryToken(getClientId());
        token.setActionCallback(callback);
        token.setUserContext(userContext);
        token.setMessage(message);
        token.internalTok.setTopics(new String[] {topic});

        MqttPublish pubMsg = new MqttPublish(topic, message);
        comms.sendNoWait(pubMsg, token);

        //@TRACE 112=<
        log.fine(CLASS_NAME,methodName,"112");

        return token;
    }

    public void reconnect() throws MqttException {
        final String methodName = "reconnect";
        //@Trace 500=Attempting to reconnect client: {0}
        log.fine(CLASS_NAME, methodName, "500", new Object[]{this.clientId});
        // Some checks to make sure that we're not attempting to reconnect an already connected client
        if (comms.isConnected()) {
            throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_CONNECTED);
        }
        if (comms.isConnecting()) {
            throw new MqttException(MqttException.REASON_CODE_CONNECT_IN_PROGRESS);
        }
        if (comms.isDisconnecting()) {
            throw new MqttException(MqttException.REASON_CODE_CLIENT_DISCONNECTING);
        }
        if (comms.isClosed()) {
            throw new MqttException(MqttException.REASON_CODE_CLIENT_CLOSED);
        }
        // We don't want to spam the server
        stopReconnectCycle();

        attemptReconnect();
    }
    
    private void attemptReconnect(){
        final String methodName = "attemptReconnect";   
        //@Trace 500=Attempting to reconnect client: {0}
        log.fine(CLASS_NAME, methodName, "500", new Object[]{this.clientId});
        try {
            connect(this.connOpts, this.userContext,new IMqttActionListener() {
                
                public void onSuccess(IMqttToken asyncActionToken) {
                    //@Trace 501=Automatic Reconnect Successful: {0}
                    log.fine(CLASS_NAME, methodName, "501", new Object[]{asyncActionToken.getClient().getClientId()});
                    comms.setRestingState(false);
                    stopReconnectCycle();
                }
                
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    //@Trace 502=Automatic Reconnect failed, rescheduling: {0}
                    log.fine(CLASS_NAME, methodName, "502", new Object[]{asyncActionToken.getClient().getClientId()});
                    if(reconnectDelay < 128000){
                        reconnectDelay = reconnectDelay * 2;
                    }
                    rescheduleReconnectCycle(reconnectDelay);
                }
            });
        } catch (MqttSecurityException ex) {
            //@TRACE 804=exception
            log.fine(CLASS_NAME,methodName,"804",null, ex);
        } catch (MqttException ex) {
            //@TRACE 804=exception
            log.fine(CLASS_NAME,methodName,"804",null, ex);
        }
    }

    private void startReconnectCycle(){
        String methodName = "startReconnectCycle";
        //@Trace 503=Start reconnect timer for client: {0}, delay: {1}
        log.fine(CLASS_NAME, methodName, "503", new Object[]{this.clientId, new Long(reconnectDelay)});
        reconnectTimer = new Timer("MQTT Reconnect: " + clientId);
        reconnectTimer.schedule(new ReconnectTask(), reconnectDelay);
    }
    
    private void stopReconnectCycle(){
        String methodName = "stopReconnectCycle";
        //@Trace 504=Stop reconnect timer for client: {0}
        log.fine(CLASS_NAME, methodName, "504", new Object[]{this.clientId});
        reconnectTimer.cancel();
        reconnectDelay = 1000; // Reset Delay Timer
        
    }
    
    private void rescheduleReconnectCycle(int delay){
        String methodName = "rescheduleReconnectCycle";
        //@Trace 505=Rescheduling reconnect timer for client: {0}, delay: {1}
        log.fine(CLASS_NAME, methodName, "505", new Object[]{this.clientId, new Long(reconnectDelay)});
        reconnectTimer.schedule(new ReconnectTask(), reconnectDelay);

    }
    
    private class ReconnectTask extends TimerTask {
        private static final String methodName = "ReconnectTask.run";
        public void run() {
            //@Trace 506=Triggering Automatic Reconnect attempt.
            log.fine(CLASS_NAME, methodName, "506");
            attemptReconnect();
        }
    }

      ..........
}
  • 1.從MqttAsyncClient入口開(kāi)始,需要獲得一個(gè)url和id,url會(huì)做validateURI檢查,細(xì)節(jié)可以自己查看,無(wú)非就是那個(gè)幾個(gè)網(wǎng)絡(luò)前綴的合法性。id這個(gè)是由自己生產(chǎn)generateClientId也就是(10)
  • 2.這個(gè)persistence就是持久化方案,在org.eclipse.paho.client.mqttv3.persist包下,可以自己查看MqttDefaultFilePersistence和MemoryPersistence,這里open就是初始化,一個(gè)File,一個(gè)創(chuàng)建Hashtable,每次new一個(gè)MqttAsyncClient要清空這個(gè)目錄或者Hashtable
  • 3.這個(gè)ClientComms這個(gè)很重要了,轉(zhuǎn)換器,業(yè)務(wù)邏輯在這個(gè)類開(kāi)始實(shí)現(xiàn),稍等介紹。
  • 4.對(duì)于每一個(gè)url,Mqtt會(huì)其url做網(wǎng)絡(luò)模式的匹配,最終5我們按上面列子的話就是TCPNetworkModule (7)
  • 5.校驗(yàn)uri,對(duì)應(yīng)其type
    /**
     * Validate a URI
     * @param srvURI
     * @return the URI type
     */

    protected static int validateURI(String srvURI) {
        try {
            URI vURI = new URI(srvURI);
            if (vURI.getScheme().equals("ws")){
                return URI_TYPE_WS;
            }
            else if (vURI.getScheme().equals("wss")) {
                return URI_TYPE_WSS;
            }

            if (!vURI.getPath().equals("")) {
                throw new IllegalArgumentException(srvURI);
            }
            if (vURI.getScheme().equals("tcp")) {
                return URI_TYPE_TCP;
            }
            else if (vURI.getScheme().equals("ssl")) {
                return URI_TYPE_SSL;
            }
            else if (vURI.getScheme().equals("local")) {
                return URI_TYPE_LOCAL;
            }
            else {
                throw new IllegalArgumentException(srvURI);
            }
        } catch (URISyntaxException ex) {
            throw new IllegalArgumentException(srvURI);
        }
    }
  • 7.TCPNetworkModule看下這個(gè)tcp底層實(shí)現(xiàn)
/**
 * A network module for connecting over TCP. 
 */
public class TCPNetworkModule implements NetworkModule {

        .......

    public TCPNetworkModule(SocketFactory factory, String host, int port, String resourceContext) {
        log.setResourceName(resourceContext);
        this.factory = factory;
        this.host = host;
        this.port = port;
        
    }

    /**
     * Starts the module, by creating a TCP socket to the server.
     */
    public void start() throws IOException, MqttException {
        final String methodName = "start";
        try {
//          InetAddress localAddr = InetAddress.getLocalHost();
//          socket = factory.createSocket(host, port, localAddr, 0);
            // @TRACE 252=connect to host {0} port {1} timeout {2}
            log.fine(CLASS_NAME,methodName, "252", new Object[] {host, new Integer(port), new Long(conTimeout*1000)});
            SocketAddress sockaddr = new InetSocketAddress(host, port);
            socket = factory.createSocket();
            socket.connect(sockaddr, conTimeout*1000);
        
            // SetTcpNoDelay was originally set ot true disabling Nagle's algorithm. 
            // This should not be required.
//          socket.setTcpNoDelay(true); // TCP_NODELAY on, which means we do not use Nagle's algorithm
        }
        catch (ConnectException ex) {
            //@TRACE 250=Failed to create TCP socket
            log.fine(CLASS_NAME,methodName,"250",null,ex);
            throw new MqttException(MqttException.REASON_CODE_SERVER_CONNECT_ERROR, ex);
        }
    }

         .......
}
  • 8.MqttToken提供了一種追蹤異步操作的機(jī)制
public class MqttToken implements IMqttToken {
    /**
     * A reference to the the class that provides most of the implementation of the 
     * MqttToken.  MQTT application programs must not use the internal class.
     */
    public Token internalTok = null;
        
    public MqttToken(String logContext) {
        internalTok = new Token(logContext);
    }

    public void setActionCallback(IMqttActionListener listener) {
        internalTok.setActionCallback(listener);

    }
    public IMqttActionListener getActionCallback() {
        return internalTok.getActionCallback();
    }

    public void waitForCompletion() throws MqttException {
        internalTok.waitForCompletion(-1);
    }

    public void waitForCompletion(long timeout) throws MqttException {
        internalTok.waitForCompletion(timeout);
    }

         ......

}

實(shí)際是個(gè)代理類,實(shí)現(xiàn)在內(nèi)部Token,我們看幾點(diǎn)重點(diǎn)方法即可

public class Token {

    public boolean checkResult() throws MqttException {
        if ( getException() != null)  {
            throw getException();
        }
        return true;
    }

    public void waitForCompletion() throws MqttException {
        waitForCompletion(-1);
    }

    public void waitForCompletion(long timeout) throws MqttException {
        final String methodName = "waitForCompletion";
        //@TRACE 407=key={0} wait max={1} token={2}
        log.fine(CLASS_NAME,methodName, "407",new Object[]{getKey(), new Long(timeout), this});

        MqttWireMessage resp = waitForResponse(timeout);
        if (resp == null && !completed) {
            //@TRACE 406=key={0} timed out token={1}
            log.fine(CLASS_NAME,methodName, "406",new Object[]{getKey(), this});
            exception = new MqttException(MqttException.REASON_CODE_CLIENT_TIMEOUT);
            throw exception;
        }
        checkResult();
    }

    protected MqttWireMessage waitForResponse() throws MqttException {
        return waitForResponse(-1);
    }
    
    protected MqttWireMessage waitForResponse(long timeout) throws MqttException {
        final String methodName = "waitForResponse";
        synchronized (responseLock) {
            //@TRACE 400=>key={0} timeout={1} sent={2} completed={3} hasException={4} response={5} token={6}
            log.fine(CLASS_NAME, methodName, "400",new Object[]{getKey(), new Long(timeout),new Boolean(sent),new Boolean(completed),(exception==null)?"false":"true",response,this},exception);

            while (!this.completed) {
                if (this.exception == null) {
                    try {
                        //@TRACE 408=key={0} wait max={1}
                        log.fine(CLASS_NAME,methodName,"408",new Object[] {getKey(),new Long(timeout)});
    
                        if (timeout <= 0) {
                            responseLock.wait();
                        } else {
                            responseLock.wait(timeout);
                        }
                    } catch (InterruptedException e) {
                        exception = new MqttException(e);
                    }
                }
                if (!this.completed) {
                    if (this.exception != null) {
                        //@TRACE 401=failed with exception
                        log.fine(CLASS_NAME,methodName,"401",null,exception);
                        throw exception;
                    }
                    
                    if (timeout > 0) {
                        // time up and still not completed
                        break;
                    }
                }
            }
        }
        //@TRACE 402=key={0} response={1}
        log.fine(CLASS_NAME,methodName, "402",new Object[]{getKey(), this.response});
        return this.response;
    }
    
    /**
     * Mark the token as complete and ready for users to be notified.
     * @param msg response message. Optional - there are no response messages for some flows
     * @param ex if there was a problem store the exception in the token.
     */
    protected void markComplete(MqttWireMessage msg, MqttException ex) {
        final String methodName = "markComplete";
        //@TRACE 404=>key={0} response={1} excep={2}
        log.fine(CLASS_NAME,methodName,"404",new Object[]{getKey(),msg,ex});
                
        synchronized(responseLock) {
            // ACK means that everything was OK, so mark the message for garbage collection.
            if (msg instanceof MqttAck) {
                this.message = null;
            }
            this.pendingComplete = true;
            this.response = msg;
            this.exception = ex;
        }
    }
    /**
     * Notifies this token that a response message (an ACK or NACK) has been
     * received.
     */
        protected void notifyComplete() {
            final String methodName = "notifyComplete";
            //@TRACE 411=>key={0} response={1} excep={2}
            log.fine(CLASS_NAME,methodName,"404",new Object[]{getKey(),this.response, this.exception});

            synchronized (responseLock) {
                // If pending complete is set then normally the token can be marked
                // as complete and users notified. An abnormal error may have 
                // caused the client to shutdown beween pending complete being set
                // and notifying the user.  In this case - the action must be failed.
                if (exception == null && pendingComplete) {
                    completed = true;
                    pendingComplete = false;
                } else {
                    pendingComplete = false;
                }
                
                responseLock.notifyAll();
            }
            synchronized (sentLock) {
                sent=true;  
                sentLock.notifyAll();
            }
        }
    
    public void waitUntilSent() throws MqttException {
        final String methodName = "waitUntilSent";
        synchronized (sentLock) {
            synchronized (responseLock) {
                if (this.exception != null) {
                    throw this.exception;
                }
            }
            while (!sent) {
                try {
                    //@TRACE 409=wait key={0}
                    log.fine(CLASS_NAME,methodName, "409",new Object[]{getKey()});

                    sentLock.wait();
                } catch (InterruptedException e) {
                }
            }
            
            while (!sent) {
                if (this.exception == null) {
                    throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_UNEXPECTED_ERROR);
                }
                throw this.exception;
            }
        }
    }
    
    protected void notifySent() {
        final String methodName = "notifySent";
        //@TRACE 403=> key={0}
        log.fine(CLASS_NAME, methodName, "403",new Object[]{getKey()});
        synchronized (responseLock) {
            this.response = null;
            this.completed = false;
        }
        synchronized (sentLock) {
            sent = true;
            sentLock.notifyAll();
        }
    }
}

重要的方法實(shí)現(xiàn)在waitForResponse,會(huì)實(shí)現(xiàn)消費(fèi)者與生產(chǎn)者模式。可以看出當(dāng)timeout為-1的時(shí)候是死鎖的,需要notify,什么時(shí)候重置completed
notifyComplete這個(gè)是消費(fèi)者調(diào)用,提示生產(chǎn)者可以繼續(xù)了,否則就等待

  • 9.checkPing是需要開(kāi)發(fā)者手動(dòng)調(diào)用的,這個(gè)會(huì)安排一個(gè)Task維持這個(gè)鏈接心跳
  • 10.generateClientId()獨(dú)特id生成

好接下來(lái)就是ClientComms里面發(fā)送和業(yè)務(wù)邏輯了,前面都是鋪墊有個(gè)概念。

public class ClientComms {
    

    public ClientComms(IMqttAsyncClient client, MqttClientPersistence persistence, MqttPingSender pingSender) throws MqttException {
        this.conState = DISCONNECTED;
        this.client     = client;
        this.persistence = persistence;
        this.pingSender = pingSender;
        this.pingSender.init(this);
        
        this.tokenStore = new CommsTokenStore(getClient().getClientId()); // 【: 11】
        this.callback   = new CommsCallback(this); // 【: 12】
        this.clientState = new ClientState(persistence, tokenStore, this.callback, this, pingSender); // 【: 13】

        callback.setClientState(clientState);
        log.setResourceName(getClient().getClientId());
    }


    /**
     * Sends a message to the server. Does not check if connected this validation must be done
     * by invoking routines.
     * @param message
     * @param token
     * @throws MqttException
     */
    void internalSend(MqttWireMessage message, MqttToken token) throws MqttException {

        if (token.getClient() == null ) {
            // Associate the client with the token - also marks it as in use.
            token.internalTok.setClient(getClient());
        } else {
            // Token is already in use - cannot reuse
            //@TRACE 213=fail: token in use: key={0} message={1} token={2}
            log.fine(CLASS_NAME, methodName, "213", new Object[]{message.getKey(), message, token});

            throw new MqttException(MqttException.REASON_CODE_TOKEN_INUSE);
        }

        try {
            // Persist if needed and send the message
            this.clientState.send(message, token); // 【: 14】
        } catch(MqttException e) {
            if (message instanceof MqttPublish) {
                this.clientState.undo((MqttPublish)message);
            }
            throw e;
        }
    }

    /**
     * Sends a message to the broker if in connected state, but only waits for the message to be
     * stored, before returning.
     */
    public void sendNoWait(MqttWireMessage message, MqttToken token) throws MqttException { // 【: 15】
        final String methodName = "sendNoWait";
        if (isConnected() ||
                (!isConnected() && message instanceof MqttConnect) ||
                (isDisconnecting() && message instanceof MqttDisconnect)) {
            if(disconnectedMessageBuffer != null && disconnectedMessageBuffer.getMessageCount() != 0){
                this.clientState.persistBufferedMessage(message);
                disconnectedMessageBuffer.putMessage(message, token);
            } else {
                this.internalSend(message, token);
            }
        } else if(disconnectedMessageBuffer != null && isResting()){
            this.clientState.persistBufferedMessage(message);
            disconnectedMessageBuffer.putMessage(message, token);
        } else {
            throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
        }
    }

    /**
     * Sends a connect message and waits for an ACK or NACK.
     * Connecting is a special case which will also start up the
     * network connection, receive thread, and keep alive thread.
     */
    public void connect(MqttConnectOptions options, MqttToken token) throws MqttException {
        final String methodName = "connect";
        synchronized (conLock) {
            if (isDisconnected() && !closePending) {
                conState = CONNECTING;
                conOptions = options;

                MqttConnect connect = new MqttConnect(client.getClientId(),
                        conOptions.getMqttVersion(),
                        conOptions.isCleanSession(),
                        conOptions.getKeepAliveInterval(),
                        conOptions.getUserName(),
                        conOptions.getPassword(),
                        conOptions.getWillMessage(),
                        conOptions.getWillDestination());

                this.clientState.setKeepAliveSecs(conOptions.getKeepAliveInterval());
                this.clientState.setCleanSession(conOptions.isCleanSession());
                this.clientState.setMaxInflight(conOptions.getMaxInflight());

                tokenStore.open();
                ConnectBG conbg = new ConnectBG(this, token, connect);
                conbg.start();
            }
            else {
                if (isClosed() || closePending) {
                    throw new MqttException(MqttException.REASON_CODE_CLIENT_CLOSED);
                } else if (isConnecting()) {
                    throw new MqttException(MqttException.REASON_CODE_CONNECT_IN_PROGRESS);
                } else if (isDisconnecting()) {
                    throw new MqttException(MqttException.REASON_CODE_CLIENT_DISCONNECTING);
                } else {
                    throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_CONNECTED);
                }
            }
        }
    }

    public void connectComplete( MqttConnack cack, MqttException mex) throws MqttException {
        final String methodName = "connectComplete";
        int rc = cack.getReturnCode();
        synchronized (conLock) {
            if (rc == 0) {
                conState = CONNECTED;
                return;
            }
        }
        throw mex;
    }


    // Tidy up. There may be tokens outstanding as the client was
    // not disconnected/quiseced cleanly! Work out what tokens still
    // need to be notified and waiters unblocked. Store the
    // disconnect or connect token to notify after disconnect is
    // complete.
    private MqttToken handleOldTokens(MqttToken token, MqttException reason) { // 【: 16】

        MqttToken tokToNotifyLater = null;
        try {
            // First the token that was related to the disconnect / shutdown may
            // not be in the token table - temporarily add it if not
            if (token != null) {
                if (tokenStore.getToken(token.internalTok.getKey())==null) {
                    tokenStore.saveToken(token, token.internalTok.getKey());
                }
            }

            Vector toksToNot = clientState.resolveOldTokens(reason);
            Enumeration toksToNotE = toksToNot.elements();
            while(toksToNotE.hasMoreElements()) {
                MqttToken tok = (MqttToken)toksToNotE.nextElement();

                if (tok.internalTok.getKey().equals(MqttDisconnect.KEY) ||
                        tok.internalTok.getKey().equals(MqttConnect.KEY)) {
                    // Its con or discon so remember and notify @ end of disc routine
                    tokToNotifyLater = tok;
                } else {
                    // notify waiters and callbacks of outstanding tokens
                    // that a problem has occurred and disconnect is in
                    // progress
                    callback.asyncOperationComplete(tok);
                }
            }
        }catch(Exception ex) {
            // Ignore as we are shutting down
        }
        return tokToNotifyLater;
    }

    
    public void messageArrivedComplete(int messageId, int qos) throws MqttException {
        this.callback.messageArrivedComplete(messageId, qos);
    }

    // Kick off the connect processing in the background so that it does not block. For instance
    // the socket could take time to create.
    private class ConnectBG implements Runnable {
        ClientComms     clientComms = null;
        Thread          cBg = null;
        MqttToken       conToken;
        MqttConnect     conPacket;

        ConnectBG(ClientComms cc, MqttToken cToken, MqttConnect cPacket) {
            clientComms = cc;
            conToken    = cToken;
            conPacket   = cPacket;
            cBg = new Thread(this, "MQTT Con: "+getClient().getClientId());
        }

        void start() {
            cBg.start();
        }

        public void run() { // 【: 17】
            final String methodName = "connectBG:run";
            MqttException mqttEx = null;
            //@TRACE 220=>
            log.fine(CLASS_NAME, methodName, "220");

            try {
                // Reset an exception on existing delivery tokens.
                // This will have been set if disconnect occured before delivery was
                // fully processed.
                MqttDeliveryToken[] toks = tokenStore.getOutstandingDelTokens();
                for (int i=0; i<toks.length; i++) {
                    toks[i].internalTok.setException(null);
                }

                // Save the connect token in tokenStore as failure can occur before send
                tokenStore.saveToken(conToken,conPacket);

                // Connect to the server at the network level e.g. TCP socket and then
                // start the background processing threads before sending the connect
                // packet.
                NetworkModule networkModule = networkModules[networkModuleIndex];
                networkModule.start();
                receiver = new CommsReceiver(clientComms, clientState, tokenStore, networkModule.getInputStream());
                receiver.start("MQTT Rec: "+getClient().getClientId());
                sender = new CommsSender(clientComms, clientState, tokenStore, networkModule.getOutputStream());
                sender.start("MQTT Snd: "+getClient().getClientId());
                callback.start("MQTT Call: "+getClient().getClientId());                
                internalSend(conPacket, conToken);
            } catch (MqttException ex) {
                mqttEx = ex;
            } catch (Exception ex) {
                mqttEx =  ExceptionHelper.createMqttException(ex);
            }

            if (mqttEx != null) {
                shutdownConnection(conToken, mqttEx);
            }
        }
    }

    
    /*
     * Check and send a ping if needed and check for ping timeout.
     * Need to send a ping if nothing has been sent or received 
     * in the last keepalive interval.
     * Passes an IMqttActionListener to ClientState.checkForActivity
     * so that the callbacks are attached as soon as the token is created
     * (Bug 473928) 
     */
    public MqttToken checkForActivity(IMqttActionListener pingCallback){
        MqttToken token = null;
        try{
            token = clientState.checkForActivity(pingCallback);
        }catch(MqttException e){
            handleRunException(e);
        }catch(Exception e){
            handleRunException(e);
        }
        return token;
    }   
    

   ......
}

重連和關(guān)閉我都干掉了,無(wú)非就是自己加入Task和判斷flag,是否關(guān)閉和連接,大家可以自行查看

  • 11.CommsTokenStore提供了保存和追蹤token的機(jī)制即使多線程,當(dāng)一條消息被sent,所關(guān)聯(lián)的Token將會(huì)被保存,任何一個(gè)感興趣的追蹤狀態(tài)通過(guò)getTkoen在wait方法或者使用監(jiān)聽(tīng)進(jìn)行回調(diào),這個(gè)類保存token僅僅只有request時(shí)候才有唯一id,其他消息是無(wú)固定的。
public class CommsTokenStore {
    private Hashtable tokens;

    protected void saveToken(MqttToken token, String key) {
        final String methodName = "saveToken";

        synchronized(tokens) {
            //@TRACE 307=key={0} token={1}
            log.fine(CLASS_NAME,methodName,"307",new Object[]{key,token.toString()});
            token.internalTok.setKey(key);
            this.tokens.put(key, token);
        }
    }

          ......
}
  • 12.CommsCallback是橋接Receiver和外部api回掉的,它將轉(zhuǎn)換MQTT message objects進(jìn)行最終回掉外部。下節(jié)介紹
  • 13.ClientState核心類,包含了飛行窗口的滑動(dòng)機(jī)制和消息質(zhì)量控制flag,稍后。
  • 14.每一條消息都由對(duì)應(yīng)的Token
  • 15.用于重連后重新發(fā)送的,跟消息質(zhì)量有關(guān)
  • 16.也是斷線后保證消息達(dá)到機(jī)制
  • 17.在外部調(diào)用connect時(shí)候就開(kāi)始run這個(gè)Task,然后開(kāi)開(kāi)啟了CommsReceiver和CommsSender,然后保存和發(fā)送第一條connect消息

ClientState 我拆開(kāi)來(lái)看,太長(zhǎng)了
這個(gè)類是客戶端核心類,它包含了等待中和飛行消息的信息狀態(tài)。
這里消息能被多個(gè)對(duì)象傳遞,當(dāng)它在傳送移動(dòng)被接收的時(shí)候:

  1. 當(dāng)客戶端重啟,會(huì)使用持久化的消息,進(jìn)行重新傳遞。
  2. 當(dāng)客戶端或者特殊消息的客戶端狀態(tài)被實(shí)例化將會(huì)讀下面持久化
    Qos 為 2 PUBLISH 或者 PUBREL 將在 outboundqos2 hashtable中
    Qos 為 1 PUBLISH 將在 outboundqos1 hashtable
  3. 在連接的時(shí)候,復(fù)制messages從outbound hashtables到pendingMessages 或者
    pendingFlows vector按照messageid排序
    初始化的消息發(fā)布通過(guò)pendingmessages buffer(等待數(shù)據(jù)隊(duì)列)
    PUBREL 消息通過(guò) pendingflows buffer (飛行窗口隊(duì)列)
  4. 發(fā)送線程同時(shí)從pendingflows和pendingmessages拿取數(shù)據(jù)。這個(gè)消息從pendingbuffer移除的時(shí)候,還會(huì)有備份在outbound* hashtable
  5. 接收線程
    如果接受到 Qos 1的消息,將會(huì)移除持久化和outboundqos1對(duì)應(yīng)消息
    如果接受到 QoS 2的PUBREC send PUBREL消息,將會(huì)更新outboundqos2整個(gè)隊(duì)列關(guān)于PUBREL的數(shù)據(jù),并且刷新持久化。
    如果接受到 QoS 2的PUBCOMP消息,將會(huì)移除持久化和outboundqos2對(duì)應(yīng)消息
public class ClientState {

    private static final int MIN_MSG_ID = 1;        // Lowest possible MQTT message ID to use
    private static final int MAX_MSG_ID = 65535;    // Highest possible MQTT message ID to use
    private int nextMsgId = MIN_MSG_ID - 1;         // The next available message ID to use
    private Hashtable inUseMsgIds;                  // Used to store a set of in-use message IDs

    volatile private Vector pendingMessages;
    volatile private Vector pendingFlows;
    
    private CommsTokenStore tokenStore;
    private ClientComms clientComms = null;
    private CommsCallback callback = null;
    private long keepAlive;
    private boolean cleanSession;
    private MqttClientPersistence persistence;
    
    private int maxInflight = 0;    
    private int actualInFlight = 0;
    private int inFlightPubRels = 0;
    
    private Object queueLock = new Object();
    private Object quiesceLock = new Object();
    private boolean quiescing = false;
    
    private long lastOutboundActivity = 0;
    private long lastInboundActivity = 0;
    private long lastPing = 0;
    private MqttWireMessage pingCommand;
    private Object pingOutstandingLock = new Object();
    private int pingOutstanding = 0;

    private boolean connected = false;
    
    private Hashtable outboundQoS2 = null;
    private Hashtable outboundQoS1 = null;
    private Hashtable outboundQoS0 = null;
    private Hashtable inboundQoS2 = null;
    
    private MqttPingSender pingSender = null;

    protected ClientState(MqttClientPersistence persistence, CommsTokenStore tokenStore, 
            CommsCallback callback, ClientComms clientComms, MqttPingSender pingSender) throws MqttException {
        
        log.setResourceName(clientComms.getClient().getClientId());
        log.finer(CLASS_NAME, "<Init>", "" );

        inUseMsgIds = new Hashtable();
        pendingFlows = new Vector();
        outboundQoS2 = new Hashtable();
        outboundQoS1 = new Hashtable();
        outboundQoS0 = new Hashtable();
        inboundQoS2 = new Hashtable();
        pingCommand = new MqttPingReq();
        inFlightPubRels = 0;
        actualInFlight = 0;
        
        this.persistence = persistence;
        this.callback = callback;
        this.tokenStore = tokenStore;
        this.clientComms = clientComms;
        this.pingSender = pingSender;
        
        restoreState();
    }

先有個(gè)類的概念我拆出來(lái)看。

    
    /**
     * Submits a message for delivery. This method will block until there is
     * room in the inFlightWindow for the message. The message is put into
     * persistence before returning.
     * 
     * @param message  the message to send
     * @param token the token that can be used to track delivery of the message
     * @throws MqttException
     */
    public void send(MqttWireMessage message, MqttToken token) throws MqttException {
        final String methodName = "send";
        if (message.isMessageIdRequired() && (message.getMessageId() == 0)) {
            message.setMessageId(getNextMessageId());
        }
        if (token != null ) {
            try {
                token.internalTok.setMessageID(message.getMessageId());
            } catch (Exception e) {
            }
        }
            
        if (message instanceof MqttPublish) {
            synchronized (queueLock) {
                if (actualInFlight >= this.maxInflight) {
                    throw new MqttException(MqttException.REASON_CODE_MAX_INFLIGHT);
                }
                MqttMessage innerMessage = ((MqttPublish) message).getMessage();
                switch(innerMessage.getQos()) {
                    case 2:
                        outboundQoS2.put(new Integer(message.getMessageId()), message);
                        persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
                        break;
                    case 1:
                        outboundQoS1.put(new Integer(message.getMessageId()), message);
                        persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
                        break;
                }
                tokenStore.saveToken(token, message);
                pendingMessages.addElement(message);
                queueLock.notifyAll();
            }
        } else {    
            if (message instanceof MqttConnect) {
                synchronized (queueLock) {
                    // Add the connect action at the head of the pending queue ensuring it jumps
                    // ahead of any of other pending actions.
                    tokenStore.saveToken(token, message);
                    pendingFlows.insertElementAt(message,0);
                    queueLock.notifyAll();
                }
            } else {
                if (message instanceof MqttPingReq) {
                    this.pingCommand = message;
                }
                else if (message instanceof MqttPubRel) {
                    outboundQoS2.put(new Integer(message.getMessageId()), message);
                    persistence.put(getSendConfirmPersistenceKey(message), (MqttPubRel) message);
                }
                else if (message instanceof MqttPubComp)  {
                    persistence.remove(getReceivedPersistenceKey(message));
                }
                
                synchronized (queueLock) {
                    if ( !(message instanceof MqttAck )) {
                        tokenStore.saveToken(token, message);
                    }
                    pendingFlows.addElement(message);
                    queueLock.notifyAll();
                }
            }
        }
    }
    
}

send方法是阻塞的直到飛行窗口有消息,每一個(gè)消息進(jìn)來(lái)之前都會(huì)被持久化
我們看發(fā)送,一開(kāi)始進(jìn)來(lái)MqttConnect,而后才是MqttPublish方法,涉及重連和其他特殊message才走else的邏輯。
先判斷真實(shí)的滑動(dòng)指針是否超過(guò)最大的滑動(dòng)容量,然后進(jìn)行消息質(zhì)量判斷,持久化以后然后加入相應(yīng)的隊(duì)列中,最后在CommsSender中write出去

public class CommsSender implements Runnable {
    private static final String CLASS_NAME = CommsSender.class.getName();
    private static final Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT, CLASS_NAME);

    //Sends MQTT packets to the server on its own thread
    private boolean running         = false;
    private Object lifecycle        = new Object();
    private ClientState clientState = null;
    private MqttOutputStream out;
    private ClientComms clientComms = null;
    private CommsTokenStore tokenStore = null;
    private Thread  sendThread      = null;
    
    public CommsSender(ClientComms clientComms, ClientState clientState, CommsTokenStore tokenStore, OutputStream out) {
        this.out = new MqttOutputStream(clientState, out);
        this.clientComms = clientComms;
        this.clientState = clientState;
        this.tokenStore = tokenStore;
        log.setResourceName(clientComms.getClient().getClientId());
    }

    public void run() {
        final String methodName = "run";
        MqttWireMessage message = null;
        while (running && (out != null)) {
            try {
                message = clientState.get();
                if (message != null) {
                    if (message instanceof MqttAck) {
                        out.write(message);
                        out.flush();
                    } else {
                        MqttToken token = tokenStore.getToken(message);
                        // While quiescing the tokenstore can be cleared so need 
                        // to check for null for the case where clear occurs
                        // while trying to send a message.
                        if (token != null) {
                            synchronized (token) {
                                out.write(message);
                                try {
                                    out.flush();
                                } catch (IOException ex) {
                                    // The flush has been seen to fail on disconnect of a SSL socket
                                    // as disconnect is in progress this should not be treated as an error
                                    if (!(message instanceof MqttDisconnect)) {
                                        throw ex;
                                    }
                                }
                                clientState.notifySent(message);
                            }
                        }
                    }
                } else { // null message
                    running = false;
                }
            } catch (MqttException me) {
                handleRunException(message, me);
            } catch (Exception ex) {        
                handleRunException(message, ex);    
            }
        } // end while
        

我們看看clientState.get(),get方法是阻塞的。

    /**
     * This returns the next piece of work, ie message, for the CommsSender
     * to send over the network.
     * Calls to this method block until either:
     *  - there is a message to be sent
     *  - the keepAlive interval is exceeded, which triggers a ping message
     *    to be returned
     *  - {@link #disconnected(MqttException, boolean)} is called
     * @return the next message to send, or null if the client is disconnected
     */
    protected MqttWireMessage get() throws MqttException {
        final String methodName = "get";
        MqttWireMessage result = null;

        synchronized (queueLock) {
            while (result == null) {
                
                // If there is no work wait until there is work.
                // If the inflight window is full and no flows are pending wait until space is freed.
                // In both cases queueLock will be notified.
                if ((pendingMessages.isEmpty() && pendingFlows.isEmpty()) || 
                    (pendingFlows.isEmpty() && actualInFlight >= this.maxInflight)) {
                    try {
                        queueLock.wait();
                    } catch (InterruptedException e) {
                    }
                }
                
                // Handle the case where not connected. This should only be the case if: 
                // - in the process of disconnecting / shutting down
                // - in the process of connecting
                if (!connected && 
                        (pendingFlows.isEmpty() || !((MqttWireMessage)pendingFlows.elementAt(0) instanceof MqttConnect))) {
                    return null;
                }

                // Check if there is a need to send a ping to keep the session alive. 
                // Note this check is done before processing messages. If not done first
                // an app that only publishes QoS 0 messages will prevent keepalive processing
                // from functioning. 
//              checkForActivity(); //Use pinger, don't check here
                
                // Now process any queued flows or messages
                if (!pendingFlows.isEmpty()) {
                    // Process the first "flow" in the queue
                    result = (MqttWireMessage)pendingFlows.remove(0);
                    if (result instanceof MqttPubRel) {
                        inFlightPubRels++;

                        //@TRACE 617=+1 inflightpubrels={0}
                        log.fine(CLASS_NAME,methodName,"617", new Object[]{new Integer(inFlightPubRels)});
                    }
        
                    checkQuiesceLock();
                } else if (!pendingMessages.isEmpty()) {
                    
                    // If the inflight window is full then messages are not 
                    // processed until the inflight window has space. 
                    if (actualInFlight < this.maxInflight) {
                        // The in flight window is not full so process the 
                        // first message in the queue
                        result = (MqttWireMessage)pendingMessages.elementAt(0);
                        pendingMessages.removeElementAt(0);
                        actualInFlight++;
    
                        //@TRACE 623=+1 actualInFlight={0}
                        log.fine(CLASS_NAME,methodName,"623",new Object[]{new Integer(actualInFlight)});
                    } else {
                        //@TRACE 622=inflight window full
                        log.fine(CLASS_NAME,methodName,"622");              
                    }
                }           
            }
        }
        return result;
    }

每次拿去數(shù)據(jù)從飛行窗口開(kāi)始的,雙向隊(duì)列,如果飛行窗口沒(méi)有了,才從等待數(shù)據(jù)隊(duì)列進(jìn)入,其中產(chǎn)生的message會(huì)根據(jù)flag優(yōu)先加入哪些隊(duì)列,如斷線后,重連的消息會(huì)置頂。

隊(duì)列形式大致就這樣

其中pendingMessages和pendingFlows聯(lián)系不大不會(huì)把其中數(shù)據(jù)做拷貝到pendingMessages中,pendingFlows只做特殊的消息處理,它有自帶一套響應(yīng)回調(diào)機(jī)制(如MqttConnect),而pendingMessages專門(mén)給外部Publish做緩存隊(duì)列的。
接收端

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,711評(píng)論 19 139
  • Android 自定義View的各種姿勢(shì)1 Activity的顯示之ViewRootImpl詳解 Activity...
    passiontim閱讀 179,365評(píng)論 25 708
  • 一、在后臺(tái)工作量少 當(dāng)用戶沒(méi)有主動(dòng)使用你的應(yīng)用程序時(shí),系統(tǒng)會(huì)將其轉(zhuǎn)換為背景狀態(tài)。該系統(tǒng)最終可能會(huì)暫停您的應(yīng)用程序,...
    弗利撒閱讀 991評(píng)論 1 0
  • 夢(mèng)想還是要有的,萬(wàn)一撞了鬼呢。還是先好好工作,要細(xì)心點(diǎn),慢慢來(lái)呢。
    LIli狐閱讀 306評(píng)論 0 0
  • HTTPS HTTPS是以安全為目標(biāo)的http通道,即在http下加入SSL層(HTTP +SSL/ TLS),S...
    K__M閱讀 780評(píng)論 0 1

友情鏈接更多精彩內(nèi)容