More on Message Passing Distributed Computing
Broadcast.
Often in a distributed algorithm it is necessary to broadcast data from one process to all of the other processes. We assume a built-in function broadcast(a,i) for this task, where a is the data in process Pi to be broadcast. (For simplicity, we assume that a is replicated to the local variable a in each process, as opposed to another variable b.) We show the result of the broadcast operation in Figure 18.3, where we broadcast from P0.
***Figure 18.3 Action of broadcast(a,0) (new)***
Each process must execute the broadcast instruction before any of the processes can continue, so that broadcast enforces a synchronization amongst the processes. In other words, each process must halt further execution until all of the processes have encountered and executed the broadcast instruction. Of course, there are situations where this synchronization is not desirable, and then a broadcast must be explicitly coded by the programmer (see Section 18.5).
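For concreteness, the built-in broadcast corresponds closely to MPI's MPI_Bcast. The following is a minimal C sketch of broadcast(a,0), assuming a single integer a and the default communicator (the value 42 is arbitrary):

#include <mpi.h>
#include <stdio.h>

int main(int argc, char *argv[]) {
    int myid, a = 0;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);

    if (myid == 0)
        a = 42;      /* only P0 holds the value before the broadcast */

    /* every process must call MPI_Bcast; afterwards a == 42 in every process */
    MPI_Bcast(&a, 1, MPI_INT, 0, MPI_COMM_WORLD);

    printf("process %d: a = %d\n", myid, a);
    MPI_Finalize();
    return 0;
}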
Scatter.
The built-in scatter operation is similar to a broadcast, except that each process gets a separate piece of the data. For example, in a sorting algorithm, we will typically assume that a list L[0:n – 1] of size n is a local variable in the memory of process P0, and process P0 distributes (scatters) n/p elements of the list to each process. More precisely, process Pi receives the sublist L[in/p: (i+1)n/p – 1], i = 0, 1, …, p – 1. We use the notation scatter(a[0:n – 1],b[0:n/p – 1],i) for scattering the array a[0:n – 1] throughout the processes and received by the array b[0:n/p – 1] in each process. We show the result of the scatter operation in Figure 18.4, where we scatter from P0.
***Figure 18.4 Action of scatter(a[0:n – 1],b[0:n/p – 1],0) (new)***
Again, each process must execute the scatter instruction before any of the processes can continue.
Gather.
The gather operation is basically the reverse operation of scatter, where now data in the local arrays b[0:m – 1] in each process is gathered into a local array a[0:pm – 1] in the memory of a single process Pi. More precisely, in the gather(b[0:m – 1],a[0:pm – 1],i) operation, the subarray a[mj: m(j+1) – 1] of Pi receives the array b[0:m – 1] from process Pj, j = 0, 1, …, p – 1. We show the result of the gather operation in Figure 18.5, where we gather at P0.
***Figure 18.5 Action of gather(b[0:m – 1],a[0:pm – 1],0) (new)***
Once again, each process must execute the gather instruction before any of the processes can continue. For example, after a sorting algorithm has sorted a list of size n by splitting the list into sorted sublists of size n/p stored in each process Pj, j = 0, 1, …, p – 1, the sorted sublists might be gathered at process P0 into a single sorted list. We illustrate this in Section 18.3, where even-odd transposition sort is implemented in a distributed network of processes communicating by message passing.
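The scatter and gather operations map naturally onto the MPI functions MPI_Scatter and MPI_Gather. The following minimal C sketch mirrors scatter(L[0:n – 1],b[0:m – 1],0) followed by gather(b[0:m – 1],L[0:n – 1],0), assuming p divides n and a list of doubles (on processes other than P0, L may be NULL):

#include <mpi.h>
#include <stdlib.h>

void scatter_gather_demo(double *L, int n) {
    int myid, p;
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    MPI_Comm_size(MPI_COMM_WORLD, &p);

    int m = n / p;                          /* size of each sublist (assume p divides n) */
    double *b = malloc(m * sizeof(double));

    /* Pi receives L[i*m : (i+1)*m - 1] of P0 into its local b[0 : m-1] */
    MPI_Scatter(L, m, MPI_DOUBLE, b, m, MPI_DOUBLE, 0, MPI_COMM_WORLD);

    /* ... each process works on its sublist b here ... */

    /* the sublists are collected back into L on P0, in rank order */
    MPI_Gather(b, m, MPI_DOUBLE, L, m, MPI_DOUBLE, 0, MPI_COMM_WORLD);

    free(b);
}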
18.2.5 Asynchronous (Non-Blocking) Message Passing
Blocking sends and receives impose synchronization constraints that are not desirable in the case where the processes should be free to perform calculations in a highly asynchronous manner. For example, in the master-worker model discussed earlier, it might be quite desirable if the worker processes could alternate between computation and communication phases without the need to tightly synchronize their code with that of other workers.
In order to accomplish this alternation between communication and computation phases, we introduce two built-in functions Bsend and probe. As mentioned earlier, the “B” in Bsend refers to “buffered,” and corresponds to the MPI function MPI_Bsend. When a source process executes a Bsend, the message is placed in the source’s message buffer, and the target process for the message receives a posting for the message, which is placed in the target’s posting list. The posting identifies the source process, so that when the posting is read, the target knows from which process’s message buffer to receive (retrieve) the message (the message is removed from the source’s message buffer when the target receives it). Immediately after placing the message in the message buffer, Bsend returns, and the source process can resume execution of the statement following the Bsend statement. We will assume that Bsend is “safe” in the sense that the data sent can be altered before the message is received, since the corresponding message has already been buffered.
From the target’s point of view, a process can check whether it has any pending messages posted by executing the probe function, which returns the Boolean value .true. if the posting list contains a matching send, and .false. otherwise. The probe function has two parameters, src and tag, where src is the source process and tag has the same meaning as in send and receive. Usually in this context src has the value ANY_SOURCE. Figure 18.6 illustrates the outcome of executing a probe when a matching Bsend has been posted before the matching receive occurs. Note from Figure 18.6 that probe is a non-blocking function (so that it mirrors the MPI function MPI_Iprobe, as opposed to the blocking MPI_Probe). Hence, if the probe is executed before the corresponding Bsend, execution does not suspend until the Bsend is executed; instead, one would expect the execution to eventually loop back to the probe statement in order to receive the matching message. To illustrate how all available messages can be read before continuing further, in Figure 18.6 we show the matching receive inside a while loop (which would itself be inside an outer loop for the reason just stated), as this is a common pattern.
***Figure 18.6 Action of Bsend (new)***
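Our Bsend and probe correspond to the MPI functions MPI_Bsend and MPI_Iprobe, respectively. The following C sketch shows the pattern of Figure 18.6; the tag value, the integer message, and the size of the attached buffer are illustrative assumptions, and a real program would eventually detach the buffer with MPI_Buffer_detach:

#include <mpi.h>
#include <stdlib.h>

#define WORK_TAG 1   /* illustrative tag value */

void bsend_probe_demo(int myid) {
    if (myid != 0) {
        /* source side: attach a buffer, post a buffered send, and continue at once */
        int bufsize = sizeof(int) + MPI_BSEND_OVERHEAD;
        void *buf = malloc(bufsize);
        MPI_Buffer_attach(buf, bufsize);

        int msg = myid;                      /* arbitrary message contents */
        MPI_Bsend(&msg, 1, MPI_INT, 0, WORK_TAG, MPI_COMM_WORLD);
        /* msg may safely be altered here: the message has been buffered */
    } else {
        /* target side: read every message posted so far, then resume computing */
        int flag, msg;
        MPI_Status status;
        MPI_Iprobe(MPI_ANY_SOURCE, WORK_TAG, MPI_COMM_WORLD, &flag, &status);
        while (flag) {
            MPI_Recv(&msg, 1, MPI_INT, status.MPI_SOURCE, WORK_TAG,
                     MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            /* ... act on msg ... */
            MPI_Iprobe(MPI_ANY_SOURCE, WORK_TAG, MPI_COMM_WORLD, &flag, &status);
        }
        /* flag is false: no pending postings, so computation continues and the
           process loops back to the probe later */
    }
}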
18.2.6 Computation/Communication Alternation
Bsend and probe support a distributed computing paradigm where processes are assigned tasks and alternate between computation and communication phases. More precisely, after a process has completed a computation phase in which a fixed amount of work (depending on problem parameters) is done, it can enter a while probe do loop in order to receive all of its pending messages (note that new messages might be posted during the execution of this loop). The process can take appropriate action based on these messages before entering another computation phase. Moreover, when a process has completed its assigned task (becomes idle), it can be given a new task, which might be a subtask assigned to another process (see Figure 18.7). In this way good load balancing can possibly be achieved, both between communication and computation phases and in the distribution of the workload between processes. The flow chart in Figure 18.7 illustrates this generic alternating computation/communication cycle performed by a process. We have divided the flow chart into working and idle phases, where we consider a process to be working when it has not completed its computation task. Of course, as indicated, an “idle” process still participates in communication steps, such as a broadcast of the best solution found so far to an optimization problem, a termination message, and so forth.
***Figure 18.7 Alternating computation/communication cycle performed by a process (new)***
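The cycle of Figure 18.7 can be organized along the following lines. This is only a structural C sketch: task_finished, do_some_work, handle_message, and termination_received are hypothetical, problem-dependent routines, and the while-probe loop is the one discussed above:

#include <mpi.h>

/* hypothetical, problem-dependent routines (declarations only) */
int  task_finished(void);
void do_some_work(void);                 /* a bounded amount of computation */
void handle_message(MPI_Status *status); /* receive and act on one posted message */
int  termination_received(void);

void computation_communication_cycle(void) {
    int done = 0;
    while (!done) {
        if (!task_finished())
            do_some_work();              /* working phase */

        /* communication phase: drain all pending postings (while probe do loop) */
        int flag;
        MPI_Status status;
        MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, &status);
        while (flag) {
            handle_message(&status);     /* new task, best solution so far, termination, ... */
            MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, &status);
        }
        done = termination_received();   /* e.g., set when a termination message arrives */
    }
}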
The question remains how to implement the computation/communication cycle, including the detection of the termination condition by all processes. One method is the master-worker paradigm discussed in Section 18.4, as applied to distributed depth-first search (backtracking) in Section 18.6.
18.3 Distributed Even-Odd Transposition Sort
Even-odd transposition sort serves as a nice illustration of the use of our CompareSplit, scatter, and gather instructions, as well as of adapting a parallel algorithm using n processes to a distributed message-passing algorithm using p processes. After scattering an input list L[0:n – 1] in process P0 to the processes P0, P1, …, Pp-1, EvenOddSort sorts each of the scattered sublists using an optimal sequential sorting algorithm (we have chosen MergeSort), and then p parallel compare-split steps are performed on the sublists, where in an odd step processes Pi and Pi+1 participate in the compare-split step for i = 1, 3, …, whereas in an even step i = 0, 2, … . In the final step, the sorted sublists are gathered back to process P0.
procedure EvenOddSort(L[0:n – 1])
Model: message-passing distributed network of p processes
Input: L[0:n – 1] (array in process P0)
Output: L[0:n – 1] is sorted in increasing order
    m ← n/p
    myrank ← myid
    scatter(L[0:n – 1], a[0:m – 1], 0)
    MergeSort(a[0:m – 1], 1, m)
    for k ← 0 to p – 1 do
        if even(k + myrank) then
            if myrank < p – 1 then
                CompareSplit(a[0:m – 1], myrank, myrank + 1)
            endif
        else    //odd(k + myrank)
            if myrank > 0 then
                CompareSplit(a[0:m – 1], myrank – 1, myrank)
            endif
        endif
    endfor
    gather(a[0:m – 1], L[0:n – 1], 0)
end EvenOddSort
The correctness of EvenOddSort follows immediately from the correctness of EvenOddSort1DMesh (see Chapter 15). Considering the complexity of EvenOddSort, note that the computational complexity of the call to MergeSort is in Θ((n/p) log(n/p)), and the computational complexity of the p parallel calls to CompareSplit is in Θ(n). Thus, for p small compared to n, we basically achieve the (optimal) speedup of p as well as optimal cost. Since the communication complexity of EvenOddSort is in Θ(p), the sort is only attractive when p is small compared to n.
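The communication in a compare-split step can be expressed with MPI_Sendrecv: the two partners exchange their sorted sublists, merge locally, and the lower-ranked partner keeps the smaller half while the higher-ranked partner keeps the larger half. A minimal C sketch follows, in which merge_keep_low and merge_keep_high are hypothetical helpers that merge two sorted length-m arrays and keep the indicated half in sorted order:

#include <mpi.h>
#include <stdlib.h>

/* hypothetical helpers: merge the sorted arrays a and other (each of length m)
   and leave in a the m smallest (keep_low) or m largest (keep_high) elements */
void merge_keep_low(double *a, const double *other, int m);
void merge_keep_high(double *a, const double *other, int m);

/* sketch of CompareSplit(a[0:m-1], lo, hi) executed by the process of rank myrank,
   where lo and hi = lo + 1 are the two participating ranks */
void compare_split(double *a, int m, int lo, int hi, int myrank) {
    int partner = (myrank == lo) ? hi : lo;
    double *other = malloc(m * sizeof(double));
    MPI_Status status;

    /* exchange sorted sublists with the partner in one combined send/receive */
    MPI_Sendrecv(a, m, MPI_DOUBLE, partner, 0,
                 other, m, MPI_DOUBLE, partner, 0,
                 MPI_COMM_WORLD, &status);

    if (myrank == lo)
        merge_keep_low(a, other, m);   /* lower rank keeps the m smallest elements */
    else
        merge_keep_high(a, other, m);  /* higher rank keeps the m largest elements */

    free(other);
}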
18.4 The Embarrassingly Parallel Master-Worker Paradigm
As mentioned earlier, in the master-worker paradigm, one of the processes, say P0, is designated as the master, which coordinates the execution of a distributed algorithm by farming out tasks to the other processes, called workers. Thus, this paradigm is also referred to as the work pool, or processor farm approach. The general master-worker paradigm skeleton is then:
if myid = 0 then
    execute master code
else
    execute worker code
endif
In this section we discuss the embarrassingly parallel master-worker paradigm, in which workers complete their tasks independently of one another. In Section 18.6 we discuss the more complicated version, where workers cooperate in solving tasks by splitting their task with workers who become idle.
18.4.1 Embarrassingly Parallel Master-Worker Code Skeleton
In the embarrassingly parallel situation, the workers accept tasks sent by the master, execute these tasks, return the results to the master, accept further tasks, and so forth, until all tasks have been completed. There is no worker-to-worker communication, and, in particular, each worker executes its code independently of the other workers, without any need to synchronize its computations with the other workers. When all tasks are completed, the master broadcasts a message indicating termination (thereby allowing for a graceful exit for all processes). Figure 18.8 captures this version of the embarrassingly parallel master-worker paradigm.
***Figure 18.8 Embarrassingly Parallel Master-Worker Paradigm (new)***
The embarrassingly parallel master-worker pseudocode skeleton is then as follows, where we use the blocking form of send, and where we assume that we have n tasks task1, …, taskn, and p processes.
Master

    for i ← 1 to p – 1 do
        Ssend(taski, i, tag)    //taski is a null task in the event that there are
                                //more workers (processes) than tasks
    endfor
    numsent ← min(n, p – 1)    //number of tasks sent
    numrec ← 0                 //number of completed tasks received
    while numrec < n do
        receive(result, ANY_SOURCE, tag, status)
        SourceWorker ← status→source
        numrec ← numrec + 1
        if numsent < n then
            numsent ← numsent + 1
            Ssend(tasknumsent, SourceWorker, tag)    //send the next task back to the
                                                     //worker from whom the result was just received
        else
            Ssend(nulltask, SourceWorker, tag)       //send the null task back to the worker
        endif
    endwhile
    broadcast(termination, 0)

Worker

    receive(task, 0, tag, status)        //receive a task from the master
    while task ≠ nulltask do
        result ← CompleteTask(task)
        Ssend(result, 0, tag)            //send result to master
        receive(task, 0, tag, status)    //receive a task from the master
    endwhile
    broadcast(termination, 0)
Note that in our master skeleton code we send the nulltask when there are no more tasks to be completed. This allows for synchronization of the Ssend and receive functions. The exact nature of the nulltask is problem dependent.
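How this skeleton maps onto MPI might look roughly as follows. In this C sketch the tasks and results are modeled as single integers, NULL_TASK is an assumed sentinel encoding of the null task, complete_task is a hypothetical routine, and the terminating broadcast is omitted; all of these are simplifying assumptions rather than part of the skeleton itself:

#include <mpi.h>

#define TASK_TAG  1          /* illustrative tag value */
#define NULL_TASK (-1)       /* assumed sentinel encoding of the null task */

int complete_task(int task); /* hypothetical: carries out one task and returns its result */

/* master side: farm out the n tasks stored in tasks[1..n] to the p - 1 workers */
void master(int n, int p, const int tasks[]) {
    int numsent, numrec = 0, result;
    MPI_Status status;

    for (int i = 1; i <= p - 1; i++) {            /* one initial task per worker */
        int t = (i <= n) ? tasks[i] : NULL_TASK;  /* null task if more workers than tasks */
        MPI_Ssend(&t, 1, MPI_INT, i, TASK_TAG, MPI_COMM_WORLD);
    }
    numsent = (n < p - 1) ? n : p - 1;

    while (numrec < n) {
        MPI_Recv(&result, 1, MPI_INT, MPI_ANY_SOURCE, TASK_TAG,
                 MPI_COMM_WORLD, &status);
        int worker = status.MPI_SOURCE;           /* the worker that just finished */
        numrec++;
        int next = (numsent < n) ? tasks[++numsent] : NULL_TASK;
        MPI_Ssend(&next, 1, MPI_INT, worker, TASK_TAG, MPI_COMM_WORLD);
    }
    /* a termination broadcast, as in the skeleton, would follow here */
}

/* worker side: repeatedly receive a task, complete it, and return the result */
void worker(void) {
    int task, result;
    MPI_Status status;
    MPI_Recv(&task, 1, MPI_INT, 0, TASK_TAG, MPI_COMM_WORLD, &status);
    while (task != NULL_TASK) {
        result = complete_task(task);
        MPI_Ssend(&result, 1, MPI_INT, 0, TASK_TAG, MPI_COMM_WORLD);
        MPI_Recv(&task, 1, MPI_INT, 0, TASK_TAG, MPI_COMM_WORLD, &status);
    }
}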
18.4.2 Matrix Multiplication
We illustrate the implementation of the embarrassingly parallel master-worker code skeleton with an algorithm for computing the product AB of two n-by-n matrices. We assume that (master) process P0 originally holds A and B, and broadcasts A to all of the other (worker) processes. Then the n columns Bi, i = 0, 1, …, n – 1, of the matrix B are sent one at a time to the workers. When a worker receives a column Bi, it computes the matrix-vector product ABi and returns the result to P0. When P0 receives a column-vector product from a worker, it sends the worker the next remaining column Bi (if any). The algorithm terminates after all of the products ABi have been returned, i = 0, 1, …, n – 1. We use the tag parameter in order to keep track of which column is being sent and received. When a worker receives a message from the master having tag = n, it knows that there is no more work to be done. The procedure MatrixVectorProduct called by each worker is the sequential algorithm for computing a matrix-vector product based directly on the definition. Also, the sequential procedure GetColumn, called by the master to extract a column of B, has the obvious meaning. The vector Null[0:n – 1] used in the algorithm can be filled with arbitrary values, since it is only sent with tag = n, which is enough information for the receiving worker to know that it needs to take no further action. Of course, the worker could also check for the null vector, but we wish to design the code to model the general code skeleton as closely as possible.
procedure MatrixProduct(A[0:n – 1, 0:n – 1], B[0:n – 1, 0:n – 1], C[0:n – 1, 0:n – 1])
Model: message-passing distributed network of p processes
Input: A[0:n – 1, 0:n – 1], B[0:n – 1, 0:n – 1] (n-by-n matrices of numbers in process P0)
Output: C[0:n – 1, 0:n – 1] (the matrix product C = AB)
    broadcast(A[0:n – 1, 0:n – 1], 0)    //master broadcasts A to all workers
    if myid = 0 then    //execute master code
        for i ← 1 to p – 1 do
            if i ≤ n then
                Ssend(GetColumn(B[0:n – 1, 0:n – 1], i – 1), i, i – 1)    //send Pi the ith column of B
            else
                Ssend(Null[0:n – 1], i, n)    //send Pi a null vector
            endif
        endfor
        numsent ← min(n, p – 1)    //a counter for the number of columns sent
        numrec ← 0    //a counter for the number of matrix-vector products received by the master
        while numrec < n do
            receive(Column[0:n – 1], ANY_SOURCE, ANY_TAG, status)
            Col ← status→tag
            SourceWorker ← status→source
            for i ← 0 to n – 1 do
                C[i, Col] ← Column[i]
            endfor
            numrec ← numrec + 1
            if numsent < n then
                numsent ← numsent + 1
                Ssend(GetColumn(B[0:n – 1, 0:n – 1], numsent – 1), SourceWorker, numsent – 1)
                //send PSourceWorker the column of B in position numsent – 1
            else
                Ssend(Null[0:n – 1], SourceWorker, n)
            endif
        endwhile
    else    //that is, myid ≠ 0, so execute worker code
        receive(Column[0:n – 1], 0, ANY_TAG, status)
        Col ← status→tag
        while Col < n do
            Ssend(MatrixVectorProduct(A[0:n – 1, 0:n – 1], Column[0:n – 1]), 0, Col)
            receive(Column[0:n – 1], 0, ANY_TAG, status)
            Col ← status→tag
        endwhile
    endif
end MatrixProduct
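For concreteness, here is a minimal C sketch of the two sequential helpers referenced above, assuming the matrices are stored as row-major arrays of doubles and the results are written into caller-supplied output vectors:

/* y = Ax for an n-by-n row-major matrix A, directly from the definition */
void matrix_vector_product(const double *A, const double *x, double *y, int n) {
    for (int i = 0; i < n; i++) {
        y[i] = 0.0;
        for (int j = 0; j < n; j++)
            y[i] += A[i * n + j] * x[j];
    }
}

/* copy column j of the n-by-n row-major matrix B into the vector col */
void get_column(const double *B, double *col, int n, int j) {
    for (int i = 0; i < n; i++)
        col[i] = B[i * n + j];
}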
The straightforward sequential algorithm for multiplying two n-by-n matrices based directly on the definition of matrix multiplication performs n3 multiplications. The distributed algorithm MatrixProduct performs (n/(p – 1))n2 parallel multiplications (assuming p ≤ n), so that it achieves (nearly) optimal speedup over the straightforward sequential algorithm. Note that MatrixProduct performs max{n, p – 1} communication steps, so that for large n (p fixed), the (cubic) computation complexity dominates the (linear) communication complexity.
By sending one column at a time to workers, MatrixProduct has been designed so that it achieves good load balancing in a heterogeneous environment where the processing speeds of the processes might vary considerably. However, when the processes are roughly equal in processing speed, an altered version, in which the columns of the matrix B are partitioned into p blocks of n/p columns and (the master and) each worker computes the corresponding block of columns of AB, would probably be preferred, since the number of communication steps would be reduced from max{n, p – 1} to p – 1. The number of multiplications performed would be the same for both versions. We leave the pseudocode of the altered version as an exercise.