并發(fā)
創(chuàng)建進(jìn)程
使用 erlang:spawn/1,2,3,4 用來(lái)創(chuàng)建一個(gè) erlang 進(jìn)程。Erlang 進(jìn)程不是操作系統(tǒng)的進(jìn)程,而是類(lèi)似其他語(yǔ)言里“協(xié)程”的概念,它由 Erlang 虛擬機(jī)調(diào)度。本文以后說(shuō)“進(jìn)程”,就是指 Erlang 進(jìn)程。
進(jìn)程之間是互相獨(dú)立的,一個(gè)進(jìn)程要想與另外一個(gè)進(jìn)程通信,就必須通過(guò)消息傳遞。消息會(huì)被發(fā)送到對(duì)方進(jìn)程的信箱存儲(chǔ)起來(lái),對(duì)方進(jìn)程可以在合適的時(shí)間,按照自定的順序讀取信箱里的消息。
Erlang 里進(jìn)程非常輕量,啟動(dòng)速度很快,并且可以同時(shí)運(yùn)行千千萬(wàn)萬(wàn)個(gè),默認(rèn)的進(jìn)程個(gè)數(shù)上限是 262144 ,但可以在啟動(dòng)時(shí)使用 erl +P 修改這個(gè)配置。
1> HelloParallel = fun() -> io:format("hello parallel!~n") end.
#Fun<erl_eval.20.99386804>
2> spawn(HelloParallel). %% spawn/1 BIF 接受一個(gè)函數(shù)做為參數(shù)。
hello parallel!
<0.63.0>
3> PID = pid(0,63,0). %% 使用 pid 來(lái)生成一個(gè) PID
4> is_pid(PID). %% 檢查是否是 PID 類(lèi)型
true
5> is_process_alive(PID). %% 檢查 Process 是否還活著。顯示 false 是因?yàn)樗呀?jīng)運(yùn)行完成終止了。
false
spawn 函數(shù)返回一個(gè)新進(jìn)程的 pid,我們可以使用這個(gè) pid 與其交互。
erlang shell 也是有 pid 的。前面說(shuō)到一個(gè)運(yùn)行時(shí)錯(cuò)誤會(huì)使得當(dāng)前的shell 進(jìn)程崩潰,并重新啟動(dòng)一個(gè)新的進(jìn)程,我們驗(yàn)證一下:
1> self(). %% self/1 返回當(dāng)前進(jìn)程的 pid
<0.60.0>
2> 1 = 2.
** exception error: no match of right hand side value 2
3> self().
<0.63.0>
消息發(fā)送和接收
使用消息發(fā)送運(yùn)算符 ! 發(fā)送消息。
4> self() ! "hello". %% 向自己所在的進(jìn)程發(fā)送一個(gè) List 類(lèi)型的 "hello". `!` 操作的返回值是消息內(nèi)容, "hello".
"hello"
5> flush(). %% flush() 將當(dāng)前 process 的信箱里的所有消息清空并打印。
Shell got "hello"
ok
receive ... end 語(yǔ)句使用 pattern matching 來(lái)從自己進(jìn)程的信箱里讀取消息,可以使用 after 語(yǔ)句來(lái)設(shè)置等待超時(shí)時(shí)間:
1> self() ! "msg1".
"msg1"
2> self() ! "msg2".
"msg2"
3> self() ! "msg3".
"msg3"
4> receive Msg -> Msg after 3000 -> no_more end. %% 讀取任意消息并返回這條消息,如果信箱里沒(méi)有消息了,等待 3 秒后結(jié)束并返回 no_more.
"msg1"
5> receive Msg -> Msg after 3000 -> no_more end. %% 后面這兩條為什么返回 no_more ? 不應(yīng)該是 "msg2", "msg3" 嗎?
no_more
6> receive Msg -> Msg after 3000 -> no_more end.
no_more
上面的第 4 行 receive 語(yǔ)句里,erlang shell 進(jìn)程查看郵箱,查到第一個(gè)消息是 "msg1", Msg 被綁定為 "msg1"。再次運(yùn)行 receive 語(yǔ)句的時(shí)候,由于 Msg 的值已經(jīng)為 "msg1",與信箱里的 "msg2", "msg3" 都不匹配,所以后面兩條 receive 語(yǔ)句都沒(méi)有從信箱里讀取新消息,"msg2" 和 "msg3" 仍然存儲(chǔ)在信箱里:
16> flush().
Shell got "msg2"
Shell got "msg3"
ok
注意雖然后面兩個(gè) receive 語(yǔ)句都沒(méi)有從信箱里讀取消息,但在 receive 語(yǔ)句的執(zhí)行過(guò)程中,它仍然是從頭到尾遍歷了整個(gè)郵箱,并嘗試拿郵箱里的各個(gè)消息跟代碼里的 Msg 進(jìn)行匹配,這是消耗資源的,等后面消息堆積越多越麻煩。這個(gè)叫 Selective Message Reception. 消息的讀取順序是接收方?jīng)Q定的。
所以一般情況下我們?cè)谧x取信箱消息時(shí),讀到我們不感興趣的消息也取出來(lái),打個(gè) error log 然后扔掉它,不要讓它一直在信箱里耗費(fèi)資源。
在 Erlang shell 已經(jīng)伸展不開(kāi)拳腳了。讓我們來(lái)寫(xiě)個(gè)復(fù)雜點(diǎn)的程序:
我們的程序?qū)崿F(xiàn)一個(gè) 消息緩存,具體需求是:
- 我們需要一個(gè)消息棧,用于存儲(chǔ)用戶發(fā)來(lái)的消息。
- 考慮到用戶發(fā)來(lái)的消息可能有很多,我們需要好幾個(gè)這樣的消息棧來(lái)分擔(dān)負(fù)載。
- 我們還想能夠給消息棧命名,以便區(qū)分。
-module(msg_cache).
%% APIs
-export([start_one/1]).
%% for spawns
-export([loop/1]).
%% 定義進(jìn)程的 state。
%% 我們一般說(shuō),一個(gè)服務(wù)、或 “對(duì)象” 會(huì)維護(hù)自己內(nèi)部的 '狀態(tài)'
%% 狀態(tài)可能是一個(gè)字符串緩存,可能是某個(gè)資源的引用,這個(gè)跟業(yè)務(wù)相關(guān)。
%% 狀態(tài)存在于內(nèi)存中,跟外界隔離,通過(guò) API 接口與外界交互。
%% 面向?qū)ο笳Z(yǔ)言里用 類(lèi)和對(duì)象來(lái)存儲(chǔ)狀態(tài),Erlang 里我們用 process。
%% 所以我們又說(shuō) Erlang 是 “面向Process 編程的”
-record(state, {
name, %% 消息棧的名字
length = 0, %% 消息棧長(zhǎng)度
buff = [] %% 消息棧的存儲(chǔ)列表
}).
loop(State = #state{name = Name, length = Len, buff = Buff}) ->
receive
{get_name, From}->
From ! {ok, Name},
loop(State);
{get_length, From}->
From ! {ok, Len},
loop(State);
{set_name, NewName, From} ->
From ! ok,
loop(State#state{name = NewName});
{push, Msg, From} ->
From ! ok,
loop(State#state{buff = [Msg | Buff], length = Len + 1});
{pop, [], From} ->
From ! {error, empty},
loop(State);
{pop, [TopMsg | Msgs], From} ->
From ! {ok, TopMsg},
loop(State#state{buff = Msgs, length = Len - 1});
_Unsupported ->
erlang:error(io_libs:format("unsupported msg: ", [_Unsupported]) )
end.
start_one(BuffName) ->
%% 啟動(dòng)一個(gè)消息棧,并返回其 PID
Pid = spawn(msg_cache, loop, [#state{name=BuffName}]),
io:format("Buff ~s created! Pid = ~p~n", [BuffName, Pid]),
Pid
其實(shí)除了 loop/1 長(zhǎng)一點(diǎn),其他的都挺容易理解的。
注意 loop/1 里的每個(gè)分支的最后一個(gè)語(yǔ)句都是尾遞歸,意味著只要不出錯(cuò),loop/1 就一直循環(huán)下去,所以進(jìn)程就不會(huì)停止。
思考:如果把上面代碼里 receive 語(yǔ)句的最后一個(gè) _Unsupported -> 分支刪掉的話,會(huì)發(fā)生什么?
receive 語(yǔ)句里,接受消息時(shí),都要求消息發(fā)送方將自己的 Pid 帶過(guò)來(lái),放到 From 變量里,以便我們回復(fù)消息給對(duì)方。
我們來(lái)試試:
1> PID = msg_cache:start_one("cache2").
Buff cache2 created! Pid = <0.62.0>
<0.62.0>
2> PID ! {get_length, self()}.
{get_length,<0.60.0>}
3> flush().
Shell got {ok,0}
ok
4> PID ! {pop, self()}.
{pop,<0.60.0>}
5> flush().
Shell got {error,empty}
ok
6> PID ! {push, "msg1", self()}.
{push,"msg1",<0.60.0>}
7> PID ! {push, "msg2", self()}.
{push,"msg2",<0.60.0>}
8> PID ! {push, "msg3", self()}.
{push,"msg3",<0.60.0>}
9> PID ! {get_length, self()}.
{get_length,<0.60.0>}
10> flush().
Shell got ok
Shell got ok
Shell got ok
Shell got {ok,3}
ok
11> PID ! {pop, self()}.
{pop,<0.60.0>}
12> flush().
Shell got {ok,"msg3"}
ok
13> PID ! {get_length, self()}.
{get_length,<0.60.0>}
14> flush().
Shell got {ok,2}
ok
繼續(xù)往下閱讀之前,仔細(xì)看一下這個(gè)例子,確保你完全理解了這段代碼。
挺厲害的吧?但我們還有兩個(gè)問(wèn)題沒(méi)有解決:
- 沒(méi)有一個(gè)易用易維護(hù)的 API。 PID ! {get_length, self()}. 這種調(diào)用方式實(shí)在有些反人類(lèi)。
- 沒(méi)有管理進(jìn)程。我們調(diào)用一次 msg_cache:start_one/1 就啟動(dòng)了一個(gè)msg_cache, 但是現(xiàn)在我不知道當(dāng)前已經(jīng)啟動(dòng)了幾個(gè) msg_cache.
我們來(lái)解決這第一個(gè)問(wèn)題,重新整理一下代碼:
-module(msg_cache).
%% APIs
-export([start_one/1,
get_name/1,
get_length/1,
pop/1,
set_name/2,
push/2
]).
%% for spawns
-export([loop/1]).
-define(API_TIMEOUT, 3000).
-record(state, {
name,
length = 0,
buff = []
}).
start_one(BuffName) ->
Pid = spawn(msg_cache, loop, [#state{name=BuffName}]),
io:format("Buff ~s created! Pid = ~p~n", [BuffName, Pid]),
Pid.
%% 加了這幾個(gè) API
get_name(CacheID) ->
call(CacheID, {get_name, self()}).
get_length(CacheID) ->
call(CacheID, {get_length, self()}).
set_name(CacheID, NewName) ->
call(CacheID, {set_name, NewName, self()}).
pop(CacheID) ->
call(CacheID, {pop, self()}).
push(CacheID, Msg) ->
call(CacheID, {push, Msg, self()}).
%% 由于發(fā)送和接受消息的處理方面,各個(gè) API 都差不多,就提取出來(lái)專(zhuān)門(mén)寫(xiě)個(gè) call 函數(shù),提高代碼復(fù)用。
call(Pid, Request) ->
Pid ! Request,
receive
Response -> Response
after ?API_TIMEOUT ->
{error, api_timeout}
end.
%% loop 這一部分我們沒(méi)改動(dòng)任何代碼
loop(State = #state{name = Name, length = Len, buff = Buff}) ->
receive
{get_name, From}->
From ! {ok, Name},
loop(State);
{get_length, From}->
From ! {ok, Len},
loop(State);
{set_name, NewName, From} ->
From ! ok,
loop(State#state{name = NewName});
{push, Msg, From} ->
From ! ok,
loop(State#state{buff = [Msg | Buff], length = Len + 1});
{pop, From} ->
case Buff of
[] ->
From ! {error, empty},
loop(State);
[TopMsg | Msgs] ->
From ! {ok, TopMsg},
loop(State#state{buff = Msgs, length = Len - 1})
end;
_Unsupported ->
erlang:error(io_libs:format("unsupported msg: ", [_Unsupported]) )
end.
再試一下:
1> PID = msg_cache:start_one("cache_worker_1").
Buff cache_worker_1 created! Pid = <0.62.0>
<0.62.0>
2> msg_cache:get_name(PID).
{ok,"cache_worker_1"}
3> msg_cache:get_length(PID).
{ok,0}
4> msg_cache:pop(PID).
{error,empty}
5> msg_cache:push(PID, "msg1").
ok
6> msg_cache:push(PID, "msg2").
ok
7> msg_cache:get_length(PID).
{ok,2}
8> msg_cache:pop(PID).
{ok,"msg2"}
9> msg_cache:pop(PID).
{ok,"msg1"}
10> msg_cache:pop(PID).
{error,empty}
11> msg_cache:get_length(PID).
{ok,0}
還闊以吧?
留個(gè)作業(yè)
上面那個(gè) "管理進(jìn)程" 我們沒(méi)有實(shí)現(xiàn)。你來(lái)實(shí)現(xiàn)它。
我想這么調(diào)用:
%% 啟動(dòng)兩個(gè) worker:
1> msg_cache:start_cache_workers(["c_worker_1", "c_worker_2"]).
[<0.62.0>, <0.65.0>]
%% 列出所有 workers, 返回值是個(gè) worker 列表, 元素展示了每個(gè) worker 的 name, pid, 和 length 。
2> CachePidList = msg_cache:list_cache_workers().
[{"c_worker_1", <0.62.0>, 0}, {"c_worker_2", <0.65.0>, 0}]
%% 負(fù)載均衡, 會(huì)往隨機(jī)的一個(gè) cache worker 里 push.
%% 注意我這里調(diào)用 msg_cache:push 的時(shí)候,沒(méi)有提供某個(gè) cache worker 的 PID
3> ok = msg_cache:push("msg1").
ok
4> ok = msg_cache:push("msg2").
ok
5> CachePidList = msg_cache:list_cache_workers().
[{"c_worker_1", <0.62.0>, 1}, {"c_worker_2", <0.65.0>, 1}]
%% 至于 pop 不用管順序了,有消息就隨便 pop 出一個(gè)來(lái)。
4> msg_cache:pop().
{ok, "msg1"}
提示:
- erlang:register/2 可以給一個(gè) PID 注冊(cè)一個(gè)名字,以后使用這個(gè) PID 就可以使用這個(gè)名字代替。比如
register(msg_cache_manger, Pid).
msg_cache:list_cache_workers() ->
msg_cache_manger ! get_all_workers.
課后必讀文章
Erlang 中的錯(cuò)誤處理機(jī)制, Link、Monitor:
Errors and Processes
ETS
ETS (Erlang Term Storage) 是設(shè)計(jì)來(lái)存放大量的 Erlang 數(shù)據(jù)的。跟 ETS 打交道不用消息格式轉(zhuǎn)換,可直接存放 Erlang 數(shù)據(jù)格式 (erlang 各種數(shù)據(jù)格式的統(tǒng)稱(chēng)叫做 erlang terms)。
ETS 非??欤L問(wèn)時(shí)間是常數(shù)級(jí)的,自動(dòng)幫你解決了多進(jìn)程訪問(wèn)的各種競(jìng)態(tài)條件問(wèn)題,讓我們?cè)?Erlang 中做并發(fā)編程一身輕松。ETS 是非常優(yōu)秀的緩存系統(tǒng),是我們開(kāi)發(fā)中不可或缺的利器之一。這比起用某種流行語(yǔ)言來(lái)說(shuō),舒服太多[1]。
ETS 只將數(shù)據(jù)存儲(chǔ)在內(nèi)存里,如果想保存到磁盤(pán),或者要在多個(gè) Erlang Node 之間共享數(shù)據(jù),OTP 基于 ETS 和 DETS 實(shí)現(xiàn)了 mnesia.
NODE: mnesia 只適合用來(lái)做緩存,在多個(gè) Node 之間共享少量數(shù)據(jù),非??焖佟5遣⒉贿m合當(dāng)做數(shù)據(jù)庫(kù)存儲(chǔ)大量的數(shù)據(jù),因?yàn)?mnesia 在啟動(dòng)時(shí)會(huì)加載所有數(shù)據(jù)到內(nèi)存里,導(dǎo)致啟動(dòng)緩慢、新節(jié)點(diǎn)加入緩慢。并且 mnesia 是強(qiáng)一致性的數(shù)據(jù)庫(kù),其本身并不處理由于集群腦裂導(dǎo)致的不一致性,這可能不太符合你的預(yù)期。
ETS 支持幾種數(shù)據(jù)類(lèi)型:
- set: set 是普通的 key - value 存儲(chǔ)類(lèi)型,一個(gè) ETS table 里,兩個(gè)數(shù)據(jù)的 key 不能相同。重復(fù)插入 key 相同的兩條數(shù)據(jù),后面的那條會(huì)覆蓋前面的那條。
- ordered_set: 有序的 set 表。
- bag: bag 允許多個(gè) key 相同的數(shù)據(jù)的存在,但 key, value 都完全相同的數(shù)據(jù)只能留一個(gè)。
- duplicate_bag: 允許多個(gè) key, value 完全相同的數(shù)據(jù)的存在。
我們來(lái)試試 set 類(lèi)型的 table,這也是最常用的類(lèi)型。我們創(chuàng)建一個(gè)命名表,叫 users, 然后插入兩條數(shù)據(jù):
1> ets:new(users, [set, named_table]).
users
2> ets:info(users). %% 注意默認(rèn)的權(quán)限是 protected
[{read_concurrency,false},
{write_concurrency,false},
{compressed,false},
{memory,304},
{owner,<0.57.0>},
{heir,none},
{name,users},
{size,0},
{node,nonode@nohost},
{named_table,true},
{type,set},
{keypos,1},
{protection,protected}]
3> ets:insert(users, {1, <<"Shawn">>, 27}).
true
4> ets:insert(users, {2, <<"Scarlett">>, 25}).
true
5> ets:lookup(users, 1).
[{1,<<"Shawn">>,27}]
6> ets:lookup(users, 2).
[{2,<<"Scarlett">>,25}]
7> ets:info(users).
[{read_concurrency,false},
{write_concurrency,false},
{compressed,false},
{memory,332},
{owner,<0.57.0>},
{heir,none},
{name,users},
{size,2},
{node,nonode@nohost},
{named_table,true},
{type,set},
{keypos,1},
{protection,protected}]
8>
注意上邊的示例里:
- 創(chuàng)建 ETS table 時(shí)給了兩個(gè) Options 參數(shù):[set, named_table]。set 是指定創(chuàng)建 set 類(lèi)型的表,named_table 是創(chuàng)建命名表,命名為
users,后面可以用這個(gè)表名來(lái)引用。 - 插入數(shù)據(jù)
{1, <<"Shawn">>, 27}和{2, <<"Scarlett">>, 25}時(shí),兩個(gè) tuple 的第一項(xiàng)就是默認(rèn)的 key,tuple 里其他項(xiàng)都是 values。如果你想用其他的項(xiàng)作為 key,可以在 ets:new 的時(shí)候,指定{keypos, Pos}參數(shù),設(shè)置 key 在 tuple 中的位置。
ETS 表的其他類(lèi)型你可以自己試驗(yàn)一下。
需要注意的是:
- ETS 表里的任何數(shù)據(jù)都不參加 GC
- ETS 表有自己的
owner進(jìn)程,默認(rèn)情況下,創(chuàng)建表的那個(gè)進(jìn)程就是 ETS table 的 owner。如果 owner 進(jìn)程掛了,ETS 表也就被釋放了。我們上邊的例子里,erlang shell 進(jìn)程就是usertable 的 owner。 - ETS 表也是有訪問(wèn)權(quán)限的,默認(rèn)是
protected:- public:任何人可以讀寫(xiě)這張表。
- protected: owner 可以讀寫(xiě),但其他進(jìn)程只能讀。
- private:只有 owner 可以讀寫(xiě)。別的進(jìn)程無(wú)法訪問(wèn)。
由于 ETS 表非常高效,一般情況下我們都直接使用 public,然后設(shè)置 {read_concurrency, true} 或 {write_concurrency,true} 選項(xiàng)來(lái)提高并發(fā)讀或?qū)懙男?,在?xiě)一個(gè)管理模塊來(lái)直接訪問(wèn) ets 表,讓什么封裝什么設(shè)計(jì)模式都去 shi。
OTP
OTP 已經(jīng)失去了字面意思,基本上指的就是 Erlang 生態(tài)環(huán)境的官方部分。Erlang 世界的組成是這樣的:
- Erlang 以及 Elixir 等語(yǔ)言。
- 工具和函數(shù)庫(kù),包括 erlang runtime,kernel,stdlib(像 lists 這種的官方庫(kù)), sasl, 還有像 ETS,dbg 之類(lèi)的很多。
- 系統(tǒng)設(shè)計(jì)原則, 包括本章要講的一眾 Behaviors。是一堆應(yīng)用于并發(fā)世界的設(shè)計(jì)模式,他們包含了解決通用問(wèn)題的通用代碼。
- 開(kāi)源社區(qū)生態(tài)環(huán)境,包括各種開(kāi)源軟件和社區(qū)。
OTP 指的是前三個(gè),Elixir 的話還不大算。
Erlang 的邏輯是,架構(gòu)的設(shè)計(jì)應(yīng)該由有經(jīng)驗(yàn)的人負(fù)責(zé),由專(zhuān)家做好基礎(chǔ)代碼框架,解決好最困難的問(wèn)題。而使用者只需要寫(xiě)自己的邏輯代碼。這就是 OTP behaviors,他們已經(jīng)在通信、互聯(lián)網(wǎng)領(lǐng)域,經(jīng)歷了幾十年的戰(zhàn)火考驗(yàn)。
本文要講的有三個(gè):
- gen_server
- application
- supervisor
本章只講解 gen_server。 application 和 supervisor 放到后面 Hello World 工程里講解。
gen_server 要解決的問(wèn)題,就是我們上面那個(gè) msg_cache 面臨的問(wèn)題:怎樣做一個(gè)服務(wù)來(lái)響應(yīng)用戶的請(qǐng)求。
我們之前寫(xiě)的代碼很短,可以工作,但是很多東西都沒(méi)有考慮。比如請(qǐng)求者如果同時(shí)收到來(lái)自服務(wù)端的兩個(gè) Response 的話,不知道是對(duì)應(yīng)哪個(gè)請(qǐng)求的:
%% 服務(wù)端:
{get_name, From}->
From ! {ok, Name},
loop(State);
{get_length, From}->
From ! {ok, Len},
loop(State);
%% 客戶端:
ServerPID ! {get_length, self()}, %% 客戶端連續(xù)調(diào)用了兩次
ServerPID ! {get_length, self()},
receive
{ok, Len} -> %% 你知道這次匹配到的消息,是上面哪次調(diào)用的回復(fù)嗎?
success;
_ ->
failed
end.
上面代碼里連續(xù)調(diào)用了兩次 {get_length}, 但是由于發(fā)送消息是異步的,消息通過(guò)網(wǎng)絡(luò)回來(lái),你并不能確定第一次收到的回復(fù)就是第一次調(diào)用產(chǎn)生的。
這個(gè)問(wèn)題可以加一個(gè)隨機(jī)生成的 RequestID 的字段來(lái)解決,客戶端發(fā)送請(qǐng)求消息的時(shí)候帶 RequestID 過(guò)去,服務(wù)端返回的時(shí)候再傳回來(lái)。客戶端通過(guò)匹配 RequestID,就能知道當(dāng)前的回復(fù)是對(duì)應(yīng)的哪個(gè)請(qǐng)求。
但這種需求其實(shí)是通用的,你現(xiàn)在寫(xiě) msg_cache 用得到,改天寫(xiě)其他代碼也一樣用得到。另外我們也沒(méi)有過(guò)多考慮異常的情況:如果程序崩潰了怎么辦?發(fā)送消息怎么知道對(duì)方是不是還活著?
諸如此類(lèi)的問(wèn)題應(yīng)該由專(zhuān)家來(lái)解決,所以我們有了 gen_server.
gen_server 的模板是這樣的:
-module(gen_server_demo).
-behaviour(gen_server).
%% API functions
-export([start_link/0]).
%% gen_server callbacks
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-record(state, {}).
%%%% %%%% %%%% %%%% %%%%
%%%% 這是給客戶端調(diào)用的接口部分
%%%% %%%% %%%% %%%% %%%%
%% 啟動(dòng)一個(gè)服務(wù),后臺(tái)會(huì)啟動(dòng)一個(gè) erlang process, 并進(jìn)入 loop 函數(shù), 回想一下我們實(shí)現(xiàn) msg_cache 時(shí)寫(xiě)的那個(gè) loop/1.
%% 但是這個(gè) loop 函數(shù)屬于通用部分的代碼,是由 OTP 官方實(shí)現(xiàn)的,所以代碼不在這里,在 OTP 代碼的 lib/stdlib/src/gen_server.erl 里。
start_link() ->
%% gen_server:start_link 啟動(dòng) process, 然后將 process 注冊(cè)在當(dāng)前
%% node 上,注冊(cè)名字就是當(dāng)前 Module 名:gen_server_demo
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
%%%% %%%% %%%% %%%% %%%%
%%%% 這是 gen_server 發(fā)生某事件時(shí)的回調(diào)函數(shù)部分
%%%% %%%% %%%% %%%% %%%%
%% gen_server:start_link 被調(diào)用,服務(wù)啟動(dòng)時(shí),回調(diào) init/1
init([]) ->
{ok, #state{}}.
%% gen_server:call 被調(diào)用。gen_server:call 是“同步”調(diào)用,調(diào)用方可以設(shè)置一個(gè)超時(shí)時(shí)間。
%% 返回值里的 Reply 是返回給調(diào)用者的內(nèi)容。
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
%% gen_server:cast 被調(diào)用。gen_server:cast 是“異步”調(diào)用。
%% 調(diào)用者一般是想發(fā)一個(gè)消息給我們的 gen_server,然后繼續(xù)做自己的事情,他不想收到來(lái)自 gen_server 的回復(fù)。
handle_cast(_Msg, State) ->
{noreply, State}.
%% gen_server 進(jìn)程收到一個(gè)普通 erlang 消息:一個(gè)不是通過(guò) gen_server:call 和 gen_server:cast 發(fā)來(lái)的消息。
handle_info(_Info, State) ->
{noreply, State}.
%% 上面的三個(gè)函數(shù) handle_call, handle_cast, handle_info
%% 都可以返回一個(gè) {stop, Reason, State},這樣的話 gen_server 會(huì)退出。
%% 但退出之前,可能會(huì)回調(diào) terminate(_Reason, _State)。
%% gen_server 將要退出時(shí),回調(diào) terminate/2.
%% 注意
%% 1) 要想 terminate 在 gen_server 退出前被回調(diào),gen_server 必須捕獲退出信號(hào):
%% 需要在 init 回調(diào)里,加這么一行:process_flag(trap_exit, true).
%% 2) 有幾個(gè)特定的 Reason 被認(rèn)為是正常退出:normal, shutdown, or {shutdown,Term},
%% 其他的 Reason,sasl 是會(huì)報(bào)錯(cuò)打日志的。
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
gen_server 真正的進(jìn)程代碼在 OTP 庫(kù)里,運(yùn)行 start_link(),gen_server 就在后臺(tái)跑起來(lái)了。你需要實(shí)現(xiàn)的只是這個(gè)模板里的各個(gè)回調(diào)函數(shù),將你的業(yè)務(wù)邏輯放到這些回調(diào)里。
仔細(xì)看一下上面的 gen_server 模板和注釋?zhuān)_保你能完全理解。
我不想重新實(shí)現(xiàn)之前的 msg_cache,一點(diǎn)都不酷。我們重新寫(xiě)個(gè)其他的,讓你對(duì) Erlang 程序的基本設(shè)計(jì)理念有更深的印象。
我們要實(shí)現(xiàn)一個(gè)多用戶聊天的程序:
- 用戶能夠查詢?cè)诰€的其他用戶。
- 用戶之間能夠聊天。
- 要容易擴(kuò)展,因?yàn)楹竺嫖覀兊?Client 會(huì)通過(guò)TCP、WebSocket 等連接上來(lái),不會(huì)是 Erlang 寫(xiě)的 Client。
- 要容易伸縮,因?yàn)槲覀儤I(yè)務(wù)發(fā)展很快,用戶量會(huì)越來(lái)越大,我們希望程序能很容易的部署在多臺(tái)服務(wù)器上。
先來(lái)設(shè)計(jì)我們程序的架構(gòu):

- 每個(gè) client 連接上來(lái),都會(huì)啟動(dòng)一個(gè)新的 Process,叫做 ChatServer.
- ChatServer 負(fù)責(zé)維護(hù)這個(gè) Client 的 TCP 連接。
- Route 是一個(gè)Module,它提供了數(shù)據(jù)庫(kù)的管理,數(shù)據(jù)庫(kù)里維護(hù)了從 User 到其 ChatServer 的 PID 的映射關(guān)系。
注意我們的設(shè)計(jì)思想:
- 為每一個(gè)連接上來(lái)的請(qǐng)求啟動(dòng)一個(gè) Erlang 進(jìn)程 "ChatServer",不要擔(dān)心進(jìn)程個(gè)數(shù),百萬(wàn)也沒(méi)問(wèn)題。
- 兩個(gè)用戶之間的消息傳遞,體現(xiàn)在服務(wù)端就是兩個(gè) "ChatServer" 之間的 Erlang 消息傳遞。
- Route 部分只是一個(gè) Module,不是進(jìn)程。每一個(gè) ChatServer 調(diào)用 Route 里的代碼的時(shí)候,執(zhí)行過(guò)程其實(shí)是在每個(gè) ChatServer 進(jìn)程內(nèi)部的。這樣我們就避免了集中向一個(gè)進(jìn)程發(fā)送消息帶來(lái)的瓶頸。我們把這種瓶頸的處理留給了 ETS 來(lái)解決。
- 如何伸縮?ChatServer 在不在同一個(gè)服務(wù)器上沒(méi)什么關(guān)系。
ChatServerPID !{send, Msg}會(huì)將消息發(fā)送到ChatServerPID,即使 ChatServerPID 在遠(yuǎn)端的服務(wù)器上。分布式部署的時(shí)候,這行代碼根本不用改,你要做的僅僅是添加一個(gè)新的 Erlang Node。分布式 Erlang 后面還要講。 - 如何擴(kuò)展?ETS 使用 Route Module 管理,為的就是當(dāng)以后換用其他的緩存數(shù)據(jù)庫(kù)的時(shí)候簡(jiǎn)單一些。我們?cè)O(shè)想后面為了做分布式集群,要用 mnesia 替代 ETS,只需要寫(xiě)一個(gè)新的 Route Module,內(nèi)部改用 mnesia 存儲(chǔ),然后替換線上已經(jīng)加載的老的 Route Module。線上系統(tǒng)都不用停止,客戶端的連接一個(gè)都不會(huì)斷!
你現(xiàn)在能否體會(huì)到 Erlang 的實(shí)用主義呢?完全沒(méi)廢話,就是解決問(wèn)題!
Client 部分我們現(xiàn)在不做,讓前端的同學(xué)幫我們實(shí)現(xiàn)。但假設(shè)我們的前端程序員還沒(méi)到崗,所以我們可以先放著 WebSocket 部分后面再做。但有兩個(gè)過(guò)程必須現(xiàn)在實(shí)現(xiàn):
- 當(dāng) Client 登錄時(shí),我們需要使用 Route 注冊(cè) user 所在的 ChatServer 的 PID。
- 當(dāng) Client 發(fā)消息時(shí),我們需要使用 Route 查找對(duì)方的 ChatServer 的 PID。
首先我們來(lái)定義我們的消息協(xié)議。我們的消息體內(nèi)包含幾部分,發(fā)送者ID,接收者ID,以及消息內(nèi)容:
-record(msg, {
from_userid,
to_userid,
payload
}).
接下來(lái)讓我們來(lái)實(shí)現(xiàn) Route 模塊,實(shí)現(xiàn)數(shù)據(jù)庫(kù)創(chuàng)建,注冊(cè),查找與注銷(xiāo)功能:
-module(route).
-export([ensure_db/0,
lookup_server/1,
register_server/2,
unregister_server/1]).
ensure_db() ->
case ets:info(servers) of
undefined ->
%% 為了演示方便,我們啟動(dòng)一個(gè)臨時(shí)進(jìn)程來(lái)創(chuàng)建 ETS 表,
%% 如果直接在 erlang shell 里創(chuàng)建ETS的話,出錯(cuò)時(shí) shell 的崩潰連帶著我們的ETS也丟了。
%% 當(dāng)然線上系統(tǒng)不會(huì)這么做。
spawn(fun() -> ets:new(servers, [named_table, public]), receive after infinity->ok end end);
_ -> ok
end.
lookup_server(UserID) ->
case ets:lookup(servers, UserID) of
[{UserID, ServerID}] -> {ok, ServerID};
_ -> {error, no_server}
end.
register_server(UserID, ServerID) ->
ets:insert(servers, {UserID, ServerID}).
unregister_server(UserID) ->
ets:delete(servers, UserID).
接下來(lái)實(shí)現(xiàn)我們的 ChatServer:
-module(chat_server).
-behaviour(gen_server).
%% state 保存用戶的 userid,以及 client 端連上來(lái)的 socket.
-record(state, {
userid,
socket
}).
%% 后面當(dāng)一個(gè)新連接連接到服務(wù)器的時(shí)候,我們會(huì)調(diào)用 start_link 啟動(dòng)一個(gè)新的 ChatServer 為之服務(wù)。
%% 注意這里使用的是 gen_server:start_link/3,沒(méi)有注冊(cè)進(jìn)程名,我們直接使用 PID. 因?yàn)槲覀円獑?dòng)很多個(gè) ChatServer。
start_link(UserID, Socket) ->
{ok, ServerID} = gen_server:start_link(?MODULE, [UserID, Socket], []),
ServerID.
%% 在 init 回調(diào)里注冊(cè)用戶的 ChatServer。
%% 注意我們捕獲了 exit signal, 以便程序退出的時(shí)候回調(diào) terminate/2.
%% 我們?cè)?terminate/2 里取消注冊(cè)。
init([UserID, Socket]) ->
process_flag(trap_exit, true),
route:register_server(UserID, self()),
{ok, #state{userid=UserID, socket=Socket}}.
%% 如果我們的 ChatServer 收到一條來(lái)自 Socket 的消息,它會(huì)收到一條類(lèi)似 {tcp, Sock, Data} 的普通消息。
%% 我們需要在 handle_info 里處理,轉(zhuǎn)發(fā)給對(duì)方的 ChatServer。
handle_info({tcp, #msg{to_userid = ToUserID, payload = Payload} = Msg}, State) ->
io:format("Chat Server(User: ~p) - received msg from tcp client, Msg: ~p~n",[State#state.userid, Msg]),
case route:lookup_server(ToUserID) of
{error, Reason} ->
io:format("Chat Server(User: ~p) - cannot forward to Chat Server(User: ~p): ~p~n",
[State#state.userid, ToUserID, Reason]);
{ok, TargetServerID} ->
io:format("Chat Server(User: ~p) - forward msg to Chat Server(User: ~p), Payload: ~p~n",
[State#state.userid, ToUserID, Payload]),
ok = gen_server:call(TargetServerID, {send, Msg})
end,
{noreply, State};
%% 我們的 ChatServer 收到一條來(lái)自對(duì)端 ChatServer 的轉(zhuǎn)發(fā)請(qǐng)求
handle_call({send, #msg{payload = Payload}}, _From, State) ->
io:format("Chat Server(User: ~p) - deliver msg to tcp client, Payload: ~p~n",
[State#state.userid, Payload]),
send_to_client_via_tcp(State#state.socket, Payload),
{reply, ok, State};
%% Socket 部分我們沒(méi)有實(shí)現(xiàn),暫時(shí)就簡(jiǎn)單打印一下
send_to_client_via_tcp(_Socket, Payload) ->
%gen_tcp:send(_Socket, Payload),
io:format("Sent To Client: ~p~n",[Payload]).
完工了!我們測(cè)試一下:
1> c(chat_server).
{ok,chat_server}
2> c(route).
{ok,route}
%% 現(xiàn)在模擬系統(tǒng)啟動(dòng)時(shí),初始化 DB 的過(guò)程。
%% 后續(xù)這個(gè)會(huì)在啟動(dòng)代碼里寫(xiě)。
3> route:ensure_db().
<0.73.0>
%% 現(xiàn)在我們模擬一個(gè)新的用戶登錄上來(lái),啟動(dòng)新的 ChatServer 的過(guò)程。
%% 后續(xù)這個(gè)過(guò)程當(dāng)然是由 WebSocket 模塊調(diào)用。
4> ServerIDUser1 = chat_server:start_link(<<"user1">>, fake_socket).
<0.75.0>
5> ServerIDUser2 = chat_server:start_link(<<"user2">>, fake_socket).
<0.77.0>
%% 我們來(lái)做一個(gè) #msg{} 消息體。
%% 后續(xù)我們應(yīng)該在收到 socket 上來(lái)的消息解析成功之后,打包一個(gè) #msg{} 消息體。
6> rr("chat_protocol.hrl").
[msg]
7> Msg = #msg{from_userid= <<"user1">>, to_userid = <<"user2">>, payload = <<"hello?">>}.
#msg{from_userid = <<"user1">>,to_userid = <<"user2">>,
payload = <<"hello?">>}
%% 模擬從 socket 收到消息的過(guò)程。
8> ServerIDUser1 ! {tcp, Msg}.
Chat Server(User: <<"user1">>) - received msg from tcp client, Msg: {msg,
<<"user1">>,
<<"user2">>,
<<"hello?">>}
{tcp,#msg{from_userid = <<"user1">>,to_userid = <<"user2">>,
payload = <<"hello?">>}}
Chat Server(User: <<"user1">>) - forward msg to Chat Server(User: <<"user2">>), Payload: <<"hello?">>
Chat Server(User: <<"user2">>) - deliver msg to tcp client, Payload: <<"hello?">>
Sent To Client: <<"hello?">>
9>
我們看到服務(wù)端的路由已經(jīng)走通了,接下來(lái)只要寫(xiě)一個(gè) web socket 模塊,listen 在某個(gè)端口,當(dāng)有連接請(qǐng)求時(shí),像上面第 4、第 5 行一樣調(diào)用 chat_server:start_link/2 就行了。當(dāng)然 send_to_client_via_tcp 也要改為真正往 socket 發(fā)送消息。
完整代碼:
https://github.com/terry-xiaoyu/learn-erlang-in-30-mins/tree/master/chat
一個(gè)完整的線上演示:
(即將上線)
書(shū)接下文:30 分鐘學(xué) Erlang (三)
-
Golang 里你需要自己找多線程安全的 maps 庫(kù),寫(xiě)并發(fā)沒(méi)有安全感。Golang 官方也沒(méi)有下文要說(shuō)到的 OTP 里提供的各種 Behavior,代碼寫(xiě)起來(lái)天馬行空最后一團(tuán)糟。然后又不能支持函數(shù)式的 pattern matching 等寫(xiě)法... 總之用 golang 寫(xiě)代碼從來(lái)不會(huì)給人愉快的感覺(jué)。流行是流行的,但那叫“普通”吧?第一次這么吐槽 golang,但這篇是 erlang 的教程,應(yīng)該不算過(guò)分吧。等到寫(xiě) go 的時(shí)候我再來(lái)吐槽 erlang 。我是不會(huì)寫(xiě) go 的 ... ?