通過參考koa中間件,socket.io遠(yuǎn)程事件調(diào)用,以一種新的姿勢來使用WebSocket。
瀏覽器端
瀏覽器端使用WebSocket很簡單
// Create WebSocket connection.
const socket = new WebSocket('ws://localhost:8080');
// Connection opened
socket.addEventListener('open', function (event) {
socket.send('Hello Server!');
});
// Listen for messages
socket.addEventListener('message', function (event) {
console.log('Message from server ', event.data);
});
MDN關(guān)于WebSocket的介紹
能注冊的事件有onclose,onerror,onmessage,onopen。用的比較多的是onmessage,從服務(wù)器接受到數(shù)據(jù)后,會觸發(fā)message事件。通過注冊相應(yīng)的事件處理函數(shù),可以根據(jù)后端推送的數(shù)據(jù)做相應(yīng)的操作。
如果只是寫個(gè)demo,單單輸出后端推送的信息,如下使用即可:
socket.addEventListener('message', function (event) {
console.log('Message from server ', event.data);
});
實(shí)際使用過程中,我們需要判斷后端推送的數(shù)據(jù)然后執(zhí)行相應(yīng)的操作。比如聊天室應(yīng)用中,需要判斷消息是廣播的還是私聊的或者群聊的,以及是純文字信息還是圖片等多媒體信息。這時(shí)message處理函數(shù)里可能就是一堆的if else。那么有沒有什么別的優(yōu)雅的姿勢呢?答案就是中間件與事件,跨進(jìn)程的事件的發(fā)布與訂閱。在說遠(yuǎn)程事件發(fā)布訂閱之前,需要先從中間件開始,因?yàn)楹竺鎸?shí)現(xiàn)的遠(yuǎn)程事件發(fā)布訂閱是基于中間件的。
中間件
前面說了,在WebSocket實(shí)例上可以注冊事件有onclose,onerror,onmessage,onopen。每一個(gè)事件的處理函數(shù)里可能需要做各種判斷,特別是message事件。參考koa,可以將事件處理函數(shù)以中間件方式來進(jìn)行使用,將不同的操作邏輯分發(fā)到不同的中間件中,比如聊天室應(yīng)用中,聊天信息與系統(tǒng)信息(比如用戶登錄屬于系統(tǒng)信息)是可以放到不同的中間件中處理的。
koa提供use接口來注冊中間件。我們針對不同的事件提供相應(yīng)的中間件注冊接口,并且對原生的WebSocket做封裝。
export default class EasySocket{
constructor(config) {
this.url = config.url;
this.openMiddleware = [];
this.closeMiddleware = [];
this.messageMiddleware = [];
this.errorMiddleware = [];
this.openFn = Promise.resolve();
this.closeFn = Promise.resolve();
this.messageFn = Promise.resolve();
this.errorFn = Promise.resolve();
}
openUse(fn) {
this.openMiddleware.push(fn);
return this;
}
closeUse(fn) {
this.closeMiddleware.push(fn);
return this;
}
messageUse(fn) {
this.messageMiddleware.push(fn);
return this;
}
errorUse(fn) {
this.errorMiddleware.push(fn);
return this;
}
}
通過xxxUse注冊相應(yīng)的中間件。 xxxMiddleware中就是相應(yīng)的中間件。xxxFn 中間件通過compose處理后的結(jié)構(gòu)
再添加一個(gè)connect方法,處理相應(yīng)的中間件并且實(shí)例化原生WebSocket
connect(url) {
this.url = url || this.url;
if (!this.url) {
throw new Error('url is required!');
}
try {
this.socket = new WebSocket(this.url, 'echo-protocol');
} catch (e) {
throw e;
}
this.openFn = compose(this.openMiddleware);
this.socket.addEventListener('open', (event) => {
let context = { client: this, event };
this.openFn(context).catch(error => { console.log(error) });
});
this.closeFn = compose(this.closeMiddleware);
this.socket.addEventListener('close', (event) => {
let context = { client: this, event };
this.closeFn(context).then(() => {
}).catch(error => {
console.log(error)
});
});
this.messageFn = compose(this.messageMiddleware);
this.socket.addEventListener('message', (event) => {
let res;
try {
res = JSON.parse(event.data);
} catch (error) {
res = event.data;
}
let context = { client: this, event, res };
this.messageFn(context).then(() => {
}).catch(error => {
console.log(error)
});
});
this.errorFn = compose(this.errorMiddleware);
this.socket.addEventListener('error', (event) => {
let context = { client: this, event };
this.errorFn(context).then(() => {
}).catch(error => {
console.log(error)
});
});
return this;
}
使用koa-compose模塊處理中間件。注意context傳入了哪些東西,后續(xù)定義中間件的時(shí)候都已使用。
compose的作用可看這篇文章 傻瓜式解讀koa中間件處理模塊koa-compose
然后就可以使用了:
new EasySocket()
.openUse((context, next) => {
console.log("open");
next();
})
.closeUse((context, next) => {
console.log("close");
next();
})
.errorUse((context, next) => {
console.log("error", context.event);
next();
})
.messageUse((context, next) => {
//用戶登錄處理中間件
if (context.res.action === 'userEnter') {
console.log(context.res.user.name+' 進(jìn)入聊天室');
}
next();
})
.messageUse((context, next) => {
//創(chuàng)建房間處理中間件
if (context.res.action === 'createRoom') {
console.log('創(chuàng)建房間 '+context.res.room.anme);
}
next();
})
.connect('ws://localhost:8080')
可以看到,用戶登錄與創(chuàng)建房間的邏輯放到兩個(gè)中間件中分開處理。不足之處就是每個(gè)中間件都要判斷context.res.action,而這個(gè)context.res就是后端返回的數(shù)據(jù)。怎么消除這個(gè)頻繁的if判斷呢? 我們實(shí)現(xiàn)一個(gè)簡單的消息處理路由。
路由
定義消息路由中間件
messageRouteMiddleware.js
export default (routes) => {
return async (context, next) => {
if (routes[context.req.action]) {
await routes[context.req.action](context,next);
} else {
console.log(context.req)
next();
}
}
}
定義路由
router.js
export default {
userEnter:function(context,next){
console.log(context.res.user.name+' 進(jìn)入聊天室');
next();
},
createRoom:function(context,next){
console.log('創(chuàng)建房間 '+context.res.room.anme);
next();
}
}
使用:
new EasySocket()
.openUse((context, next) => {
console.log("open");
next();
})
.closeUse((context, next) => {
console.log("close");
next();
})
.errorUse((context, next) => {
console.log("error", context.event);
next();
})
.messageUse(messageRouteMiddleware(router))//使用消息路由中間件,并傳入定義好的路由
.connect('ws://localhost:8080')
一切都變得美好了,感覺就像在使用koa。想一個(gè)問題,當(dāng)接收到后端推送的消息時(shí),我們需要做相應(yīng)的DOM操作。比如路由里面定義的userEnter,我們可能需要在對應(yīng)的函數(shù)里操作用戶列表的DOM,追加新用戶。這使用原生JS或JQ都是沒有問題的,但是如果使用vue,react這些,因?yàn)槭墙M件化的,用戶列表可能就是一個(gè)組件,怎么訪問到這個(gè)組件實(shí)例呢?(當(dāng)然也可以訪問vuex,redux的store,但是并不是所有組件的數(shù)據(jù)都是用store管理的)。
我們需要一個(gè)運(yùn)行時(shí)注冊中間件的功能,然后在組件的相應(yīng)的生命周期鉤子里注冊中間件并且傳入組件實(shí)例
運(yùn)行時(shí)注冊中間件,修改如下代碼:
messageUse(fn, runtime) {
this.messageMiddleware.push(fn);
if (runtime) {
this.messageFn = compose(this.messageMiddleware);
}
return this;
}
修改 messageRouteMiddleware.js
export default (routes,component) => {
return async (context, next) => {
if (routes[context.req.action]) {
context.component=component;//將組件實(shí)例掛到context下
await routes[context.req.action](context,next);
} else {
console.log(context.req)
next();
}
}
}
類似vue mounted中使用
mounted(){
let client = this.$wsClients.get("im");//獲取指定EasySocket實(shí)例
client.messageUse(messageRouteMiddleware(router,this),true)//運(yùn)行時(shí)注冊中間件,并傳入定義好的路由以及當(dāng)前組件中的this
}
路由中通過 context.component 即可訪問到當(dāng)前組件。
完美了嗎?每次組件mounted 都注冊一次中間件,問題很大。所以需要一個(gè)判斷中間件是否已經(jīng)注冊的功能。也就是一個(gè)支持具名注冊中間件的功能。這里就暫時(shí)不實(shí)現(xiàn)了,走另外一條路,也就是之前說到的遠(yuǎn)程事件的發(fā)布與訂閱,我們也可以稱之為跨進(jìn)程事件。
跨進(jìn)程事件
看一段socket.io的代碼:
Server (app.js)
var app = require('http').createServer(handler)
var io = require('socket.io')(app);
var fs = require('fs');
app.listen(80);
function handler (req, res) {
fs.readFile(__dirname + '/index.html',
function (err, data) {
if (err) {
res.writeHead(500);
return res.end('Error loading index.html');
}
res.writeHead(200);
res.end(data);
});
}
io.on('connection', function (socket) {
socket.emit('news', { hello: 'world' });
socket.on('my other event', function (data) {
console.log(data);
});
});
Client (index.html)
<script src="/socket.io/socket.io.js"></script>
<script>
var socket = io('http://localhost');
socket.on('news', function (data) {
console.log(data);
socket.emit('my other event', { my: 'data' });
});
</script>
注意力轉(zhuǎn)到這兩部分:
服務(wù)端
socket.emit('news', { hello: 'world' });
socket.on('my other event', function (data) {
console.log(data);
});
客戶端
var socket = io('http://localhost');
socket.on('news', function (data) {
console.log(data);
socket.emit('my other event', { my: 'data' });
});
使用事件,客戶端通過on訂閱'news'事件,并且當(dāng)觸發(fā)‘new’事件的時(shí)候通過emit發(fā)布'my other event'事件。服務(wù)端在用戶連接的時(shí)候發(fā)布'news'事件,并且訂閱'my other event'事件。
一般我們使用事件的時(shí)候,都是在同一個(gè)頁面中on和emit。而socket.io的神奇之處就是同一事件的on和emit是分別在客戶端和服務(wù)端,這就是跨進(jìn)程的事件。
那么,在某一端emit某個(gè)事件的時(shí)候,另一端如果on監(jiān)聽了此事件,是如何知道這個(gè)事件emit(發(fā)布)了呢?
沒有看socket.io源碼之前,我設(shè)想應(yīng)該是emit方法里做了某些事情。就像java或c#,實(shí)現(xiàn)rpc的時(shí)候,可以依據(jù)接口定義動態(tài)生成實(shí)現(xiàn)(也稱為代理),動態(tài)實(shí)現(xiàn)的(代理)方法中,就會將當(dāng)前方法名稱以及參數(shù)通過相應(yīng)協(xié)議進(jìn)行序列化,然后通過http或者tcp等網(wǎng)絡(luò)協(xié)議傳輸?shù)絉PC服務(wù)端,服務(wù)端進(jìn)行反序列化,通過反射等技術(shù)調(diào)用本地實(shí)現(xiàn),并返回執(zhí)行結(jié)果給客戶端。客戶端拿到結(jié)果后,整個(gè)調(diào)用完成,就像調(diào)用本地方法一樣實(shí)現(xiàn)了遠(yuǎn)程方法的調(diào)用。
看了socket.io emit的代碼實(shí)現(xiàn)后,思路也是大同小異,通過將當(dāng)前emit的事件名和參數(shù)按一定規(guī)則組合成數(shù)據(jù),然后將數(shù)據(jù)通過WebSocket的send方法發(fā)送出去。接收端按規(guī)則取到事件名和參數(shù),然后本地觸發(fā)emit。(注意遠(yuǎn)程emit和本地emit,socket.io中直接調(diào)用的是遠(yuǎn)程emit)。
下面是實(shí)現(xiàn)代碼,事件直接用的emitter模塊,并且為了能自定義emit事件名和參數(shù)組合規(guī)則,以中間件的方式提供處理方法:
export default class EasySocket extends Emitter{//繼承Emitter
constructor(config) {
this.url = config.url;
this.openMiddleware = [];
this.closeMiddleware = [];
this.messageMiddleware = [];
this.errorMiddleware = [];
this.remoteEmitMiddleware = [];//新增的部分
this.openFn = Promise.resolve();
this.closeFn = Promise.resolve();
this.messageFn = Promise.resolve();
this.errorFn = Promise.resolve();
this.remoteEmitFn = Promise.resolve();//新增的部分
}
openUse(fn) {
this.openMiddleware.push(fn);
return this;
}
closeUse(fn) {
this.closeMiddleware.push(fn);
return this;
}
messageUse(fn) {
this.messageMiddleware.push(fn);
return this;
}
errorUse(fn) {
this.errorMiddleware.push(fn);
return this;
}
//新增的部分
remoteEmitUse(fn, runtime) {
this.remoteEmitMiddleware.push(fn);
if (runtime) {
this.remoteEmitFn = compose(this.remoteEmitMiddleware);
}
return this;
}
connect(url) {
...
//新增部分
this.remoteEmitFn = compose(this.remoteEmitMiddleware);
}
//重寫emit方法,支持本地調(diào)用以遠(yuǎn)程調(diào)用
emit(event, args, isLocal = false) {
let arr = [event, args];
if (isLocal) {
super.emit.apply(this, arr);
return this;
}
let evt = {
event: event,
args: args
}
let remoteEmitContext = { client: this, event: evt };
this.remoteEmitFn(remoteEmitContext).catch(error => { console.log(error) })
return this;
}
}
下面是一個(gè)簡單的處理中間件:
client.remoteEmitUse((context, next) => {
let client = context.client;
let event = context.event;
if (client.socket.readyState !== 1) {
alert("連接已斷開!");
} else {
client.socket.send(JSON.stringify({
type: 'event',
event: event.event,
args: event.args
}));
next();
}
})
意味著調(diào)用
client.emit('chatMessage',{
from:'admin',
masg:"Hello WebSocket"
});
就會組合成數(shù)據(jù)
{
type: 'event',
event: 'chatMessage',
args: {
from:'admin',
masg:"Hello WebSocket"
}
}
發(fā)送出去。
服務(wù)端接受到這樣的數(shù)據(jù),可以做相應(yīng)的數(shù)據(jù)處理(后面會使用nodejs實(shí)現(xiàn)類似的編程模式),也可以直接發(fā)送給別的客戶端??蛻羰艿筋愃频臄?shù)據(jù),可以寫專門的中間件進(jìn)行處理,比如:
client.messageUse((context, next) => {
if (context.res.type === 'event') {
context.client.emit(context.res.event, context.res.args, true);//注意這里的emit是本地emit。
}
next();
})
如果本地訂閱的chatMessage事件,回到函數(shù)就會被觸發(fā)。
在vue或react中使用,也會比之前使用路由的方式簡單
mounted() {
let client = this.$wsClients.get("im");
client.on("chatMessage", data => {
let isSelf = data.from.id == this.user.id;
let msg = {
name: data.from.name,
msg: data.msg,
createdDate: data.createdDate,
isSelf
};
this.broadcastMessageList.push(msg);
});
}
組件銷毀的時(shí)候移除相應(yīng)的事件訂閱即可,或者清空所有事件訂閱
destroyed() {
let client = this.$wsClients.get("im");
client.removeAllListeners();
}
心跳重連
核心代碼直接從websocket-heartbeat-js copy過來的(用npm包,還得在它的基礎(chǔ)上再包一層),相關(guān)文章 初探和實(shí)現(xiàn)websocket心跳重連。
核心代碼:
heartCheck() {
this.heartReset();
this.heartStart();
}
heartStart() {
this.pingTimeoutId = setTimeout(() => {
//這里發(fā)送一個(gè)心跳,后端收到后,返回一個(gè)心跳消息
this.socket.send(this.pingMsg);
//接收到心跳信息說明連接正常,會執(zhí)行heartCheck(),重置心跳(清除下面定時(shí)器)
this.pongTimeoutId = setTimeout(() => {
//此定時(shí)器有運(yùn)行的機(jī)會,說明發(fā)送ping后,設(shè)置的超時(shí)時(shí)間內(nèi)未收到返回信息
this.socket.close();//不直接調(diào)用reconnect,避免舊WebSocket實(shí)例沒有真正關(guān)閉,導(dǎo)致不可預(yù)料的問題
}, this.pongTimeout);
}, this.pingTimeout);
}
heartReset() {
clearTimeout(this.pingTimeoutId);
clearTimeout(this.pongTimeoutId);
}
最后
源碼地址:easy-socket-browser
nodejs實(shí)現(xiàn)的類似的編程模式(有空再細(xì)說):easy-socket-node
實(shí)現(xiàn)的聊天室例子:online chat demo
聊天室前端源碼:lazy-mock-im
聊天室服務(wù)端源碼:lazy-mock