Parallelization: Parallel Circle Pi
We describe the conversion of a single-processor code into a parallel code.
In this example, we demonstrate a problem whose parts can be divided
into mostly independent pieces on a set of nodes.
However, what happens if the nodes differ in computational power?
The simplest "divide and conquer" approach is less efficient because
some parts of the work would be completed ahead of others.
Organizing and reassigning work to compensate for such performance differences is called "load balancing".
In this example, the work of the code is automatically load balanced between the processors in an efficient and dynamic way.

This technique applies to problems that can be cut up into relatively independent pieces that are more numerous than
the number of processors available.
This example is an extension of the
Parallel Adder tutorial.
There, the code simply divided the problem size by the processor count and the separate nodes performed their work.
Here, we take advantage of a problem that can be cut up into pieces more numerous than the number of processors.
Then we assign more work to nodes that complete work more quickly.
This is useful if you anticipate that your code will be running in a heterogeneous environment, that is,
one where the processors are not identical.
A different technique would be used to divide work that requires constant interaction.
Finally, if you are learning about parallel code,
the following example shows how to partition work dynamically using elemental message-passing
in a parallel code.
Before: Single-Processor circlepi
This single-processor example, circlepi,
calculates pi using a very simple approach.
The principle of the algorithm considers a circle inscribed in a square.
Randomly throw darts at the square and count how many land inside the circle.
Assuming a unit circle, the area of the circle is pi and the area of the square is 4.
Therefore the ratio of those darts that landed inside the circle over how many were thrown should asymptotically approach pi/4.
Multiplying this ratio by 4 gives pi.


In the code we represent the random dart throws by generating random number pairs, and
we perform a simple test on those number pairs to
determine if the dart landed inside the circle. After that, it's a simple counting process.
The calculation could be performed exclusively with integer math, easing the creation of an implementation
using arbitrarily high-precision dart throws.
For the sake of brevity, we allow floating-point math in this code example's dart throw.
This code has three primary routines plus main:
- throwdart - retrieves a random number pair, compares the sum of its squares against 1, and
returns the result.
The number pair represents the landing coordinates (x,y) of the dart. These coordinates range
up to 1, so the dart landed inside the circle if x*x+y*y is less than 1. This routine
simply returns true if the dart is inside the circle, otherwise false.
- performiterations - calls throwdart repeatedly, counting how many times
it returns true and how many darts were thrown. The ratio is multiplied by four here by calling throwdart
four times per increment of the denominator.
- performcalculation - initializes and seeds the system,
calls performiterations, and returns the final land/throw ratio.
- main - prepares the random number generator,
calls performcalculation, and uses saveresult to save the results into a file.
The code is structured to clearly identify the sections that perform the primary independent work of the problem (throwdart),
and the parts that organize its results and save the data to an output file.
The C source is shown in Listing 1.
Listing 1 - circlepi.c
#include <stdio.h> /* standard I/O routines */
#include <stdlib.h> /* standard library routines, such as memory */
#include <math.h> /* math libraries */
/* Produces random doubleprecision numbers in [0,1) */
inline double myrandom();
inline double myrandom()
{/* You're welcome to substitute a better random number generator */
return ((double)rand()+((double)rand())/RAND_MAX)/RAND_MAX;
}
inline unsigned long throwdart(double bottom, double top);
inline unsigned long throwdart(double bottom, double top)
{
double y=myrandom();
double x=myrandom();
y=bottom+y*(top-bottom);
return x*x+y*y<1?1:0;
}
void performiterations(unsigned long long *numeratorOut, unsigned long long *denominatorOut,
unsigned long long iterationcount, double bottom, double top);
void performiterations(unsigned long long *numeratorOut, unsigned long long *denominatorOut,
unsigned long long iterationcount, double bottom, double top)
{
unsigned long long numerator=0;
unsigned long long denominator=0;
unsigned long long index;
for(index=0; index<iterationcount; index++) {
numerator+=throwdart(bottom, top);
numerator+=throwdart(bottom, top);
numerator+=throwdart(bottom, top);
numerator+=throwdart(bottom, top);
denominator++;
if (!(0xfffffL&denominator)) printf("%llu...\n", iterationcount-denominator);
}
numeratorOut[0]+=numerator;
denominatorOut[0]+=denominator;
}
#define MaxCount 0x1000000LL
void performcalculation(unsigned long long *numeratorOut, unsigned long long *denominatorOut);
void performcalculation(unsigned long long *numeratorOut, unsigned long long *denominatorOut)
{
unsigned long long numerator=0;
unsigned long long denominator=0;
performiterations(&numerator, &denominator, MaxCount, 0, 1);
numeratorOut[0]+=numerator;
denominatorOut[0]+=denominator;
}
void fprintresult(FILE *stream, unsigned long long numerator, unsigned long long denominator);
void fprintresult(FILE *stream, unsigned long long numerator, unsigned long long denominator)
{
double picalculation=numerator/(double)denominator;
double errorestimate=sqrt((double)numerator)/(double)denominator;
char dashS[]="--------------------";
dashS[(long)(log((double)numerator+1)/log(10))]=0;
fprintf(stream, " %llu\n", numerator);
fprintf(stream, "pi is approximately %s = %.9g +/- %.5g\n", dashS,
picalculation, errorestimate);
fprintf(stream, " %llu\n", denominator);
}
void saveresult(char *filename, unsigned long long numerator, unsigned long long denominator);
void saveresult(char *filename, unsigned long long numerator, unsigned long long denominator)
{
if (filename) {
FILE *fp=fopen(filename, "a+");
if (fp) {
fprintresult(fp, numerator, denominator);
fclose(fp);
}
}
}
#include <time.h>
int main(void)
{
unsigned long long numerator=0;
unsigned long long denominator=0;
{
long throwoutcount=time(NULL)&0x3ffL;
printf("Initializing random number generator by throwing out %ld random numbers...\n",
throwoutcount);
while (throwoutcount--) myrandom();
}
printf("Beginning calculation... (first random is %g)\n", myrandom());
performcalculation(&numerator, &denominator);
fprintresult(stdout, numerator, denominator);
saveresult("output", numerator, denominator);
return 0;
}


When this code is run with its MaxCount constant set to 0x1000000 (16777216), it produces this output:
Initializing random number generator by throwing out 447 random numbers...
Beginning calculation... (first random is 0.633351)
15728640...
14680064...
13631488...
12582912...
11534336...
10485760...
9437184...
8388608...
7340032...
6291456...
5242880...
4194304...
3145728...
2097152...
1048576...
0...
52705958
pi is approximately ------- = 3.1415199 +/- 0.00043272
16777216

After it counts down its work, the result is a reasonable approximation of pi, consistent with an error
estimate based on Poisson statistics.
After: parallelcirclepi
Choosing the appropriate partitioning of the problem is key to parallelizing that application.
Here we chose to group dart throws and assign those groups to the processors.
For code simplicity, we don't partition the area of the circle; instead we let
each group of darts spread over the entire space.
A scenario where one would partition the space could be a more appropriate approach
for other problems.
Changes to performcalculation are "boilerplate" code that starts up and shuts down the parallel environment,
determines whether to call performmasteractivity or performslaveactivity, and determines the name of the output file.
performmasteractivity organizes and assigns work to the other processors, which call performslaveactivity.
Note that the routines that perform the primary work of the problem, performiterations and throwdart, are unchanged.


performmasteractivity partitions and assigns work to the other
processors. The code declares data structures that format the master routine's work assignments for the slave routines and
the slaves' replies back to the master. The slave code simply interprets these work assignments and calls performiterations
to complete the work. The master code contains the "intelligence" to dynamically respond to how quickly the slaves reply.
Whenever a slave's results return, the master assigns more work to that slave. The slaves need not be in lock step with each other.
In fact, the master can handle the slaves being completely out of sync with one another. How the master responds is driven primarily
by the state of the message passing to each slave.
As work is being completed, performmasteractivity collects and compiles
the data back at processor 0 so, ultimately, main can
write the output in one place convenient for the user.
The detailed answer is in the code.
Listing 2 shows the C source of parallelcirclepi; the major changes relative to circlepi are described below.
Listing 2 - parallelcirclepi.c
#include <stdio.h> /* standard I/O routines */
#include <stdlib.h> /* standard library routines, such as memory */
#include <math.h> /* math libraries */
/* Produces random doubleprecision numbers in [0,1) */
inline double myrandom();
inline double myrandom()
{/* You're welcome to substitute a better random number generator */
return ((double)rand()+((double)rand())/RAND_MAX)/RAND_MAX;
}
inline unsigned long throwdart(double bottom, double top);
inline unsigned long throwdart(double bottom, double top)
{
double y=myrandom();
double x=myrandom();
y=bottom+y*(top-bottom);
return x*x+y*y<1?1:0;
}
void performiterations(unsigned long long *numeratorOut, unsigned long long *denominatorOut,
unsigned long long iterationcount, double bottom, double top);
void performiterations(unsigned long long *numeratorOut, unsigned long long *denominatorOut,
unsigned long long iterationcount, double bottom, double top)
{
unsigned long long numerator=0;
unsigned long long denominator=0;
unsigned long long index;
for(index=0; index<iterationcount; index++) {
numerator+=throwdart(bottom, top);
numerator+=throwdart(bottom, top);
numerator+=throwdart(bottom, top);
numerator+=throwdart(bottom, top);
denominator++;
if (!(0xfffffL&denominator)) printf("%llu...\n", iterationcount-denominator);
}
numeratorOut[0]+=numerator;
denominatorOut[0]+=denominator;
}
#pragma mark  Parallel code follows
#include "mpi.h" /* MPI library */
int ppinit(int *idproc, int *nproc);
void ppexit(void);
/* The maximum number of iterations: can be set beyond the billions */
#define MaxCount 0x1000000LL
#define NumPartitions (MaxCount>=0x10000000000000LL?0x10000000L \
:MaxCount>0x1000000LL?MaxCount>>20L:16LL)
/* The data structure sent to the slave nodes to specify what part of the
problem to work on */
typedef struct PartitionDataStruct {
unsigned long long iterationcount; /* 0 means quit */
double bottom, top; /* specifies the edges of the sample range */
long partitionindex, totalpartitions;
} PartitionDataStruct, *PartitionDataPtr, **PartitionDataHandle;
/* The data structure sent back to the master containing the slave's output */
typedef struct FractionDataStruct {
unsigned long long numerator;
unsigned long long denominator;
} FractionDataStruct, *FractionDataPtr, **FractionDataHandle;
/* These data structures are defined above so that both the master and slave
know their formats */
/* Accepts instructions from proc 0 to work on the problem, then replies with its answer*/
void performslaveactivity(unsigned long long *numeratorOut, unsigned long long *denominatorOut);
void performslaveactivity(unsigned long long *numeratorOut, unsigned long long *denominatorOut)
{
int staying=1;
while (staying)
{int ierr;
PartitionDataStruct incoming={0,0,0};
MPI_Status status;
ierr=MPI_Recv(&incoming, sizeof(incoming), MPI_BYTE, 0, 0,
MPI_COMM_WORLD, &status);
if (ierr||!incoming.iterationcount)
staying=0;
else {
FractionDataStruct reply={0,0};
printf("performing %llu iterations in [%g, %g)...\n",
incoming.iterationcount, incoming.bottom, incoming.top);
/* In this example, only the iterationcount, bottom, and top fields are used,
but other fields might be appropriate for other problems */
performiterations(&reply.numerator, &reply.denominator,
incoming.iterationcount, incoming.bottom, incoming.top);
ierr=MPI_Send(&reply, sizeof(reply), MPI_BYTE, 0, 1, MPI_COMM_WORLD);
numeratorOut[0]+=reply.numerator;
denominatorOut[0]+=reply.denominator;
}
}
}
#include <string.h>
/* The ProcActivityDataStruct is what the master uses to hold the incoming (FractionDataStruct)
and outgoing (PartitionDataStruct) data structures and related data (MPI_Request) for efficient
use of MPI */
typedef struct ProcActivityDataStruct {
long state; /* keeps track of the state of this processing element */
long partitionindex; /* keeps track of which partition was assigned */
MPI_Request sendreq, recvreq;
/* sent to MPI_Test; returned by MPI_Isend and MPI_Irecv */
FractionDataStruct fds; /* result from slave */
PartitionDataStruct pds; /* assignment specification for slave */
} ProcActivityDataStruct, *ProcActivityDataPtr, **ProcActivityDataHandle;
void fprintresult(FILE *stream, unsigned long long numerator, unsigned long long denominator);
void performmasteractivity(unsigned long long *numeratorOut, unsigned long long *denominatorOut,
unsigned long long iterationcount, int nproc);
void performmasteractivity(unsigned long long *numeratorOut, unsigned long long *denominatorOut,
unsigned long long iterationcount, int nproc)
{void *datap;
long allocation;
/* Set the MPI so that MPI errors are recoverable */
MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
datap=malloc(allocation=(sizeof(ProcActivityDataStruct))*nproc
+sizeof(char)*NumPartitions+4);
if (datap) {
/* pointer to the process activity array that keeps track of the progress by each process */
ProcActivityDataPtr procp;
/* pointer to the partition array that keeps track of the progress on each partition */
unsigned char *partitionp;
long nextpartitionindex=0;
int staying=1, ierr=0;
double nextupdate=0, finaltimeout=0;
/* clear memory */
memset(datap, 0, allocation);
/* set up data structures */
procp=datap;
partitionp=(unsigned char *)&procp[nproc];
/* here's where the work is assigned, retrieved, and organized:
the inside of this while loop could also be called repeatedly by, say, an event loop */
while (staying)
{long index;
int flag, procactivity=0, denomchanged=0, allslavenodesidle=1;
MPI_Status status;
/* loop over all processors */
for(index=0; index<nproc; index++) {
ProcActivityDataPtr np=&procp[index];
/* if nextpartitionindex isn't pointing to an unassigned partition */
if ((nextpartitionindex<0)
||(partitionp[nextpartitionindex]>=2)) {
/* search through the partition array for the next piece of unassigned work */
for(nextpartitionindex=0;
(nextpartitionindex<NumPartitions)
&&partitionp[nextpartitionindex];
nextpartitionindex++) ;
/* if all partitions have been assigned */
if (nextpartitionindex>=NumPartitions) {
/* look for anything else that hasn't completed */
for(nextpartitionindex=0;
(nextpartitionindex<NumPartitions)
&&(partitionp[nextpartitionindex]>=2);
nextpartitionindex++) ;
}
}
if (index>0)
switch (np->state) {/* the state of this process */
case 0: /* idle */
/* if new work is available */
if ((nextpartitionindex>=0)
&&(nextpartitionindex<NumPartitions)) {
/* fill the work assignment structure */
np->pds.iterationcount=
iterationcount/NumPartitions;
np->pds.bottom=0;
np->pds.top=1;
np->pds.partitionindex=nextpartitionindex;
np->pds.totalpartitions=NumPartitions;
/* keep track of which partition we're assigning this process */
np->partitionindex=nextpartitionindex;
ierr=MPI_Isend(&np->pds, sizeof(np->pds),
MPI_BYTE, index,
0, MPI_COMM_WORLD, &np->sendreq);
if (!ierr) {
/* mark this entry in the partition array as assigned */
partitionp[np->partitionindex]=1;
/* this partition is checked out; don't let it be assigned twice */
nextpartitionindex=-1;
}
procactivity=1;
np->state++;
}
break;
case 1: /* waiting for the send to complete */
ierr=MPI_Test(&np->sendreq, &flag, &status);
if (!ierr)
if (flag) {
/* MPI_Isend completed; post a receive */
ierr=MPI_Irecv(&np->fds,
sizeof(np->fds), MPI_BYTE,
index, 1, MPI_COMM_WORLD,
&np->recvreq);
/* Note: if this was a variable amount of data, one would probably add a few more process
activity states here so that we would post a fixedsized receive on data saying how big
the data chunk is, followed by another receive on the actual data chunk using that size data.
Memory allocation can be fixed or performed on the fly. */
procactivity=1;
np->state++;
}
break;
case 2: /* waiting for receive to complete
(we could try to be more clever and perform the next Isend while waiting for the data,
but that would be getting fancy (and less readable)) */
ierr=MPI_Test(&np->recvreq, &flag, &status);
if (!ierr)
if (flag) {
/* MPI_Irecv completed; collect the data */
printf("Received data for partition %ld from proc #%ld: %llu / %llu \n",
np->partitionindex, index, np->fds.numerator, np->fds.denominator);
/* if this partition's work didn't already get finished */
if (partitionp[np->partitionindex]<2) {
/* incorporate this process' results in the larger result */
numeratorOut[0]+=np->fds.numerator;
denominatorOut[0]+=np->fds.denominator;
/* check it off as finished */
partitionp[np->partitionindex]=2;
denomchanged=1;
}
procactivity=1;
np->state++;
}
break;
case 3: /* work complete; return to idle */
procactivity=1;
np->state=0;
break;
default: /* in case something goes wrong */
np->state=0;
break;
}
if (np->state) allslavenodesidle=0;
}
/* end loop over all processors */
/* if no other interesting activity occurred */
if (!procactivity) {/* might as well find myself some work */
unsigned long long numerator=0;
unsigned long long denominator=0;
if ((nextpartitionindex>=0)
&&(nextpartitionindex<NumPartitions)) {
/* if this work has not been performed already */
if (partitionp[nextpartitionindex]<2) {
/* do the work */
partitionp[nextpartitionindex]++;
performiterations(&numerator, &denominator,
iterationcount/NumPartitions, 0, 1);
printf("Proc zero completing iterations for partition %ld \n", nextpartitionindex);
partitionp[nextpartitionindex]++;
nextpartitionindex=-1; /* checked out */
/* incorporate results */
numeratorOut[0]+=numerator;
denominatorOut[0]+=denominator;
}
}
else if (nextpartitionindex>=NumPartitions) {
/* time to think about finishing up */
if (allslavenodesidle
||(finaltimeout?MPI_Wtime()>finaltimeout
:(finaltimeout=MPI_Wtime()+10,0)))
/* all done */
staying=0;
}
}
/* if the final result has changed */
if (denomchanged) if (MPI_Wtime()>nextupdate) {
/* Report interim results for the user */
printf("%llu iterations remaining. Progress so far:\n",
iterationcount-denominatorOut[0]);
fprintresult(stdout, numeratorOut[0], denominatorOut[0]);
nextupdate=MPI_Wtime()+10; /* but not too often */
}
}
/* end while(staying) loop */
{long index;
/* if we know we won't be using those processors any more, then tell everyone to quit */
for(index=1; index<nproc; index++) {
ProcActivityDataPtr np=&procp[index];
printf("asking proc #%ld to quit...\n", index);
np->pds.iterationcount=0; /* this is the quit signal */
ierr=MPI_Send(&np->pds, sizeof(np->pds), MPI_BYTE, index,
0, MPI_COMM_WORLD);
/* reusing the same structure because that's the data size they expect */
}
}
/* filling in last little bit */
{unsigned long long numerator=0;
unsigned long long denominator=0;
unsigned long long remainder=
iterationcount-NumPartitions*(iterationcount/NumPartitions);
if (remainder>0) {
performiterations(&numerator, &denominator, remainder, 0, 1);
numeratorOut[0]+=numerator;
denominatorOut[0]+=denominator;
}
}
free(datap);
}
}
char *createsavefilename(int idproc, int nproc);
char *createsavefilename(int idproc, int nproc)
{char *savefilename=malloc(256);
if (savefilename) {
char formatstring[256];
sprintf(formatstring, "output%%.%dd%%.%dd",
(int)(log(nproc)/log(10)+1), (int)(log(nproc)/log(10)+1));
sprintf(savefilename, formatstring, idproc, nproc);
}
return savefilename;
}
/* determines basic cluster attributes and calls the appropriate parallel computing routines */
char *performcalculation(unsigned long long *numeratorOut, unsigned long long *denominatorOut);
char *performcalculation(unsigned long long *numeratorOut, unsigned long long *denominatorOut)
{char *savefilename=NULL;
unsigned long long numerator=0;
unsigned long long denominator=0;
int ierr=0;
int nproc=1, idproc=0;
ierr=ppinit(&idproc, &nproc);
if (!ierr) {
if (idproc)
performslaveactivity(&numerator, &denominator);
else
performmasteractivity(&numerator, &denominator, MaxCount, nproc);
savefilename=createsavefilename(idproc, nproc);
ppexit();
}
else {
printf("MPI_Init has returned error %d. Assuming single-processor... \n",
ierr);
performiterations(&numerator, &denominator, MaxCount, 0, 1);
}
numeratorOut[0]+=numerator;
denominatorOut[0]+=denominator;
return savefilename;
}
#pragma mark  the bulk of the parallel code is above
/* fprints the fraction of numerator over denominator */
void fprintresult(FILE *stream, unsigned long long numerator, unsigned long long denominator)
{
double picalculation=numerator/(double)denominator;
double errorestimate=sqrt((double)numerator)/(double)denominator;
char dashS[]="--------------------";
dashS[(long)(log((double)numerator+1)/log(10))]=0;
fprintf(stream, " %llu\n", numerator);
fprintf(stream, "pi is approximately %s = %.9g +/- %.5g\n", dashS,
picalculation, errorestimate);
fprintf(stream, " %llu\n", denominator);
}
/* save the fraction into a file */
void saveresult(char *filename, unsigned long long numerator, unsigned long long denominator);
void saveresult(char *filename, unsigned long long numerator, unsigned long long denominator)
{
if (filename) {
FILE *fp=fopen(filename, "a+");
if (fp) {
fprintresult(fp, numerator, denominator);
fclose(fp);
}
}
}
#include <time.h>
int main(void)
{
unsigned long long numerator=0;
unsigned long long denominator=0;
char *savefilename=NULL;
{
long throwoutcount=time(NULL)&0x3ffL;
printf("Initializing random number generator by throwing out %ld random numbers...\n",
throwoutcount);
while (throwoutcount--) myrandom();
}
printf("Beginning calculation... (first random is %g)\n", myrandom());
savefilename=performcalculation(&numerator, &denominator);
printf("Calculations complete.\n");
fprintresult(stdout, numerator, denominator);
if (savefilename) {
saveresult(savefilename, numerator, denominator);
free(savefilename);
}
return 0;
}
#ifdef __MWERKS__
/* only for Metrowerks CodeWarrior */
#include <SIOUX.h>
#endif
int ppinit(int *idproc, int *nproc)
{
/* this subroutine initializes parallel processing
idproc = processor id
nproc = number of real or virtual processors obtained */
int ierr;
*nproc=0;
/* initialize the MPI execution environment */
ierr = MPI_Init(NULL, NULL);
if (!ierr) {
/* determine the rank of the calling process in the communicator */
ierr = MPI_Comm_rank(MPI_COMM_WORLD, idproc);
/* determine the size of the group associated with the communicator */
ierr = MPI_Comm_size(MPI_COMM_WORLD, nproc);
#ifdef __MWERKS__
/* only for Metrowerks CodeWarrior */
SIOUXSettings.asktosaveonclose=0;
SIOUXSettings.autocloseonquit=1;
#endif
}
return ierr;
}
void ppexit(void)
{
/* this subroutine terminates parallel processing */
int ierr;
/* waits until everybody gets here */
ierr = MPI_Barrier(MPI_COMM_WORLD);
/* terminate MPI execution environment */
ierr = MPI_Finalize();
}


The changes from circlepi to parallelcirclepi:
- mpi.h - the header file for the MPI library, required to access information about the parallel system and perform communication
- idproc, nproc -
nproc describes how many processors are currently running this job and
idproc identifies the designation, labeled from 0 to nproc - 1, of this processor.
This information is sufficient to identify exactly which part of the problem this instance of the executable should work on.
If idproc is zero then performmasteractivity is called,
otherwise performslaveactivity executes.
- NumPartitions -
the code will cut up the overall problem into
NumPartitions pieces.
The partition size should be big enough that a reasonable amount of work is performed on
each processor, but not so big as to let processors run without periodic communication.
For a problem like this, pieces that take at least a few seconds each are enough.
This #define is written so that the number of partitions scales with the overall
problem size while fitting within a reasonable range.
- PartitionDataStruct -
This data structure defines the format of the message the master sends to the slaves
to specify what part of the problem to work on.
iterationcount holds how many counts the slave should make. For this
example, that is as much as the slave needs to know.
We need some way of letting the slave know that the calculation is finished,
so a 0 in iterationcount is a sufficient quit signal, but one could add and use an
explicit message field instead.
bottom and top
indicate what part of the problem space the slave should sample in, and
partitionindex and totalpartitions specify which partition this slave has been
assigned. This other data could be useful for other problems, such as numerical
integration.
- FractionDataStruct -
This data structure defines the format of the reply the slave sends back to the master
containing its results. The master aggregates this data as
the calculation is being performed.
- performslaveactivity -
performslaveactivity waits for a
PartitionDataStruct message from the master by calling MPI_Recv.
Assuming iterationcount is not zero, it calls
performiterations with the specified number of iterations, then
returns the numerator and denominator results in a FractionDataStruct reply
via MPI_Send.
- ProcActivityDataStruct -
This data structure facilitates the bookkeeping the master performs
in order to monitor and direct the progress of the slave processors.
It keeps track of the state of the slave, which part of the work has
been assigned, the MPI_Request variables for the asynchronous communication
with the slaves, and the data structures for the messages sent to and received from
the slaves.
- performmasteractivity -
performs the management of assignments for and data from the slaves, as well as working on
the problem when there is nothing else to do. Other procs can finish out of order or at faster
or slower speeds, and this routine will sort out the work and assign more work to others
as they finish their previous partition. It will also reassign work that a node failed
to complete.
The processes and partitions operate in a "state space", inspired by Quantum Mechanics.
Each entity evolves, at the appropriate moment, from one state to the next. They are
therefore allowed to evolve and operate without being in lockstep with one another.
In other words, their individual evolution can differ, or even change speed, at any time.
This evolution through a series of clearly defined "states" makes possible complex,
yet asynchronous, behavior.
Each partition has three states:
- 0: unassigned, no work done
- 1: the work in this partition has been assigned to a process
- 2: the work in this partition is complete
This state data is stored in the partition array.
Each process has three important states:
- 0: idle, no work assigned
- 1: work has been assigned to this process, and the send of the assignment (via MPI_Isend)
has yet to be cleared
- 2: work has been assigned to this process, and we are waiting on its results
This state data is stored in the state field of the process activity array.
Fancier things that could also be done:
1. After confirming that a request was sent, queue another request even before the
last one has returned a reply.
2. Time and evaluate the performance of each process, so that one could grow the
assignment size for better-performing processors.
3. Enhanced fault tolerance, using timeouts, in case a process disappears and later reappears.
Many of these features are implemented in
the Fresnel Diffraction Explorer.
- createsavefilename -
creates the name of the file as a function of idproc and nproc,
so that we don't have name collisions.
- performcalculation -
called by main, this routine calls ppinit
to determine attributes of the cluster and calls
the slave or master routines. After those routines return, it
returns the final aggregate results and determines the name of the output file.
- MPI_Recv, MPI_Send, MPI_Irecv, MPI_Isend, and MPI_Test -
These elemental MPI calls perform the communication linking processors.
MPI_Recv and MPI_Irecv receive a message sent using
MPI_Send or MPI_Isend.
In MPI_Irecv and MPI_Isend, control is returned to the
caller immediately (hence the "I"),
allowing the caller to do other work while the message is being handled.
This behavior is called asynchronous message passing, and these
messages are completed with an MPI_Test returning true or
an MPI_Wait.
Since performslaveactivity waits on every
message it exchanges with the master,
MPI_Recv and MPI_Send are sufficient for
all communication it has with the master.
By contrast, performmasteractivity performs much more complex message passing
than the slaves because it needs to regulate work and data with the slaves as
otherwise independent entities.
MPI_Irecv and MPI_Isend are perfect for this purpose because
they make it possible to initiate communication with one process
while moving on to handle another. MPI_Test indicates whether or not
the master can take the next step with a particular process.
The code in the switch (np->state) statement shows the step-by-step
evolution of the state of a particular process, each regulated by an MPI call.
- MPI_Errhandler_set -
called by performmasteractivity, this routine tells MPI
not to abort in case an MPI call returns an error.
By default, all MPI errors are fatal. MPI_Errhandler_set
tells the MPI on the master node that we
know how to recover when a processor is lost or other problems occur.
It is not necessary for the slaves to call this because we want them to
exit if they can't communicate with the master.
- MPI_COMM_WORLD -
MPI defines communicators, each of which describes a set of processors that can communicate with each other.
At initialization, one communicator, MPI_COMM_WORLD, covers all the processors in the system. Other MPI calls
can define arbitrary subsets of MPI_COMM_WORLD, making it possible to confine a code to a particular processor subset
just by passing it the appropriate communicator.
In simpler cases such as this, using MPI_COMM_WORLD is sufficient.
- ppinit -
performs initialization of the parallel computing environment.
Part of the boilerplate for MPI codes, this routine calls:
- MPI_Init - performs the actual initialization of MPI. It returns the error code; zero means no error.
- MPI_Comm_size - accesses the processor count of the parallel system
- MPI_Comm_rank - accesses the identification number of this particular processor
- ppexit - calling MPI_Finalize terminates
the parallel computing environment. Also part of the boilerplate of MPI codes, a call to
MPI_Finalize is needed to properly clean up and close the connections between codes running on other
processors and release control. The call to
MPI_Barrier is optional; it is included here so that all processors end up waiting for the user at processor 0 to
press the return key.
- __MWERKS__ -
these #ifdef blocks merely compensate for a peculiarity of the Metrowerks CodeWarrior compiler.
They allow the executables on the other processors to terminate automatically.
If you are using another compiler, you don't have to worry about this code.
Note that the routines that perform the actual iterative calculations,
throwdart and performiterations,
need not be altered at all.
That is possible because
they were supplied just enough data to fulfill each processor's responsibility:
the number of dart throws.
performmasteractivity delegated work to the processors and
regulated all communication.
performcalculation's changes merely perform the boilerplate work of
setting up and tearing down the
parallel environment, plus setting the name of the output file.
main was nearly unchanged.
When this code, with the MaxCount constant set to 0x1000000 (16777216),
is run in parallel on three processors, it produces this output:
Processor 2 output

Initializing random number generator by throwing out 398 random numbers...
Beginning calculation... (first random is 0.770046)
performing 1048576 iterations in [0, 1)...
0...
performing 1048576 iterations in [0, 1)...
0...
performing 1048576 iterations in [0, 1)...
0...
performing 1048576 iterations in [0, 1)...
0...
performing 1048576 iterations in [0, 1)...
0...
Calculations complete.
pi is approximately 16470391/5242880 = 3.14147778 +/- 0.00077407


Processor 1 output

Initializing random number generator by throwing out 763 random numbers...
Beginning calculation... (first random is 0.985489)
performing 1048576 iterations in [0, 1)...
0...
performing 1048576 iterations in [0, 1)...
0...
performing 1048576 iterations in [0, 1)...
0...
performing 1048576 iterations in [0, 1)...
0...
performing 1048576 iterations in [0, 1)...
0...
Calculations complete.
pi is approximately 16470377/5242880 = 3.14147511 +/- 0.00077407


Processor 0 output

Initializing random number generator by throwing out 399 random numbers...
Beginning calculation... (first random is 0.955361)
0...
Proc zero completing iterations for partition 2
Received data for partition 0 from proc #1: 3295703 / 1048576
Received data for partition 1 from proc #2: 3295703 / 1048576
13631488 iterations remaining. Progress so far:
pi is approximately 9887109/3145728 = 3.14302731 +/- 0.00099957
0...
Proc zero completing iterations for partition 5
Received data for partition 3 from proc #1: 3292641 / 1048576
Received data for partition 4 from proc #2: 3292641 / 1048576
0...
Proc zero completing iterations for partition 8
Received data for partition 6 from proc #1: 3293709 / 1048576
Received data for partition 7 from proc #2: 3293709 / 1048576
0...
Proc zero completing iterations for partition 11
Received data for partition 9 from proc #1: 3293190 / 1048576
Received data for partition 10 from proc #2: 3293190 / 1048576
4194304 iterations remaining. Progress so far:
pi is approximately 39525729/12582912 = 3.14122272 +/- 0.00049964
0...
Proc zero completing iterations for partition 14
Received data for partition 12 from proc #1: 3295128 / 1048576
Received data for partition 13 from proc #2: 3295128 / 1048576
0...
Proc zero completing iterations for partition 15
asking proc #1 to quit...
asking proc #2 to quit...
Calculations complete.
pi is approximately 52706176/16777216 = 3.14153288 +/- 0.00043272


The output for the slave processors, #1 and #2, is quite simple. They
show that they perform their calculations, 1048576 iterations at a time, and report their results.
Processor #0, however, performs the more complex task of assigning work and collecting
results, even out of order.
As the calculation proceeds, processor #0 reports more and more accurate results for pi.
When the calculation ends, each processor reports its estimate of pi based on the
results it has. The slave nodes only have the data they calculated, so their estimate
uses fewer iterations than what was performed by the cluster as a whole.
Consequently the result from any one processor alone carries a larger uncertainty,
roughly 0.000774.
Processor 0, however, has the results of the entire calculation,
so it reports that pi is approximately 3.14153288 with an error estimate of 0.00043272.
So we were able to perform the same-sized calculation in a shorter time, and achieved
a result similar to the single-processor case.
Conversely, we may increase MaxCount, increasing the problem size, to achieve a
more accurate result.
Doing so with MaxCount set to 0x10000000 (268435456) gives:
pi is approximately 843316970/268435456 = 3.14160053 +/- 0.00010818

As expected, 16 times as many samples gives a result 4 times as accurate.
The purpose of this discussion was to give an
example of a code that automatically load balanced a problem that
could be divided into mostly independent work.
We chose a simple problem of this category
so attention could be drawn to the parallelization.
A high enough iteration count can eventually exceed the precision of the
floating-point representation, so higher-precision iteration routines
would have to be substituted.
Components of your code may
correspond to the routines in circlepi,
so this parallelization example could suggest how to parallelize your code.
Alternatively, you may wish to substitute the
performiterations routine with
a call to your existing
code if you know your work can be divided into independent pieces.
This example can be a starting point
to experiment with parallel computing.
For example, one could try modifying
performmasteractivity
to reacquire nodes that recover, or
reorganize the message passing to queue a work assignment to a processor while
it is still working on a previous assignment.
Some of this code could be incorporated into an existing code, say
one with an event loop, so that remote processor regulation can proceed
alongside a user interface.
Many of these features are combined in
the Fresnel Diffraction Explorer.
These fancier features are not shown here because they would distract from the purpose of this discussion,
but the reader is welcome to explore the possibilities.
References
Using MPI by William Gropp, Ewing Lusk and Anthony Skjellum
is a good reference for the MPI library. Many of the techniques and
style of the above parallel code were adopted from Dr. Viktor K. Decyk.
He expresses these ideas in a few references, including:
 V. K. Decyk, "How to Write (Nearly) Portable Fortran Programs for Parallel Computers", Computers In Physics,
7, p. 418 (1993).
 V. K. Decyk, "Skeleton PIC Codes for Parallel Computers", Computer Physics Communications,
87, p. 87 (1995).
You're welcome to read more about parallel computing
via the Tutorials page.
