Message Passing Models II ========================= FORTRAN M (FM) -------------- Message Passing extension of FORTRAN Emphasis on modularity SLIDE Foster, Program 6.1 (Bridge construction) PROCESS definitions (analogous to SUBROUTINEs) PROCESS name(arg) ... end Datatypes INPORT, OUTPUT typed references to communication structure PROCESS name(in, out) INPUT(integer) in OUTPUT(integer) out integer in, eoc RECEIVE(port=in, iostat=eoc) i if (eoc.eq.0) SEND(out) i endif ENDCHANNEL(out) end Ports can be passed by messages => dynamic communication structures => more flexible than OCCAM => more controlled than MPI Process creation process blocks process do-loops PROCESSES PROCESSCALL master(port1, port2, ports) PROCESSCALL worker(port1) PROCESSCALL worker(port2) PROCESSDO i = 1,10 PROCESSCALL worker(ports(i)) ENDPROCESSDO ENDPROCESSES analogous to OCCAM Communication CHANNELs single-producer, single-consumer communication CHANNEL(in=porta, out=portb) individual connection port types must match INPORT (integer, real x(10)) porta OUTPORT (integer, real x(10)) portb CHANNEL(in=porta, out=portb) SEND(portb) i, a CHANNEL(in=portsa(:), out=portsb(:) array of channels SLIDE Foster, Figure 6.1 SLIDE Foster, Program 6.2 MERGERs multiple-producer, single-consumer communication non-determinism MERGER(in=porta, out=portsb) many to one many to many by multiple mergers SLIDE Foster, Figure 6.4 Dynamic channel structures Port variables incorporated in messages INPORT (OUTPORT(integer)) pi OUTPORT (integer) qo RECEIVE (pi) qo SEND (qo) 777 SLIDE, Foster Program 6.6 SLIDE, Foster Figure 6.5 Asynchronous communication PROBE(inport,empty=e) if (.not.e) then ... endif Nondeterminism Sources of non-determinism: - MERGER - PROBE MERGER can be implemented by PROBE Otherwise, FORTRAN M Program execute deterministically! - port occurs in more than one process (compiler) - port used in communication and still used locally - runtime systems invalidates local copy of port! Mapping constracts Similar in spirit than HPF Details different - PROCESSORS virtual processor array of specified size and shape PROCESSORS(dim1, dim2, ... dimn) n-dimensional cube of processors PROCESSCALL p(args) LOCATION (c1, c2, ..., cn) - SUBMACHINE select subcube for computation of process PROCESSORS(P, 2*P) PROCESSES PROCESSCALL p(...) SUBMACHINE(1:P, 1:P) PROCESSCALL q(...) SUBMACHINE(1:P, P+1,2*P) ENDPROCESSES Source of Modularity SLIDE Foster, Figure 6.9 ERLANG ------ Functional (actual single assignment) concurrent programming language Developed by Ericson, Sweden Applied to programming of telephone systems Function definitions len([]) -> 0; len([H|T]) -> 1+len(T). Process spawning pid = spawn(Node, Fun, [Arg1,...,Argn]) Communication pid ! message receive pattern1 [when guard1] -> action_1 pattern2 [when guard2] => action_2 ..... [after time action_n+1] end Selection according to message patterns receive {'taga', a } -> f(a) {'tagb', b } -> g(b) end Analogous to OCCAM ! and ALT construct Pid registration register(name, pid) global registration of process pid as name allows any process to communicate with pid (analogous MPI) Simulation of different programming models possible ---- RPC (Remote Procedure Call) -------------------------- machine 1 machine 2 | v b = g(a) --------> g(x) | v +-------------- return(y) | v "Synchronous message passing" - sender blocked until reply received - syntactically like procedure calls L1 = [a, ee, rr, tt] L2 = [5, 6, 7] start() call(node, append, [L1, L2]) -- start server start() -> register(rpc, spawn(loop, [])); -- server loop loop() -> receive {pid, {apply, fun, args}} -> spawn(reply, [pid, fun, args]), loop() end. -- serve request reply(pid, fun, args) -> pid ! {'result', apply(fun, args)}. -- client call call(node, fun, args) -> node ! {self(), {apply, fun, args}}, receive {'result', result} -> result' end. --- Pseudo Server ------------- start(node, service) starts a pseudo server for 'service' on local node forwarding requests to actual server 'node' start(node, fun, name) -> id = spawn(relay, [node, name]), register(name, id). relay(node, name) -> p = call(node, server, [name]), loop(p). loop(p) -> receive ANYTHING -> p ! Anything end, loop(p). -- Promises -------- Asynchronous variant of remote procedure calls - RPC immediately returns promise (place holder for result) - promise can be later used to claim actual result p = promise(node, f, a). ... result = yield(p) promise(node, fun, args) -> spawn(do_call, [self(), node, fun, args]). do_call(pid, node, fun, args) -> r = call(node, fun, args), pid ! { self(), {'promise', r} }. yield(key) -> receive {key, {'promise', r}} -> r end. -- Parallel evaluation ------------------- peval(list) -> nodes = NODES(), promises = map_nodes(list, nodes, nodes), map(yield, promises). map_nodes([],_,_) -> []; map_nodes(list,[],nodes) -> map_nodes(list, nodes, nodes). map_nodes([{fun,args} | rest], [node | nrest], nodes) -> [ call(node, fun, arg) | map_nodes(rest, nrest, nodes)]. -- More on parallel functional programming see later.