Android使用MQTT通訊

前言

主要講下Android如何使用MQTT通訊。用到的軟件或者框架有:

EMQ:https://www.emqx.io/cn/
org.eclipse.paho的MQTT通訊框架:https://github.com/eclipse/paho.mqtt.android

如果已經(jīng)有MQTT相關(guān)服務(wù),可以跳過第一項,從第二項開始看。

一、安裝EMQ及啟動EMQ

1.安裝所需要的依賴包

$ sudo yum install -y yum-utils device-mapper-persistent-data lvm2

2.使用以下命令設(shè)置穩(wěn)定存儲庫,以 CentOS7 為例

$ sudo yum-config-manager --add-repo https://repos.emqx.io/emqx-ce/redhat/centos/7/emqx-ce.repo

3.安裝最新版本的 EMQ X

$ sudo yum install emqx

4.安裝特定版本的 EMQ X

  • 查詢可用版本
$ yum list emqx --showduplicates | sort -r

emqx.x86_64                     3.1.0-1.el7                        emqx-stable
emqx.x86_64                     3.0.1-1.el7                        emqx-stable
emqx.x86_64                     3.0.0-1.el7                        emqx-stable
  • 根據(jù)第二列中的版本字符串安裝特定版本,例如 3.1.0
$ sudo yum install emqx-3.1.0

5.啟動 EMQ X

  • 直接啟動
$ emqx start
emqx 3.1.0 is started successfully!

$ emqx_ctl status
Node 'emqx@127.0.0.1' is started
emqx v3.1.0 is running
  • systemctl 啟動
$ sudo systemctl start emqx
  • service 啟動
$ sudo service emqx start

EMQ管理后臺

地址:xxx.xxx.xxx:18083,地址為服務(wù)器ip或者域名,端口為18083端口


image.png

二、Android使用MQTT

1.在Android中導(dǎo)入依賴

    implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
    implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.0'

項目地址:https://github.com/eclipse/paho.mqtt.android
2.創(chuàng)建MQTT連接的一個Service

public class MqttMessageService extends Service {
 
    /**
     * 訂閱的主題
     */
    private String subTopic;
    @Override
    public void onCreate() {
        super.onCreate();
        //初始化mqtt配置
        initMqtt();
        //連接mqtt
        connectMqtt();

    }



    /**
     * 連接mqtt
     */
    private void connectMqtt() {

        try {
            mqttAndroidClient.connect(mqttConnectOptions, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {

                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    LogUtils.e("MQTT連接失?。。。。? + exception.getCause());
                    new Thread(new Runnable() {
                        @Override
                        public void run() {
                            LogUtils.e("30s后,嘗試重新連接" );
                            try {
                                Thread.sleep(1000*30);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            connectMqtt();
                        }
                    }).start();
                }
            });
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * mqtt初始化
     */
    private void initMqtt() {
        try {

           //寫上自己的url
            String uri = "";
            subTopic = "topic_test" ;
            mqttAndroidClient = new MqttAndroidClient(
                    getApplicationContext(),
                    uri, "test");
            mqttAndroidClient.setCallback(new MqttCallbackExtended() {
                @Override
                public void connectComplete(boolean reconnect, String serverURI) {
                    if (reconnect) {
                        LogUtils.i("MQTT重新連接成功!serverURI:" + serverURI);
                    } else {
                        LogUtils.i("MQTT連接成功!serverURI:" + serverURI);
                    }
                    subscribeAllTopics();
                }

                @Override
                public void connectionLost(Throwable cause) {
                    LogUtils.i("MQTT連接斷開!" + cause.getCause());
                }

                @Override
                public void messageArrived(String topic, MqttMessage message) {
                    LogUtils.i("收到了消息:"+topic+ message.toString()+ message.getQos());
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                    try {
 //                   MqttMessage mqttMessage = token.getMessage();
//                    LogUtils.i("消息發(fā)送成功:" + mqttMessage.toString());
                    } catch (MqttException e) {
                        e.printStackTrace();
                    }
                }
            });

            // 新建連接設(shè)置
            mqttConnectOptions = new MqttConnectOptions();
            mqttConnectOptions.setUserName(MqttConfig.EMQ_USERNAME);
            mqttConnectOptions.setPassword(MqttConfig.EMQ_PASSWORD.toCharArray());
            //斷開后,是否自動連接
            mqttConnectOptions.setAutomaticReconnect(true);
            //是否清空客戶端的連接記錄。若為true,則斷開后,broker將自動清除該客戶端連接信息
            mqttConnectOptions.setCleanSession(true);
            //設(shè)置超時時間,單位為秒
            mqttConnectOptions.setConnectionTimeout(15);
            //心跳時間,單位為秒。即多長時間確認(rèn)一次Client端是否在線
            mqttConnectOptions.setKeepAliveInterval(30);
            //允許同時發(fā)送幾條消息(未收到broker確認(rèn)信息)
            mqttConnectOptions.setMaxInflight(30);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 訂閱所有主題
     */
    private void subscribeAllTopics() {
        //訂閱主消息主題和更新消息主題
        subscribeToTopic(subTopic , 2);
    }


    /**
     * 訂閱一個主主題
     *
     * @param subTopic 主題名稱
     */
    private void subscribeToTopic(String subTopic, int qos) {
        try {
            mqttAndroidClient.subscribe(subTopic, qos, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    LogUtils.i("MQTT訂閱消息成功:" + subTopic);
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    LogUtils.d("MQTT訂閱消息失??!" + subTopic);
                }
            });
        } catch (MqttException ex) {
            LogUtils.d("subscribeToTopic: Exception whilst subscribing");
            ex.printStackTrace();
        }
    }

    /**
     * 發(fā)布主題
     *
     * @param topic 主題
     * @param msg   內(nèi)容
     * @param qos   qos
     */
    public void publishMessage(String topic, String msg, int qos) {
        if (mqttAndroidClient != null && mqttAndroidClient.isConnected()) {
            try {
                LogUtils.d("publishMessage: 發(fā)送" + msg);
                mqttAndroidClient.publish(topic, msg.getBytes(), qos, false);
            } catch (Exception e) {
                LogUtils.e("publishMessage: Error Publishing: " + e.getMessage());
                e.printStackTrace();
            }
        } else {
            LogUtils.e("publishMessage失敗,MQTT未連接 ");
        }
    }

    public void publishMessage(String topic, String msg) {
        publishMessage(topic, msg, 2);
    }

    @Override
    public int onStartCommand(Intent intent, int flags, int startId) {
        return Service.START_NOT_STICKY;
    }

    @Override
    public void onDestroy() {
        LogUtils.e("關(guān)閉MQTT");
        //斷開mqtt連接
        try {
            if (mqttAndroidClient != null && mqttAndroidClient.isConnected()) {
                mqttAndroidClient.disconnect();
                mqttAndroidClient.unregisterResources();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
  
        super.onDestroy();
    }
}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容