Erlang中的并行编程:

定义:

Process :并发执行的活动的个体,是个完整的虚拟机,系统当中可以同时存在多个并发的进程。

Message:进程之间通信的方法。
Timeout:用于等待指定一段时间的,然后决定完成某个动作的机制。
Registered Process: 进程被注册了名称别名,因为通过进程ID(例如<0.30.0>)访问进程不方便,而通过简单易记的别名去访问进程,非常方便。
Client/Server模型:用于建立并行系统的标准的模型。
 

1.创建新进程

新进程Pid2由另一个进程Pid1所创建,创建前如下:
Pid2 = spawn(Mod, Func, Args)
过后看到:
在进程Pid1的代码中,调用:

其中Pid2是新创建进程标识,此进程只对Pid1是可见的,Pid1可通过Pid2标识向Pid2进程发送消息。
 

2.简单的消息传递


self() - 返回执行该函数进程的进程标识符(Pid)

From,Msg在B未接收到消息之前是未绑定的(未被匹配的),当B接收到消息后被绑定(匹配后)为接收的数据。


Messages可承载消息同时可以被有选择的解析,也就是你可以解析匹配某种类型的消息,不解析某种类型的消息,可通过atom来进行筛选。

当进程接收到消息之后,A和D变为绑定,绑定的变量是不能再赋予新的值了。

假如A在接收到消息之前已经绑定了,那么从此以后B只能接收A进程的消息,因为只有A进程发的消息才能够被绑定的A变量匹配。

3. 一个echo进程

-module(echo).
-export([go/0, loop/0]).
 
go() ->
	Pid2 = spawn(echo, loop, []),
	Pid2 ! {self(), hello},
	receive 
		{Pid2, Msg} ->
			io:format("P1 ~w~n",[Msg])
	end,
	Pid2 ! stop.

loop() ->
	receive
		{From, Msg} -> 
			From ! {self(), Msg},
			loop();
		stop ->
			true
	end.

4.选择性的消息接收


只有消息foo和bar才会被接收,foo和bar的消息接收是没有顺序关系的,依赖于客户端的发送顺序。
 

5.接收任何消息


选择性的接收任何消息,当第一条消息到达进程C被处理后,变量C被绑定了为原子foo或bar,具体绑定哪一个取决于那个先到达的消息。

6. 一个电话的例子


ringing_a(A, B) -> 
	receive
		{A, on_hook} ->
			A ! {stop_tone, ring},
			B ! terminate,
			idle(A);
		{B, answered} ->
			A ! {stop_tone, ring},
			switch ! {connect, A, B},
			conversation_a(A, B)
	end.


 
A拿起了电话拨打了B的电话,A这个时候听到的是铃声,在这个时候:
 1. {A, on_hook}接收到A挂机的消息
   1-1 A ! {stop_tone, ring} 停止向A播放铃音
   1-2 向B发送终止
   1-3 设置A为空闲状态
 2. B接了电话
   2-1 停止向A播放铃音
   2-2 A,B之间建立连接
   2-3 A,B之间通话。

7. Pids(进程标识符)可以在消息中传递


A 发送消息给B,消息中包含了A的进程标识符pid
B 发送transfer消息给C,其中也包含了A的进程标识符
C 直接回复A
 

8. 给进程注册一个名称

register(Alias, Pid) 注册进程 Pid 用名字 Alias.

start() ->
    Pid = spawn(num_anal, server, [])
    register(analyser, Pid).

analyse(Seq) ->
    analyser ! {self(),{analyse,Seq}},
    receive
        {analysis_result,R} ->
            R
    end.

任何进程可以通过注册的进程名称,向注册名称的进程发送消息。
 

9. C/S模型

协议:
服务器节点:
-module(myserver).

server(Data) ->
	receive
		{From,{request,X}} ->
			{R, Data1} = fn(X, Data),
			From ! {myserver,{reply, R}},
			server(Data1)
	end.

 
接口库:
-export([request/1]).

request(Req) ->
	myserver ! {self(),{request,Req}},
	receive
		{myserver,{reply,Rep}} ->
			Rep
	end.


 

10 Timeouts


如果A发送消息foo的时间在Time内,则执行Actions1,否则执行Actions2.
 
Timeouts使用
sleep(T)- 设置进程挂起T毫秒(ms).

sleep(T) ->
    receive
    after
        T ->
            true
    end.

suspend() - 设置进程无限期等待. 
suspend() ->
    receive
    after
        infinity ->
            true
    end.

alarm(T, What) - 设置当前进程从现在起T毫秒(ms)后接收到消息。
set_alarm(T, What) ->
    spawn(timer, set, [self(),T,What]).

set(Pid, T, Alarm) ->
    receive
    after
        T ->
            Pid ! Alarm
    end.
receive
    Msg ->
        ... ;
end.

flush() - 刷新进程的消息队列缓冲区
flush() ->
    receive 
        Any ->
            flush()
    after 
        0 ->
            true
    end.

    用0作为超时时间表示进程先检查消息缓冲区,当消息缓冲区为空时,立即执行后面的代码。
Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐