利用python完成一個智能插座

這個項目的編程語言使用的都是python,但還是必須稍微懂點前端的知識,愛動手的同學(xué)按著本篇文章應(yīng)該都能自己完成一個智能插座。整個項目代碼我放在了GitHub上,有需要的可以下載https://github.com/grey27/smartsocket

  • 硬件清單:
    esp8266模塊、繼電器、降壓模塊、普通插座
  • 用到的技術(shù):
    MQTT、micropython、flask
項目完整流程圖
流程圖 .png

按照流程總共拆分成三個部分,分別為mqtt、硬件和flask

MQTT(消息隊列遙測傳輸)是ISO 標準(ISO/IEC PRF 20922)下基于發(fā)布/訂閱范式的消息協(xié)議。它工作在 TCP/IP協(xié)議族上,是為硬件性能低下的遠程設(shè)備以及網(wǎng)絡(luò)狀況糟糕的情況下而設(shè)計的發(fā)布/訂閱型消息協(xié)議,為此,它需要一個消息中間件。

簡單解釋下mqtt的工作原理,消息中間件就是一個分發(fā)的服務(wù)器,每個mqtt的客戶端可以向服務(wù)器訂閱和發(fā)布主題,就如同訂報紙一般訂閱一個主題后每期的新報紙便會發(fā)送給你,而每個客戶端也可以是報紙廠可以發(fā)布任意名字主題,在這個項目里我們需要部署一個消息中間件,和兩個客戶端,一個負責(zé)發(fā)布主題對接flask一個負責(zé)訂閱主題對接硬件,這樣便可以實現(xiàn)對插座的控制。

部署消息中間件

中間件最好是在云上部署,我是在阿里云centos上部署開源的mqtt消息中間件mosquitto。

  • 加入yum源
    在/etc/yum.repos.d/目錄中新建一個mosquitto.repo文件,里面寫入:
[home_oojah_mqtt]
  name=mqtt (CentOS_CentOS-7)
type=rpm-md
baseurl=http://download.opensuse.org/repositories/home:/oojah:/mqtt/CentOS_CentOS-7/
gpgcheck=1
gpgkey=http://download.opensuse.org/repositories/home:/oojah:/mqtt/CentOS_CentOS-7//repodata/repomd.xml.key
enabled=1
  • 開始安裝
    yum search all mosquitto
    yum install mosquitto mosquitto-clients
  • 啟動服務(wù)
    mosquitto -c /etc/mosquitto/mosquitto.conf -d
    這樣便部署完了mqtt的消息中間件,mosquitto默認端口是1883
利用python模擬mqtt的發(fā)布/訂閱主題
  • 安裝paho-mqtt庫
    pip install paho-mqtt

  • 訂閱主題

import paho.mqtt.client as mqtt

def on_connect(client, userdata, flags, rc):
    print("Connected with result code "+str(rc))
    # 連接時訂閱test主題
    client.subscribe("test")
    

def on_message(client, userdata, msg):
    print(msg.topic+" "+str(msg.payload))
    
    
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("127.0.0.1", 1883, 60)
client.loop_forever()
  • 發(fā)布主題
import paho.mqtt.client as mqtt

def on_connect(client, userdata, flags, rc):
    print("Connected with result code "+str(rc))
    

def on_message(client, userdata, msg):
    print(msg.topic+" "+str(msg.payload))
    
    
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("127.0.0.1", 1883, 60)
# 發(fā)布 test主題
client.publish("test", 'hello')

運行訂閱主題后,運行發(fā)布主題就能在訂閱主題的程序中看到消息了。

第二部分 硬件

先看看整個硬件線路圖

實際線路.png

其實就是一個降壓模塊將220v的交流電轉(zhuǎn)為3.3v的直流電為8266模塊和繼電器供電,最后通過繼電器連接插座。如果懂硬件的同學(xué)自己可以買回模塊來接下線就行了,如果嫌麻煩也可以直接網(wǎng)上買成品的模塊,
模塊.png
淘寶上挺多的自己找找吧。注意用電安全,接入220V一端的部分最好用電工膠布封死,通電時不要接觸模塊

esp8266

說到物聯(lián)網(wǎng)基本上就繞不開這個芯片了,網(wǎng)上資料也很多,開發(fā)方式也有很多種,而我這項目中選擇的就是micropython開發(fā),micropython是由英國劍橋大學(xué)教授Damien George發(fā)明的,Damien花費六個月的時間開發(fā)了micropython。(膜拜大神)在ST公司的微控制器上實現(xiàn)了Python3的基本功能,擁有完善的解析器、編譯器、虛擬機和類庫等。現(xiàn)在在眾多開發(fā)者的努力下已經(jīng)移植到更多的硬件平臺上了。
想要了解micropython和python的不同以及快速上手可以閱讀官方的github文檔

燒寫固件
  • 在micropython的官網(wǎng)上下載最新的固件
  • 下載燒錄軟件uPyCraft
    uPyCraft是國人開發(fā)的一款micropython的IDE,雖然bug還有挺多的但勉強能用了。放上軟件的碼云地址https://gitee.com/dfrobot/upycraft/tree/master,不想登陸下載的也可以用我分享的百度云下載
    鏈接:https://pan.baidu.com/s/1V4rggW4eW3AvlZp2oXCT_A
    提取碼:xpcy
    燒寫時需要將GPIO0置低電平,再用usb轉(zhuǎn)ttl將8266連上電腦后,打開uPyCraft選擇清除flash燒錄固件。
    燒錄固件.png

燒錄完成后將GPIO0懸空再接上電腦打開uPyCraft就能進行代碼編寫了,默認里面只有一個boot.py文件,不需要改動,直接新建一個mian.py,代碼寫在里面便可以執(zhí)行了,這個uPyCraft莫名的bug比較多,比如我遇到過tab鍵和空格數(shù)目不對,但在uPyCraft里顯示是一樣長度,然后編譯代碼一直報錯縮進錯誤,最好是在自己習(xí)慣的編輯器里敲代碼最后復(fù)制到uPyCraft里,或者是使用另一款軟件uPyLoader,這是基于python寫的一個軟件,在python3環(huán)境下需要pyqt5庫和qt5庫,除了中文兼容有點問題外這個軟件我沒遇見過什么bug。

編寫代碼

esp8266支持SmartConfig(微信叫airkiss),但是micropython沒有實現(xiàn)這個功能,所以我們只能使用web網(wǎng)頁手動配置。第一次配置智能插座需要8266發(fā)射一個wifi信號,用戶連接后訪問后臺網(wǎng)頁,在網(wǎng)頁上將家里的wifi信息輸入,永久存儲。之后8266就能連上家里的wifi。連上網(wǎng)絡(luò)后只需要一直監(jiān)聽MQTT主題就行了,通過接收的主題信息對插座進行通電和斷電操作。
稍微介紹下micropython專有的兩個庫

  • machine
    這個庫包含的都是硬件相關(guān)的一些方法,比如控制引腳,通信協(xié)議,RTC等硬件基礎(chǔ)功能。
    我們項目里只需要用到控制引腳功能
    這是官方例子,相信大家看一眼就知道怎么使用了
from machine import Pin

p0 = Pin(0, Pin.OUT)    # create output pin on GPIO0
p0.on()                 # set pin to "on" (high) level
p0.off()                # set pin to "off" (low) level
p0.value(1)             # set pin to on/high

p2 = Pin(2, Pin.IN)     # create input pin on GPIO2
print(p2.value())       # get value, 0 or 1

p4 = Pin(4, Pin.IN, Pin.PULL_UP) # enable internal pull-up resistor
p5 = Pin(5, Pin.OUT, value=1) # set pin high on creation
  • network
    網(wǎng)絡(luò)相關(guān)的庫,用來控制8266的wifi等功能。
    兩種模式分別為STA和AP。直接看官方例子吧
import network

wlan = network.WLAN(network.STA_IF) # create station interface
wlan.active(True)       # activate the interface
wlan.scan()             # scan for access points
wlan.isconnected()      # check if the station is connected to an AP
wlan.connect('essid', 'password') # connect to an AP
wlan.config('mac')      # get the interface's MAC adddress
wlan.ifconfig()         # get the interface's IP/netmask/gw/DNS addresses

ap = network.WLAN(network.AP_IF) # create access-point interface
ap.active(True)         # activate the interface
ap.config(essid='ESP-AP') # set the ESSID of the access point

因為我們還需要使用mqtt,在micropython的github官方倉庫找到需要的庫。直接把umqtt里的simple.py代碼拷下來就行了,使用的例子也可以在上面看到。
為了方便調(diào)用我自己寫了一個類庫

from simple import MQTTClient
from machine import Pin
import utime

class MQTT():
    def __init__(self,id,host,topic,pin=2):
        self.host = host
        self.topic = topic
        self.id = id
        self.pin = Pin(pin,Pin.OUT)
        
    def loop(self):
        c = MQTTClient(self.id, self.host) #建立一個MQTT客戶端,傳入連接id號和主機
        c.set_callback(self.sub_cb) #設(shè)置回調(diào)函數(shù)
        c.connect() #建立連接
        c.subscribe(self.topic) #監(jiān)控主題,接收控制命令,
        while True:
            c.check_msg()
            if utime.time() % 10 == 0:  #每10秒ping一次服務(wù)器,不然很快客戶端就會掉線
                c.ping()


    def sub_cb(self,topic, msg):   #回調(diào)函數(shù),收到服務(wù)器消息后會調(diào)用這個函數(shù)
        print(topic, msg)
        if msg == b'on':
          self.pin.value(1)
        if msg == b'off':
          self.pin.value(0)
          
if __name__ == '__main__':
    mqtt = MQTT('39.108.210.212',b'test',2)
    mqtt.loop()

因為官方提供的也是一個簡單的mqtt客戶端,沒有自動斷線重連的機制,所以再循環(huán)監(jiān)聽時需要定時ping通服務(wù)器,一定要定時,不然循環(huán)ping的話會造成信息阻塞。

接下來就是main.py了,直接上代碼,之前鋪墊了那么多再加上注釋我覺得大家可以看懂了

import socket
import network
import ure
import time
import machine
import mqtt

# 填入你自己服務(wù)器的ip
HOST = '39.108.210.212'

# 提示跳轉(zhuǎn)模板 三個空,分別是提示詞,跳轉(zhuǎn)頁面,等待跳轉(zhuǎn)時間
HINT_HTML = '''
    <!DOCTYPE html>
    <html>
      <head>
        <meta charset="UTF-8">
        <title></title>
      </head>
      <body>
      {}
      <script>window.setTimeout("window.location='{}'",{});</script>
      </body>
    </html>
'''


def ConnectWifi(request):
  # 取出wifi名字密碼
  print(request)
  r = ure.search('ssid=(.*)&password=(.*) H',request)
  ssid = r.group(1)
  password = r.group(2)
  
  # 如果找到中文ssid進行譯碼
  try:
    r = ure.search('(%..)+', ssid)
    cn = r.group(0)
    cn_byte = cn.replace('%', r'\x')
    cn_byte = eval("b"+"\'"+cn_byte+"\'")
    ssid = ssid.replace(cn, cn_byte.decode())
  except:
    pass
    
  # 連接wifi
  sta = network.WLAN(network.STA_IF)
  sta.active(True)
  sta.disconnect()
  sta.connect(ssid,password)
  return ssid,password

# 獲取配置頁面  
def get_index_html(ap_list):
  aplist = []
  for a in ap_list:
    aplist.append(a[0].decode())
  with open('html.py') as fp:
    index_html = fp.read()
  for ssid in aplist:
    index_html = index_html.replace('ssid1',ssid,2)
  return index_html
  
# 打開sta模式
sta = network.WLAN(network.STA_IF)
sta.active(True)

# 嘗試打開ssid文件獲取之前配置的ssid
try:
  with open('ssid.py','r') as fp:
    ssid = fp.readline()
    password = fp.readline()
  sta.connect(ssid,password)
  time.sleep(5)
except:
  pass

# 如果可以聯(lián)網(wǎng)即監(jiān)聽mqtt
if sta.isconnected():
  try:
    mqtt_c = mqtt.MQTT("mqtt_id",HOST,b'switch',2)
    mqtt_c.loop()
  except:
    pass
    # machine.reset()
# 否則打開ap模式讓用戶連接配置
else:
  ap_list = sta.scan()
  index_html = get_index_html(ap_list)
  ap = network.WLAN(network.AP_IF)
  ap.active(False)
  ap.active(True)
  ap.config(essid='智能插座',authmode=0)

# 設(shè)置webserver
addr = socket.getaddrinfo('192.168.4.1', 80)[0][-1]
s = socket.socket()
s.bind(addr)
s.listen(5)
# 循環(huán)監(jiān)聽

while True:
  # 獲取request
  cl, addr = s.accept()
  request = cl.recv(1024).decode()
  get = request.split('\n')[0]
  # 屏蔽對favicon.ico的請求
  if 'favicon.ico' in get:
    cl.sendall('HTTP/1.1 404')
    cl.close()
    continue
  else:
    responseHeaders = "HTTP/1.1 200 OK\r\nContent-Type: text/html\r\nConnection: close\r\n\r\n"
    cl.send(responseHeaders)
    
  if 'ssid' in get:
    cl.sendall(HINT_HTML.format('<h1>連接中。。。</h1>','tips',10000))
    cl.close()
    ssid,password = ConnectWifi(request.split('\n')[0])
    continue
    
  if 'tips' in get:
    if sta.isconnected():
      with open('ssid.py','w') as fp:
        fp.write(ssid)
        fp.write('\n')
        fp.write(password)
      tips = '<h1>配置完成,本機ip:' + sta.ifconfig()[0] + ',設(shè)備即將重啟</h1><br><h1>訪問<a href="http://' + HOST + '">' + HOST + '</a>登陸后臺控制</h1>'
      cl.sendall(HINT_HTML.format(tips,'index',3000))
      cl.close()
      time.sleep(3)
      ap.active(False)
      machine.reset()
    else:
      cl.sendall(HINT_HTML.format('<h1>密碼錯誤</h1>','index',3000))
      cl.close()
    continue
      
  if ('index' in get) or (len(get)==15):
    cl.sendall(index_html)
    cl.close()

關(guān)于html.py 其實就是一個輸入wifi名字密碼的html頁面,因為在燒錄時只能選擇py文件所以才取這個名字。大概就是長這樣:


連接頁面.png

源碼我放在了我的github上大家可以去下載,如果有會前端的同學(xué)可以自己寫一個更好看的頁面。

第三部分 flask

Flask是一個使用python編寫的輕量級 Web 應(yīng)用框架。
我們需要將每次插座操作進行計入所以需要使用flask_sqlalchemy插件,定時開關(guān)插座任務(wù)使用flask_apscheduler插件.寫一個簡單的頁面


主頁面.png

然后再用js發(fā)起ajax請求,使后臺發(fā)送mqtt消息最終達到控制插座的目的。
相關(guān)代碼都在github上了,注意下載的源碼所有的主機ip,mysql配置要根據(jù)自己的設(shè)置。

  • 最后展示一下硬件的完成品


    實物圖.JPG
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容