30 分鐘學(xué) Erlang (二)

并發(fā)

創(chuàng)建進(jìn)程

使用 erlang:spawn/1,2,3,4 用來(lái)創(chuàng)建一個(gè) erlang 進(jìn)程。Erlang 進(jìn)程不是操作系統(tǒng)的進(jìn)程,而是類(lèi)似其他語(yǔ)言里“協(xié)程”的概念,它由 Erlang 虛擬機(jī)調(diào)度。本文以后說(shuō)“進(jìn)程”,就是指 Erlang 進(jìn)程。

進(jìn)程之間是互相獨(dú)立的,一個(gè)進(jìn)程要想與另外一個(gè)進(jìn)程通信,就必須通過(guò)消息傳遞。消息會(huì)被發(fā)送到對(duì)方進(jìn)程的信箱存儲(chǔ)起來(lái),對(duì)方進(jìn)程可以在合適的時(shí)間,按照自定的順序讀取信箱里的消息。

Erlang 里進(jìn)程非常輕量,啟動(dòng)速度很快,并且可以同時(shí)運(yùn)行千千萬(wàn)萬(wàn)個(gè),默認(rèn)的進(jìn)程個(gè)數(shù)上限是 262144 ,但可以在啟動(dòng)時(shí)使用 erl +P 修改這個(gè)配置。

1> HelloParallel = fun() -> io:format("hello parallel!~n") end.
#Fun<erl_eval.20.99386804>
2> spawn(HelloParallel).  %% spawn/1 BIF 接受一個(gè)函數(shù)做為參數(shù)。
hello parallel!
<0.63.0>
3> PID = pid(0,63,0).   %% 使用 pid 來(lái)生成一個(gè) PID
4> is_pid(PID).  %% 檢查是否是 PID 類(lèi)型
true
5> is_process_alive(PID). %%  檢查 Process 是否還活著。顯示 false 是因?yàn)樗呀?jīng)運(yùn)行完成終止了。
false

spawn 函數(shù)返回一個(gè)新進(jìn)程的 pid,我們可以使用這個(gè) pid 與其交互。

erlang shell 也是有 pid 的。前面說(shuō)到一個(gè)運(yùn)行時(shí)錯(cuò)誤會(huì)使得當(dāng)前的shell 進(jìn)程崩潰,并重新啟動(dòng)一個(gè)新的進(jìn)程,我們驗(yàn)證一下:

1> self().   %% self/1 返回當(dāng)前進(jìn)程的 pid
<0.60.0>
2> 1 = 2.
** exception error: no match of right hand side value 2
3> self().
<0.63.0>

消息發(fā)送和接收

使用消息發(fā)送運(yùn)算符 ! 發(fā)送消息。

4> self() ! "hello".    %% 向自己所在的進(jìn)程發(fā)送一個(gè) List 類(lèi)型的 "hello". `!` 操作的返回值是消息內(nèi)容, "hello".
"hello"
5> flush().  %% flush() 將當(dāng)前 process 的信箱里的所有消息清空并打印。
Shell got "hello"
ok

receive ... end 語(yǔ)句使用 pattern matching 來(lái)從自己進(jìn)程的信箱里讀取消息,可以使用 after 語(yǔ)句來(lái)設(shè)置等待超時(shí)時(shí)間:

1> self() ! "msg1".
"msg1"
2> self() ! "msg2".
"msg2"
3> self() ! "msg3".
"msg3"
4> receive Msg -> Msg after 3000 -> no_more end. %% 讀取任意消息并返回這條消息,如果信箱里沒(méi)有消息了,等待 3 秒后結(jié)束并返回 no_more.
"msg1"
5> receive Msg -> Msg after 3000 -> no_more end.  %% 后面這兩條為什么返回 no_more ? 不應(yīng)該是 "msg2", "msg3" 嗎?
no_more
6> receive Msg -> Msg after 3000 -> no_more end.
no_more

上面的第 4 行 receive 語(yǔ)句里,erlang shell 進(jìn)程查看郵箱,查到第一個(gè)消息是 "msg1", Msg 被綁定為 "msg1"。再次運(yùn)行 receive 語(yǔ)句的時(shí)候,由于 Msg 的值已經(jīng)為 "msg1",與信箱里的 "msg2", "msg3" 都不匹配,所以后面兩條 receive 語(yǔ)句都沒(méi)有從信箱里讀取新消息,"msg2" 和 "msg3" 仍然存儲(chǔ)在信箱里:

16> flush().
Shell got "msg2"
Shell got "msg3"
ok

注意雖然后面兩個(gè) receive 語(yǔ)句都沒(méi)有從信箱里讀取消息,但在 receive 語(yǔ)句的執(zhí)行過(guò)程中,它仍然是從頭到尾遍歷了整個(gè)郵箱,并嘗試拿郵箱里的各個(gè)消息跟代碼里的 Msg 進(jìn)行匹配,這是消耗資源的,等后面消息堆積越多越麻煩。這個(gè)叫 Selective Message Reception. 消息的讀取順序是接收方?jīng)Q定的。

所以一般情況下我們?cè)谧x取信箱消息時(shí),讀到我們不感興趣的消息也取出來(lái),打個(gè) error log 然后扔掉它,不要讓它一直在信箱里耗費(fèi)資源。

在 Erlang shell 已經(jīng)伸展不開(kāi)拳腳了。讓我們來(lái)寫(xiě)個(gè)復(fù)雜點(diǎn)的程序:
我們的程序?qū)崿F(xiàn)一個(gè) 消息緩存,具體需求是:

  • 我們需要一個(gè)消息棧,用于存儲(chǔ)用戶發(fā)來(lái)的消息。
  • 考慮到用戶發(fā)來(lái)的消息可能有很多,我們需要好幾個(gè)這樣的消息棧來(lái)分擔(dān)負(fù)載。
  • 我們還想能夠給消息棧命名,以便區(qū)分。
-module(msg_cache).

%% APIs
-export([start_one/1]).

%% for spawns
-export([loop/1]).

%% 定義進(jìn)程的 state。
%% 我們一般說(shuō),一個(gè)服務(wù)、或 “對(duì)象” 會(huì)維護(hù)自己內(nèi)部的 '狀態(tài)'
%% 狀態(tài)可能是一個(gè)字符串緩存,可能是某個(gè)資源的引用,這個(gè)跟業(yè)務(wù)相關(guān)。
%% 狀態(tài)存在于內(nèi)存中,跟外界隔離,通過(guò) API 接口與外界交互。
%% 面向?qū)ο笳Z(yǔ)言里用 類(lèi)和對(duì)象來(lái)存儲(chǔ)狀態(tài),Erlang 里我們用 process。
%% 所以我們又說(shuō) Erlang 是 “面向Process 編程的”
-record(state, {
            name,      %% 消息棧的名字
            length = 0,  %% 消息棧長(zhǎng)度
            buff = []   %% 消息棧的存儲(chǔ)列表
         }).

loop(State = #state{name = Name, length = Len, buff = Buff}) ->
  receive
    {get_name, From}->
      From ! {ok, Name},
      loop(State);
    {get_length, From}->
      From ! {ok, Len},
      loop(State);
    {set_name, NewName, From} ->
      From ! ok,
      loop(State#state{name = NewName});
    {push, Msg, From} ->
      From ! ok,
      loop(State#state{buff = [Msg | Buff], length = Len + 1});
    {pop, [], From} ->
      From ! {error, empty},
      loop(State);
    {pop, [TopMsg | Msgs], From} ->
      From ! {ok, TopMsg},
      loop(State#state{buff = Msgs, length = Len - 1});
    _Unsupported ->
      erlang:error(io_libs:format("unsupported msg: ", [_Unsupported]) )
  end.

start_one(BuffName) ->
  %% 啟動(dòng)一個(gè)消息棧,并返回其 PID
  Pid = spawn(msg_cache, loop, [#state{name=BuffName}]),
  io:format("Buff ~s created! Pid = ~p~n", [BuffName, Pid]),
  Pid

其實(shí)除了 loop/1 長(zhǎng)一點(diǎn),其他的都挺容易理解的。
注意 loop/1 里的每個(gè)分支的最后一個(gè)語(yǔ)句都是尾遞歸,意味著只要不出錯(cuò),loop/1 就一直循環(huán)下去,所以進(jìn)程就不會(huì)停止。

思考:如果把上面代碼里 receive 語(yǔ)句的最后一個(gè) _Unsupported -> 分支刪掉的話,會(huì)發(fā)生什么?

receive 語(yǔ)句里,接受消息時(shí),都要求消息發(fā)送方將自己的 Pid 帶過(guò)來(lái),放到 From 變量里,以便我們回復(fù)消息給對(duì)方。

我們來(lái)試試:

1> PID = msg_cache:start_one("cache2").
Buff cache2 created! Pid = <0.62.0>
<0.62.0>
2> PID ! {get_length, self()}.
{get_length,<0.60.0>}
3> flush().
Shell got {ok,0}
ok

4> PID ! {pop, self()}.
{pop,<0.60.0>}
5> flush().
Shell got {error,empty}
ok

6> PID ! {push, "msg1", self()}.
{push,"msg1",<0.60.0>}
7> PID ! {push, "msg2", self()}.
{push,"msg2",<0.60.0>}
8> PID ! {push, "msg3", self()}.
{push,"msg3",<0.60.0>}
9> PID ! {get_length, self()}.
{get_length,<0.60.0>}
10> flush().
Shell got ok
Shell got ok
Shell got ok
Shell got {ok,3}
ok

11> PID ! {pop, self()}.
{pop,<0.60.0>}
12> flush().
Shell got {ok,"msg3"}
ok

13> PID ! {get_length, self()}.
{get_length,<0.60.0>}
14> flush().
Shell got {ok,2}
ok

繼續(xù)往下閱讀之前,仔細(xì)看一下這個(gè)例子,確保你完全理解了這段代碼。

挺厲害的吧?但我們還有兩個(gè)問(wèn)題沒(méi)有解決:

  • 沒(méi)有一個(gè)易用易維護(hù)的 API。 PID ! {get_length, self()}. 這種調(diào)用方式實(shí)在有些反人類(lèi)。
  • 沒(méi)有管理進(jìn)程。我們調(diào)用一次 msg_cache:start_one/1 就啟動(dòng)了一個(gè)msg_cache, 但是現(xiàn)在我不知道當(dāng)前已經(jīng)啟動(dòng)了幾個(gè) msg_cache.

我們來(lái)解決這第一個(gè)問(wèn)題,重新整理一下代碼:

-module(msg_cache).

%% APIs
-export([start_one/1,
         get_name/1,
         get_length/1,
         pop/1,
         set_name/2,
         push/2
        ]).

%% for spawns
-export([loop/1]).

-define(API_TIMEOUT, 3000).

-record(state, {
            name,
            length = 0,
            buff = []
         }).

start_one(BuffName) ->
  Pid = spawn(msg_cache, loop, [#state{name=BuffName}]),
  io:format("Buff ~s created! Pid = ~p~n", [BuffName, Pid]),
  Pid.

%% 加了這幾個(gè) API
get_name(CacheID) ->
  call(CacheID, {get_name, self()}).
get_length(CacheID) ->
  call(CacheID, {get_length, self()}).
set_name(CacheID, NewName) ->
  call(CacheID, {set_name, NewName, self()}).
pop(CacheID) ->
  call(CacheID, {pop, self()}).
push(CacheID, Msg) ->
  call(CacheID, {push, Msg, self()}).

%% 由于發(fā)送和接受消息的處理方面,各個(gè) API 都差不多,就提取出來(lái)專(zhuān)門(mén)寫(xiě)個(gè) call 函數(shù),提高代碼復(fù)用。
call(Pid, Request) ->
  Pid ! Request,
  receive
    Response -> Response
  after ?API_TIMEOUT ->
    {error, api_timeout}
  end.

%% loop 這一部分我們沒(méi)改動(dòng)任何代碼
loop(State = #state{name = Name, length = Len, buff = Buff}) ->
  receive
    {get_name, From}->
      From ! {ok, Name},
      loop(State);
    {get_length, From}->
      From ! {ok, Len},
      loop(State);
    {set_name, NewName, From} ->
      From ! ok,
      loop(State#state{name = NewName});
    {push, Msg, From} ->
      From ! ok,
      loop(State#state{buff = [Msg | Buff], length = Len + 1});
    {pop, From} ->
      case Buff of
        [] ->
          From ! {error, empty},
          loop(State);
        [TopMsg | Msgs] ->
          From ! {ok, TopMsg},
          loop(State#state{buff = Msgs, length = Len - 1})
      end;
    _Unsupported ->
      erlang:error(io_libs:format("unsupported msg: ", [_Unsupported]) )
  end.

再試一下:

1> PID = msg_cache:start_one("cache_worker_1").
Buff cache_worker_1 created! Pid = <0.62.0>
<0.62.0>
2> msg_cache:get_name(PID).
{ok,"cache_worker_1"}
3> msg_cache:get_length(PID).
{ok,0}
4> msg_cache:pop(PID).
{error,empty}
5> msg_cache:push(PID, "msg1").
ok
6> msg_cache:push(PID, "msg2").
ok
7> msg_cache:get_length(PID).
{ok,2}
8> msg_cache:pop(PID).
{ok,"msg2"}
9> msg_cache:pop(PID).
{ok,"msg1"}
10> msg_cache:pop(PID).
{error,empty}
11> msg_cache:get_length(PID).
{ok,0}

還闊以吧?

留個(gè)作業(yè)

上面那個(gè) "管理進(jìn)程" 我們沒(méi)有實(shí)現(xiàn)。你來(lái)實(shí)現(xiàn)它。

我想這么調(diào)用:

%% 啟動(dòng)兩個(gè) worker:
1> msg_cache:start_cache_workers(["c_worker_1", "c_worker_2"]).
[<0.62.0>, <0.65.0>]

%% 列出所有 workers, 返回值是個(gè) worker 列表, 元素展示了每個(gè) worker 的 name, pid, 和 length 。
2> CachePidList = msg_cache:list_cache_workers().
[{"c_worker_1", <0.62.0>, 0}, {"c_worker_2", <0.65.0>, 0}]

%% 負(fù)載均衡, 會(huì)往隨機(jī)的一個(gè) cache worker 里 push.
%% 注意我這里調(diào)用 msg_cache:push 的時(shí)候,沒(méi)有提供某個(gè) cache worker 的 PID
3> ok = msg_cache:push("msg1").
ok
4> ok = msg_cache:push("msg2").
ok
5> CachePidList = msg_cache:list_cache_workers().
[{"c_worker_1", <0.62.0>, 1}, {"c_worker_2", <0.65.0>, 1}]

%% 至于 pop 不用管順序了,有消息就隨便 pop 出一個(gè)來(lái)。
4> msg_cache:pop().
{ok, "msg1"}

提示:

  • erlang:register/2 可以給一個(gè) PID 注冊(cè)一個(gè)名字,以后使用這個(gè) PID 就可以使用這個(gè)名字代替。比如
register(msg_cache_manger, Pid).

msg_cache:list_cache_workers() ->
  msg_cache_manger ! get_all_workers.
課后必讀文章

Erlang 中的錯(cuò)誤處理機(jī)制, Link、Monitor:
Errors and Processes

ETS

ETS (Erlang Term Storage) 是設(shè)計(jì)來(lái)存放大量的 Erlang 數(shù)據(jù)的。跟 ETS 打交道不用消息格式轉(zhuǎn)換,可直接存放 Erlang 數(shù)據(jù)格式 (erlang 各種數(shù)據(jù)格式的統(tǒng)稱(chēng)叫做 erlang terms)。
ETS 非??欤L問(wèn)時(shí)間是常數(shù)級(jí)的,自動(dòng)幫你解決了多進(jìn)程訪問(wèn)的各種競(jìng)態(tài)條件問(wèn)題,讓我們?cè)?Erlang 中做并發(fā)編程一身輕松。ETS 是非常優(yōu)秀的緩存系統(tǒng),是我們開(kāi)發(fā)中不可或缺的利器之一。這比起用某種流行語(yǔ)言來(lái)說(shuō),舒服太多[1]。
ETS 只將數(shù)據(jù)存儲(chǔ)在內(nèi)存里,如果想保存到磁盤(pán),或者要在多個(gè) Erlang Node 之間共享數(shù)據(jù),OTP 基于 ETS 和 DETS 實(shí)現(xiàn)了 mnesia.
NODE: mnesia 只適合用來(lái)做緩存,在多個(gè) Node 之間共享少量數(shù)據(jù),非??焖佟5遣⒉贿m合當(dāng)做數(shù)據(jù)庫(kù)存儲(chǔ)大量的數(shù)據(jù),因?yàn)?mnesia 在啟動(dòng)時(shí)會(huì)加載所有數(shù)據(jù)到內(nèi)存里,導(dǎo)致啟動(dòng)緩慢、新節(jié)點(diǎn)加入緩慢。并且 mnesia 是強(qiáng)一致性的數(shù)據(jù)庫(kù),其本身并不處理由于集群腦裂導(dǎo)致的不一致性,這可能不太符合你的預(yù)期。

ETS 支持幾種數(shù)據(jù)類(lèi)型:

  • set: set 是普通的 key - value 存儲(chǔ)類(lèi)型,一個(gè) ETS table 里,兩個(gè)數(shù)據(jù)的 key 不能相同。重復(fù)插入 key 相同的兩條數(shù)據(jù),后面的那條會(huì)覆蓋前面的那條。
  • ordered_set: 有序的 set 表。
  • bag: bag 允許多個(gè) key 相同的數(shù)據(jù)的存在,但 key, value 都完全相同的數(shù)據(jù)只能留一個(gè)。
  • duplicate_bag: 允許多個(gè) key, value 完全相同的數(shù)據(jù)的存在。

我們來(lái)試試 set 類(lèi)型的 table,這也是最常用的類(lèi)型。我們創(chuàng)建一個(gè)命名表,叫 users, 然后插入兩條數(shù)據(jù):

1> ets:new(users, [set, named_table]).
users
2> ets:info(users).   %% 注意默認(rèn)的權(quán)限是 protected
[{read_concurrency,false},
 {write_concurrency,false},
 {compressed,false},
 {memory,304},
 {owner,<0.57.0>},
 {heir,none},
 {name,users},
 {size,0},
 {node,nonode@nohost},
 {named_table,true},
 {type,set},
 {keypos,1},
 {protection,protected}]
3> ets:insert(users, {1, <<"Shawn">>, 27}).
true
4> ets:insert(users, {2, <<"Scarlett">>, 25}).
true
5> ets:lookup(users, 1).
[{1,<<"Shawn">>,27}]
6> ets:lookup(users, 2).
[{2,<<"Scarlett">>,25}]
7> ets:info(users).
[{read_concurrency,false},
 {write_concurrency,false},
 {compressed,false},
 {memory,332},
 {owner,<0.57.0>},
 {heir,none},
 {name,users},
 {size,2},
 {node,nonode@nohost},
 {named_table,true},
 {type,set},
 {keypos,1},
 {protection,protected}]
8>

注意上邊的示例里:

  • 創(chuàng)建 ETS table 時(shí)給了兩個(gè) Options 參數(shù):[set, named_table]。set 是指定創(chuàng)建 set 類(lèi)型的表,named_table 是創(chuàng)建命名表,命名為 users,后面可以用這個(gè)表名來(lái)引用。
  • 插入數(shù)據(jù) {1, <<"Shawn">>, 27}{2, <<"Scarlett">>, 25} 時(shí),兩個(gè) tuple 的第一項(xiàng)就是默認(rèn)的 key,tuple 里其他項(xiàng)都是 values。如果你想用其他的項(xiàng)作為 key,可以在 ets:new 的時(shí)候,指定 {keypos, Pos} 參數(shù),設(shè)置 key 在 tuple 中的位置。

ETS 表的其他類(lèi)型你可以自己試驗(yàn)一下。

需要注意的是:

  • ETS 表里的任何數(shù)據(jù)都不參加 GC
  • ETS 表有自己的 owner 進(jìn)程,默認(rèn)情況下,創(chuàng)建表的那個(gè)進(jìn)程就是 ETS table 的 owner。如果 owner 進(jìn)程掛了,ETS 表也就被釋放了。我們上邊的例子里,erlang shell 進(jìn)程就是 user table 的 owner。
  • ETS 表也是有訪問(wèn)權(quán)限的,默認(rèn)是 protected:
    • public:任何人可以讀寫(xiě)這張表。
    • protected: owner 可以讀寫(xiě),但其他進(jìn)程只能讀。
    • private:只有 owner 可以讀寫(xiě)。別的進(jìn)程無(wú)法訪問(wèn)。

由于 ETS 表非常高效,一般情況下我們都直接使用 public,然后設(shè)置 {read_concurrency, true}{write_concurrency,true} 選項(xiàng)來(lái)提高并發(fā)讀或?qū)懙男?,在?xiě)一個(gè)管理模塊來(lái)直接訪問(wèn) ets 表,讓什么封裝什么設(shè)計(jì)模式都去 shi。

OTP

OTP 已經(jīng)失去了字面意思,基本上指的就是 Erlang 生態(tài)環(huán)境的官方部分。Erlang 世界的組成是這樣的:

  • Erlang 以及 Elixir 等語(yǔ)言。
  • 工具和函數(shù)庫(kù),包括 erlang runtime,kernel,stdlib(像 lists 這種的官方庫(kù)), sasl, 還有像 ETS,dbg 之類(lèi)的很多。
  • 系統(tǒng)設(shè)計(jì)原則, 包括本章要講的一眾 Behaviors。是一堆應(yīng)用于并發(fā)世界的設(shè)計(jì)模式,他們包含了解決通用問(wèn)題的通用代碼。
  • 開(kāi)源社區(qū)生態(tài)環(huán)境,包括各種開(kāi)源軟件和社區(qū)。

OTP 指的是前三個(gè),Elixir 的話還不大算。

Erlang 的邏輯是,架構(gòu)的設(shè)計(jì)應(yīng)該由有經(jīng)驗(yàn)的人負(fù)責(zé),由專(zhuān)家做好基礎(chǔ)代碼框架,解決好最困難的問(wèn)題。而使用者只需要寫(xiě)自己的邏輯代碼。這就是 OTP behaviors,他們已經(jīng)在通信、互聯(lián)網(wǎng)領(lǐng)域,經(jīng)歷了幾十年的戰(zhàn)火考驗(yàn)。

本文要講的有三個(gè):

  • gen_server
  • application
  • supervisor

本章只講解 gen_server。 application 和 supervisor 放到后面 Hello World 工程里講解。

gen_server 要解決的問(wèn)題,就是我們上面那個(gè) msg_cache 面臨的問(wèn)題:怎樣做一個(gè)服務(wù)來(lái)響應(yīng)用戶的請(qǐng)求。

我們之前寫(xiě)的代碼很短,可以工作,但是很多東西都沒(méi)有考慮。比如請(qǐng)求者如果同時(shí)收到來(lái)自服務(wù)端的兩個(gè) Response 的話,不知道是對(duì)應(yīng)哪個(gè)請(qǐng)求的:

%% 服務(wù)端:
    {get_name, From}->
      From ! {ok, Name},
      loop(State);
    {get_length, From}->
      From ! {ok, Len},
      loop(State);

%% 客戶端:
    ServerPID ! {get_length, self()},   %% 客戶端連續(xù)調(diào)用了兩次
    ServerPID ! {get_length, self()},  
    receive
      {ok, Len} ->  %% 你知道這次匹配到的消息,是上面哪次調(diào)用的回復(fù)嗎?
         success;
      _ ->
         failed
    end.

上面代碼里連續(xù)調(diào)用了兩次 {get_length}, 但是由于發(fā)送消息是異步的,消息通過(guò)網(wǎng)絡(luò)回來(lái),你并不能確定第一次收到的回復(fù)就是第一次調(diào)用產(chǎn)生的。

這個(gè)問(wèn)題可以加一個(gè)隨機(jī)生成的 RequestID 的字段來(lái)解決,客戶端發(fā)送請(qǐng)求消息的時(shí)候帶 RequestID 過(guò)去,服務(wù)端返回的時(shí)候再傳回來(lái)。客戶端通過(guò)匹配 RequestID,就能知道當(dāng)前的回復(fù)是對(duì)應(yīng)的哪個(gè)請(qǐng)求。

但這種需求其實(shí)是通用的,你現(xiàn)在寫(xiě) msg_cache 用得到,改天寫(xiě)其他代碼也一樣用得到。另外我們也沒(méi)有過(guò)多考慮異常的情況:如果程序崩潰了怎么辦?發(fā)送消息怎么知道對(duì)方是不是還活著?

諸如此類(lèi)的問(wèn)題應(yīng)該由專(zhuān)家來(lái)解決,所以我們有了 gen_server.
gen_server 的模板是這樣的:

-module(gen_server_demo).
-behaviour(gen_server).

%% API functions
-export([start_link/0]).

%% gen_server callbacks
-export([init/1,
         handle_call/3,
         handle_cast/2,
         handle_info/2,
         terminate/2,
         code_change/3]).

-record(state, {}).
%%%% %%%% %%%% %%%% %%%% 
%%%% 這是給客戶端調(diào)用的接口部分
%%%% %%%% %%%% %%%% %%%% 
%% 啟動(dòng)一個(gè)服務(wù),后臺(tái)會(huì)啟動(dòng)一個(gè) erlang process, 并進(jìn)入 loop 函數(shù), 回想一下我們實(shí)現(xiàn) msg_cache 時(shí)寫(xiě)的那個(gè) loop/1.
%% 但是這個(gè) loop 函數(shù)屬于通用部分的代碼,是由 OTP 官方實(shí)現(xiàn)的,所以代碼不在這里,在 OTP 代碼的 lib/stdlib/src/gen_server.erl 里。
start_link() ->
    %% gen_server:start_link 啟動(dòng) process, 然后將 process 注冊(cè)在當(dāng)前
    %% node 上,注冊(cè)名字就是當(dāng)前 Module 名:gen_server_demo
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

%%%% %%%% %%%% %%%% %%%% 
%%%% 這是 gen_server 發(fā)生某事件時(shí)的回調(diào)函數(shù)部分
%%%% %%%% %%%% %%%% %%%%

%% gen_server:start_link 被調(diào)用,服務(wù)啟動(dòng)時(shí),回調(diào) init/1
init([]) ->
    {ok, #state{}}.

%% gen_server:call 被調(diào)用。gen_server:call 是“同步”調(diào)用,調(diào)用方可以設(shè)置一個(gè)超時(shí)時(shí)間。
%% 返回值里的 Reply 是返回給調(diào)用者的內(nèi)容。
handle_call(_Request, _From, State) ->
    Reply = ok,
    {reply, Reply, State}.

%% gen_server:cast 被調(diào)用。gen_server:cast 是“異步”調(diào)用。
%% 調(diào)用者一般是想發(fā)一個(gè)消息給我們的 gen_server,然后繼續(xù)做自己的事情,他不想收到來(lái)自 gen_server 的回復(fù)。
handle_cast(_Msg, State) ->
    {noreply, State}.

%% gen_server 進(jìn)程收到一個(gè)普通 erlang 消息:一個(gè)不是通過(guò) gen_server:call 和 gen_server:cast 發(fā)來(lái)的消息。
handle_info(_Info, State) ->
    {noreply, State}.

%% 上面的三個(gè)函數(shù) handle_call, handle_cast, handle_info
%%   都可以返回一個(gè) {stop, Reason, State},這樣的話 gen_server 會(huì)退出。
%%   但退出之前,可能會(huì)回調(diào) terminate(_Reason, _State)。


%% gen_server 將要退出時(shí),回調(diào) terminate/2.
%% 注意
%% 1) 要想 terminate 在 gen_server 退出前被回調(diào),gen_server 必須捕獲退出信號(hào):
%%    需要在 init 回調(diào)里,加這么一行:process_flag(trap_exit, true).
%% 2) 有幾個(gè)特定的 Reason 被認(rèn)為是正常退出:normal, shutdown, or {shutdown,Term},
%%    其他的 Reason,sasl 是會(huì)報(bào)錯(cuò)打日志的。
terminate(_Reason, _State) ->
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

gen_server 真正的進(jìn)程代碼在 OTP 庫(kù)里,運(yùn)行 start_link(),gen_server 就在后臺(tái)跑起來(lái)了。你需要實(shí)現(xiàn)的只是這個(gè)模板里的各個(gè)回調(diào)函數(shù),將你的業(yè)務(wù)邏輯放到這些回調(diào)里。

仔細(xì)看一下上面的 gen_server 模板和注釋?zhuān)_保你能完全理解。

我不想重新實(shí)現(xiàn)之前的 msg_cache,一點(diǎn)都不酷。我們重新寫(xiě)個(gè)其他的,讓你對(duì) Erlang 程序的基本設(shè)計(jì)理念有更深的印象。

我們要實(shí)現(xiàn)一個(gè)多用戶聊天的程序:

  • 用戶能夠查詢?cè)诰€的其他用戶。
  • 用戶之間能夠聊天。
  • 要容易擴(kuò)展,因?yàn)楹竺嫖覀兊?Client 會(huì)通過(guò)TCP、WebSocket 等連接上來(lái),不會(huì)是 Erlang 寫(xiě)的 Client。
  • 要容易伸縮,因?yàn)槲覀儤I(yè)務(wù)發(fā)展很快,用戶量會(huì)越來(lái)越大,我們希望程序能很容易的部署在多臺(tái)服務(wù)器上。

先來(lái)設(shè)計(jì)我們程序的架構(gòu):


chat_server-2.png
  • 每個(gè) client 連接上來(lái),都會(huì)啟動(dòng)一個(gè)新的 Process,叫做 ChatServer.
  • ChatServer 負(fù)責(zé)維護(hù)這個(gè) Client 的 TCP 連接。
  • Route 是一個(gè)Module,它提供了數(shù)據(jù)庫(kù)的管理,數(shù)據(jù)庫(kù)里維護(hù)了從 User 到其 ChatServer 的 PID 的映射關(guān)系。

注意我們的設(shè)計(jì)思想:

  • 為每一個(gè)連接上來(lái)的請(qǐng)求啟動(dòng)一個(gè) Erlang 進(jìn)程 "ChatServer",不要擔(dān)心進(jìn)程個(gè)數(shù),百萬(wàn)也沒(méi)問(wèn)題。
  • 兩個(gè)用戶之間的消息傳遞,體現(xiàn)在服務(wù)端就是兩個(gè) "ChatServer" 之間的 Erlang 消息傳遞。
  • Route 部分只是一個(gè) Module,不是進(jìn)程。每一個(gè) ChatServer 調(diào)用 Route 里的代碼的時(shí)候,執(zhí)行過(guò)程其實(shí)是在每個(gè) ChatServer 進(jìn)程內(nèi)部的。這樣我們就避免了集中向一個(gè)進(jìn)程發(fā)送消息帶來(lái)的瓶頸。我們把這種瓶頸的處理留給了 ETS 來(lái)解決。
  • 如何伸縮?ChatServer 在不在同一個(gè)服務(wù)器上沒(méi)什么關(guān)系。ChatServerPID !{send, Msg} 會(huì)將消息發(fā)送到ChatServerPID,即使 ChatServerPID 在遠(yuǎn)端的服務(wù)器上。分布式部署的時(shí)候,這行代碼根本不用改,你要做的僅僅是添加一個(gè)新的 Erlang Node。分布式 Erlang 后面還要講。
  • 如何擴(kuò)展?ETS 使用 Route Module 管理,為的就是當(dāng)以后換用其他的緩存數(shù)據(jù)庫(kù)的時(shí)候簡(jiǎn)單一些。我們?cè)O(shè)想后面為了做分布式集群,要用 mnesia 替代 ETS,只需要寫(xiě)一個(gè)新的 Route Module,內(nèi)部改用 mnesia 存儲(chǔ),然后替換線上已經(jīng)加載的老的 Route Module。線上系統(tǒng)都不用停止,客戶端的連接一個(gè)都不會(huì)斷!

你現(xiàn)在能否體會(huì)到 Erlang 的實(shí)用主義呢?完全沒(méi)廢話,就是解決問(wèn)題!

Client 部分我們現(xiàn)在不做,讓前端的同學(xué)幫我們實(shí)現(xiàn)。但假設(shè)我們的前端程序員還沒(méi)到崗,所以我們可以先放著 WebSocket 部分后面再做。但有兩個(gè)過(guò)程必須現(xiàn)在實(shí)現(xiàn):

  • 當(dāng) Client 登錄時(shí),我們需要使用 Route 注冊(cè) user 所在的 ChatServer 的 PID。
  • 當(dāng) Client 發(fā)消息時(shí),我們需要使用 Route 查找對(duì)方的 ChatServer 的 PID。

首先我們來(lái)定義我們的消息協(xié)議。我們的消息體內(nèi)包含幾部分,發(fā)送者ID,接收者ID,以及消息內(nèi)容:

-record(msg, {
  from_userid,
  to_userid,
  payload
}).

接下來(lái)讓我們來(lái)實(shí)現(xiàn) Route 模塊,實(shí)現(xiàn)數(shù)據(jù)庫(kù)創(chuàng)建,注冊(cè),查找與注銷(xiāo)功能:

-module(route).
-export([ensure_db/0,
         lookup_server/1,
         register_server/2,
         unregister_server/1]).

ensure_db() ->
  case ets:info(servers) of
    undefined ->
      %% 為了演示方便,我們啟動(dòng)一個(gè)臨時(shí)進(jìn)程來(lái)創(chuàng)建 ETS 表,
      %% 如果直接在 erlang shell 里創(chuàng)建ETS的話,出錯(cuò)時(shí) shell 的崩潰連帶著我們的ETS也丟了。
      %% 當(dāng)然線上系統(tǒng)不會(huì)這么做。
      spawn(fun() -> ets:new(servers, [named_table, public]), receive after infinity->ok end end);
    _ -> ok
  end.

lookup_server(UserID) ->
  case ets:lookup(servers, UserID) of
    [{UserID, ServerID}] -> {ok, ServerID};
    _ -> {error, no_server}
  end.

register_server(UserID, ServerID) ->
  ets:insert(servers, {UserID, ServerID}).

unregister_server(UserID) ->
  ets:delete(servers, UserID).

接下來(lái)實(shí)現(xiàn)我們的 ChatServer:

-module(chat_server).
-behaviour(gen_server).
%% state 保存用戶的 userid,以及 client 端連上來(lái)的 socket.
-record(state, {
  userid,
  socket
}).

%% 后面當(dāng)一個(gè)新連接連接到服務(wù)器的時(shí)候,我們會(huì)調(diào)用 start_link 啟動(dòng)一個(gè)新的 ChatServer 為之服務(wù)。
%% 注意這里使用的是 gen_server:start_link/3,沒(méi)有注冊(cè)進(jìn)程名,我們直接使用 PID. 因?yàn)槲覀円獑?dòng)很多個(gè) ChatServer。
start_link(UserID, Socket) ->
  {ok, ServerID}  = gen_server:start_link(?MODULE, [UserID, Socket], []),
  ServerID.

%% 在 init 回調(diào)里注冊(cè)用戶的 ChatServer。
%% 注意我們捕獲了 exit signal, 以便程序退出的時(shí)候回調(diào) terminate/2. 
%% 我們?cè)?terminate/2 里取消注冊(cè)。
init([UserID, Socket]) ->
    process_flag(trap_exit, true),
    route:register_server(UserID, self()),
    {ok, #state{userid=UserID, socket=Socket}}.

%% 如果我們的 ChatServer 收到一條來(lái)自 Socket 的消息,它會(huì)收到一條類(lèi)似 {tcp, Sock, Data} 的普通消息。
%% 我們需要在 handle_info 里處理,轉(zhuǎn)發(fā)給對(duì)方的 ChatServer。
handle_info({tcp, #msg{to_userid = ToUserID, payload = Payload} = Msg}, State) ->
  io:format("Chat Server(User: ~p) - received msg from tcp client, Msg: ~p~n",[State#state.userid, Msg]),
  case route:lookup_server(ToUserID) of
    {error, Reason} ->
      io:format("Chat Server(User: ~p) - cannot forward to Chat Server(User: ~p): ~p~n",
          [State#state.userid, ToUserID, Reason]);
    {ok, TargetServerID} ->
      io:format("Chat Server(User: ~p) - forward msg to Chat Server(User: ~p), Payload: ~p~n",
        [State#state.userid, ToUserID, Payload]),
      ok = gen_server:call(TargetServerID, {send, Msg})
  end,
  {noreply, State};

%% 我們的 ChatServer 收到一條來(lái)自對(duì)端 ChatServer 的轉(zhuǎn)發(fā)請(qǐng)求
handle_call({send, #msg{payload = Payload}}, _From, State) ->
  io:format("Chat Server(User: ~p) - deliver msg to tcp client, Payload: ~p~n",
    [State#state.userid, Payload]),
  send_to_client_via_tcp(State#state.socket, Payload),
  {reply, ok, State};

%% Socket 部分我們沒(méi)有實(shí)現(xiàn),暫時(shí)就簡(jiǎn)單打印一下
send_to_client_via_tcp(_Socket, Payload) ->
  %gen_tcp:send(_Socket, Payload),
  io:format("Sent To Client: ~p~n",[Payload]).

完工了!我們測(cè)試一下:

1> c(chat_server).
{ok,chat_server}
2> c(route).
{ok,route}

%% 現(xiàn)在模擬系統(tǒng)啟動(dòng)時(shí),初始化 DB 的過(guò)程。
%% 后續(xù)這個(gè)會(huì)在啟動(dòng)代碼里寫(xiě)。
3> route:ensure_db().
<0.73.0>

%% 現(xiàn)在我們模擬一個(gè)新的用戶登錄上來(lái),啟動(dòng)新的 ChatServer 的過(guò)程。
%% 后續(xù)這個(gè)過(guò)程當(dāng)然是由 WebSocket 模塊調(diào)用。
4> ServerIDUser1 = chat_server:start_link(<<"user1">>, fake_socket).
<0.75.0>
5> ServerIDUser2 = chat_server:start_link(<<"user2">>, fake_socket).
<0.77.0>

%% 我們來(lái)做一個(gè) #msg{} 消息體。
%% 后續(xù)我們應(yīng)該在收到 socket 上來(lái)的消息解析成功之后,打包一個(gè) #msg{} 消息體。
6> rr("chat_protocol.hrl").
[msg]
7> Msg = #msg{from_userid= <<"user1">>, to_userid = <<"user2">>, payload = <<"hello?">>}.
#msg{from_userid = <<"user1">>,to_userid = <<"user2">>,
     payload = <<"hello?">>}


%% 模擬從 socket 收到消息的過(guò)程。
8> ServerIDUser1 ! {tcp, Msg}.
Chat Server(User: <<"user1">>) - received msg from tcp client, Msg: {msg,
                                                                     <<"user1">>,
                                                                     <<"user2">>,
                                                                     <<"hello?">>}
{tcp,#msg{from_userid = <<"user1">>,to_userid = <<"user2">>,
          payload = <<"hello?">>}}
Chat Server(User: <<"user1">>) - forward msg to Chat Server(User: <<"user2">>), Payload: <<"hello?">>
Chat Server(User: <<"user2">>) - deliver msg to tcp client, Payload: <<"hello?">>
Sent To Client: <<"hello?">>
9>

我們看到服務(wù)端的路由已經(jīng)走通了,接下來(lái)只要寫(xiě)一個(gè) web socket 模塊,listen 在某個(gè)端口,當(dāng)有連接請(qǐng)求時(shí),像上面第 4、第 5 行一樣調(diào)用 chat_server:start_link/2 就行了。當(dāng)然 send_to_client_via_tcp 也要改為真正往 socket 發(fā)送消息。

完整代碼:
https://github.com/terry-xiaoyu/learn-erlang-in-30-mins/tree/master/chat

一個(gè)完整的線上演示:
(即將上線)

書(shū)接下文30 分鐘學(xué) Erlang (三)


  1. Golang 里你需要自己找多線程安全的 maps 庫(kù),寫(xiě)并發(fā)沒(méi)有安全感。Golang 官方也沒(méi)有下文要說(shuō)到的 OTP 里提供的各種 Behavior,代碼寫(xiě)起來(lái)天馬行空最后一團(tuán)糟。然后又不能支持函數(shù)式的 pattern matching 等寫(xiě)法... 總之用 golang 寫(xiě)代碼從來(lái)不會(huì)給人愉快的感覺(jué)。流行是流行的,但那叫“普通”吧?第一次這么吐槽 golang,但這篇是 erlang 的教程,應(yīng)該不算過(guò)分吧。等到寫(xiě) go 的時(shí)候我再來(lái)吐槽 erlang 。我是不會(huì)寫(xiě) go 的 ... ?

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 本文寫(xiě)給誰(shuí)看的? 那些已經(jīng)有過(guò)至少一門(mén)編程語(yǔ)言基礎(chǔ),并且需要快速了解Erlang,掌握其基本要點(diǎn),并馬上投入工作中...
    Shawn_xiaoyu閱讀 31,921評(píng)論 9 60
  • 世界是并行的,Erlang程序反應(yīng)了我們思考和交流的方式,人作為個(gè)體通過(guò)發(fā)送消息進(jìn)行交流,如果有人死亡,其他人會(huì)注...
    abel_cao閱讀 2,867評(píng)論 1 4
  • erlang常規(guī)面試題 基礎(chǔ) 消息發(fā)送 基礎(chǔ)相關(guān) OTP相關(guān) gen_server:cast和erlang:sen...
    randyjia閱讀 4,384評(píng)論 1 3
  • erlang應(yīng)用腳本stop分析 其實(shí)這篇文章的名字應(yīng)該是如何安全關(guān)閉erlang應(yīng)用更加科學(xué)。 erlang應(yīng)用...
    randyjiawenjie閱讀 1,537評(píng)論 0 1
  • erlang應(yīng)用腳本stop分析 其實(shí)這篇文章的名字應(yīng)該是如何安全關(guān)閉erlang應(yīng)用更加科學(xué)。 erlang應(yīng)用...
    randyjia閱讀 1,281評(píng)論 0 1

友情鏈接更多精彩內(nèi)容