Parallelization - Parallel Circle Pi

We describe the conversion of a single-processor code into a parallel code. In this example, we demonstrate a problem whose parts can be divided into mostly independent pieces on a set of nodes. However, what happens if the nodes differ in computational power? The simplest "divide and conquer" approach is less efficient because some parts of the work would be completed ahead of others. Organizing and reassigning work to compensate for such performance differences is called "load balancing". In this example, the work of the code is automatically load balanced between the processors in an efficient and dynamic way.

This technique applies to problems that can be cut up into relatively independent pieces that are more numerous than the available processors. This example is an extension of the Parallel Adder tutorial. There, the code simply divided the problem size by the processor count and each node performed its share. Here, we cut the problem into many more pieces than there are processors and assign additional work to the nodes that finish their pieces more quickly. This is useful if you anticipate that your code will run in a heterogeneous environment, that is, one where the processors are not identical. A different technique would be used to divide work that requires constant interaction. Finally, if you are learning about parallel code, the following example shows how to partition work dynamically using elemental message passing.

Before: Single-Processor circlepi

This single-processor example, circlepi, calculates pi using a very simple approach. The algorithm considers a circle inscribed in a square: randomly throw darts at the square and count how many land inside the circle. For a unit-radius circle, the area of the circle is pi and the area of the enclosing square is 4, so the ratio of darts landing inside the circle to darts thrown asymptotically approaches pi/4. Multiplying this ratio by 4 gives pi.

  

In the code we represent the random dart throws by generating random number pairs, and we perform a simple test on those number pairs to determine if the dart landed inside the circle. After that, it's a simple counting process. The calculation could be performed exclusively with integer math, easing the creation of an implementation using arbitrarily high-precision dart throws. For the sake of brevity, we allow floating-point math in this code example's dart throw.
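
To illustrate that point, here is a minimal sketch of what an integer-only dart test might look like. This helper, throwdart_integer, is not part of circlepi.c; it is an assumption for illustration, and it presumes RAND_MAX is at most 2^31-1, as on typical systems, so the squared coordinates fit in 64-bit arithmetic.

#include <stdint.h> /* for uint64_t */
#include <stdlib.h> /* for rand() and RAND_MAX */

/* Hypothetical integer-only dart test: coordinates are drawn as integers in 
	[0, R), and the dart lands inside the circle when x*x + y*y < R*R. 
	With RAND_MAX <= 2^31-1, both x*x+y*y and R*R fit in 64 bits. */
inline unsigned long throwdart_integer(void); 
inline unsigned long throwdart_integer(void)
	{
	uint64_t R=(uint64_t)RAND_MAX+1; /* the coordinate range */
	uint64_t x=(uint64_t)rand(); 
	uint64_t y=(uint64_t)rand(); 
	return x*x+y*y<R*R?1:0; /* no floating-point math required */
	}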

This code has three primary routines plus main:

  • throwdart - retrieves a random number pair, compares the sum of their squares against 1, and returns the result. The number pair represents the landing coordinates (x,y) of the dart. These coordinates range from 0 up to 1, so the dart landed inside the circle if x*x+y*y is less than 1. This routine simply returns true if the dart is inside the circle, otherwise false.

  • performiterations - calls throwdart repeatedly, counting how many times it returns true and how many iterations were performed. The factor of four is folded in here by calling throwdart four times per iteration while incrementing the denominator once.

  • performcalculation - calls performiterations for the full MaxCount iterations and accumulates the resulting numerator and denominator counts

  • main - prepares the random number generator, calls performcalculation, prints the result with fprintresult, and uses saveresult to append the result to a file

The code is structured to clearly identify the sections that perform the primary independent work of the problem (throwdart) and the parts that organize its results and save the data to an output file. The C source is shown in Listing 1.


Listing 1 - circlepi.c

#include <stdio.h> /* standard I/O routines */
#include <stdlib.h> /* standard library routines, such as memory */
#include <math.h> /* math libraries */

/* Produces random double-precision numbers in [0,1) */ 
inline double myrandom(); 
inline double myrandom()
	{/* You're welcome to substitute a better random number generator */
	return ((double)rand()+((double)rand())/RAND_MAX)/RAND_MAX; 
	}

inline unsigned long throwdart(double bottom, double top); 
inline unsigned long throwdart(double bottom, double top)
	{
	double y=myrandom(); 
	double x=myrandom(); 
	y=bottom+y*(top-bottom); 
	return x*x+y*y<1?1:0; 
	}
	
void performiterations(unsigned long long *numeratorOut, unsigned long long *denominatorOut, 
	unsigned long long iterationcount, double bottom, double top); 
void performiterations(unsigned long long *numeratorOut, unsigned long long *denominatorOut, 
	unsigned long long iterationcount, double bottom, double top)
	{
	unsigned long long numerator=0; 
	unsigned long long denominator=0; 
	unsigned long long index; 
	
	for(index=0; index<iterationcount; index++) {
		numerator+=throwdart(bottom, top); 
		numerator+=throwdart(bottom, top); 
		numerator+=throwdart(bottom, top); 
		numerator+=throwdart(bottom, top); 
		denominator++; 
		if (!(0xfffffL&denominator)) printf("%llu...\n", iterationcount-denominator); 
		}
	
	numeratorOut[0]+=numerator; 
	denominatorOut[0]+=denominator; 
	}


#define MaxCount		0x1000000LL

void performcalculation(unsigned long long *numeratorOut, unsigned long long *denominatorOut); 
void performcalculation(unsigned long long *numeratorOut, unsigned long long *denominatorOut)
	{
	unsigned long long numerator=0; 
	unsigned long long denominator=0; 
	
	performiterations(&numerator, &denominator, MaxCount, 0, 1); 
	
	numeratorOut[0]+=numerator; 
	denominatorOut[0]+=denominator; 
	}

void fprintresult(FILE *stream, unsigned long long numerator, unsigned long long denominator); 
void fprintresult(FILE *stream, unsigned long long numerator, unsigned long long denominator)
	{
	double picalculation=numerator/(double)denominator; 
	double errorestimate=sqrt((double)numerator)/(double)denominator; 
	char dashS[]="--------------------"; 
	dashS[(long)(log((double)numerator+1)/log(10))]=0; 
	fprintf(stream, "                     %llu\n", numerator); 
	fprintf(stream, "pi is approximately -%s-- = %.9g +/- %.5g\n", dashS, 
		picalculation, errorestimate); 
	fprintf(stream, "                     %llu\n", denominator); 
	}

void saveresult(char *filename, unsigned long long numerator, unsigned long long denominator); 
void saveresult(char *filename, unsigned long long numerator, unsigned long long denominator)
	{
	if (filename) {
		FILE *fp=fopen(filename, "a+"); 
		if (fp) {
			fprintresult(fp, numerator, denominator); 
			fclose(fp); 
			}
		}
	}

#include <time.h>

int main(void)
	{
	unsigned long long numerator=0; 
	unsigned long long denominator=0; 
	{
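	/* discard a time-dependent number (0 to 1023) of randoms so that separate runs start the generator in a different state */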
	long throwoutcount=time(NULL)&0x3ffL; 
	printf("Initializing random number generator by throwing out %d random numbers...\n", 
		throwoutcount);
	while (throwoutcount--) myrandom(); 
	}
	printf("Beginning calculation... (first random is %g)\n", myrandom()); 
	
	performcalculation(&numerator, &denominator); 
	
	fprintresult(stdout, numerator, denominator); 
	saveresult("output", numerator, denominator); 
	
	return 0;
	}

When this code is run with its MaxCount constant set to 0x1000000 (16777216), it produces this output:

Initializing random number generator by throwing out 447 random numbers...
Beginning calculation... (first random is 0.633351)
15728640...
14680064...
13631488...
12582912...
11534336...
10485760...
9437184...
8388608...
7340032...
6291456...
5242880...
4194304...
3145728...
2097152...
1048576...
0...
                     52705958
pi is approximately ---------- = 3.1415199 +/- 0.00043272
                     16777216

After it counts down its work, the result is a reasonable approximation of pi, consistent with an error estimate based on Poisson statistics.
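
The "+/-" figure follows from Poisson counting statistics: the uncertainty in a count N is roughly sqrt(N), so the uncertainty in the ratio N/D is sqrt(N)/D, which is what fprintresult computes. As a quick standalone check using the counts from the run above (a separate sketch, not part of circlepi.c):

#include <math.h>
#include <stdio.h>

int main(void)
	{
	unsigned long long numerator=52705958ULL; /* darts that landed inside the circle */
	unsigned long long denominator=16777216ULL; /* iterations, four throws apiece */
	printf("pi is approximately %.9g +/- %.5g\n", 
		numerator/(double)denominator, 
		sqrt((double)numerator)/(double)denominator); 
	return 0;
	}

This reproduces the 3.1415199 +/- 0.00043272 figures printed above.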

After: parallelcirclepi

Choosing an appropriate partitioning of the problem is key to parallelizing an application. Here we choose to group the dart throws into batches and assign those batches to the processors. For code simplicity, we do not partition the area of the circle; each group of darts is spread over the entire space. Partitioning the space instead could be a more appropriate approach for other problems; a sketch of that alternative follows.
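
As a sketch of what that alternative could look like (an illustration only; parallelcirclepi.c below always sends the full range), the work-assignment structure in Listing 2 already carries bottom and top fields, so the master could hand each partition its own horizontal strip of the sample space:

/* Hypothetical variation on the master's assignment code in Listing 2: give 
	partition nextpartitionindex its own strip of the y range. Equal-width 
	strips with equal iteration counts still average to pi/4, so the 
	aggregated numerator/denominator estimate is unchanged. */
np->pds.iterationcount=iterationcount/NumPartitions; 
np->pds.bottom=(double)nextpartitionindex/NumPartitions; 
np->pds.top=(double)(nextpartitionindex+1)/NumPartitions; 

Processor 0's own call to performiterations inside the master loop would need the matching bottom and top values as well.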

The changes to performcalculation are "boilerplate" code: they start up and shut down the parallel environment, decide whether to call performmasteractivity or performslaveactivity, and determine the name of the output file. performmasteractivity organizes and assigns work to the other processors, which run performslaveactivity. Note that the routines that perform the primary work of the problem, performiterations and throwdart, are unchanged.

performmasteractivity partitions and assigns work to the other processors. The code declares data structures to format the master routine's work assignments for the slave routines and the slaves' replies back to the master. The slave code simply interprets these work assignments and calls performiterations to complete the work. The master code contains the "intelligence" to respond dynamically to how quickly the slaves reply. Whenever a slave's results return, the master assigns more work to that slave. The slaves need not be in lock step with one another; in fact, the master can handle the slaves being completely out of sync. How the master responds is driven primarily by the state of the message passing with each slave.

As work is being completed, performmasteractivity collects and compiles the data back at processor 0 so, ultimately, main can write the output in one place convenient for the user.

The detailed answer is in the code. Listing 2 shows the C source of parallelcirclepi; the major changes relative to circlepi are described in the list that follows the listing.


Listing 2 - parallelcirclepi.c

#include <stdio.h> /* standard I/O routines */
#include <stdlib.h> /* standard library routines, such as memory */
#include <math.h> /* math libraries */

/* Produces random double-precision numbers in [0,1) */ 
inline double myrandom(); 
inline double myrandom()
	{/* You're welcome to substitute a better random number generator */
	return ((double)rand()+((double)rand())/RAND_MAX)/RAND_MAX; 
	}

inline unsigned long throwdart(double bottom, double top); 
inline unsigned long throwdart(double bottom, double top)
	{
	double y=myrandom(); 
	double x=myrandom(); 
	y=bottom+y*(top-bottom); 
	return x*x+y*y<1?1:0; 
	}
	
void performiterations(unsigned long long *numeratorOut, unsigned long long *denominatorOut, 
	unsigned long long iterationcount, double bottom, double top); 
void performiterations(unsigned long long *numeratorOut, unsigned long long *denominatorOut, 
	unsigned long long iterationcount, double bottom, double top)
	{
	unsigned long long numerator=0; 
	unsigned long long denominator=0; 
	unsigned long long index; 
	
	for(index=0; index<iterationcount; index++) {
		numerator+=throwdart(bottom, top); 
		numerator+=throwdart(bottom, top); 
		numerator+=throwdart(bottom, top); 
		numerator+=throwdart(bottom, top); 
		denominator++; 
		if (!(0xfffffL&denominator)) printf("%llu...\n", iterationcount-denominator); 
		}
	
	numeratorOut[0]+=numerator; 
	denominatorOut[0]+=denominator; 
	}

#pragma mark - Parallel code follows 

#include "mpi.h" /* MPI library */

int ppinit(int *idproc, int *nproc); 
void ppexit(void); 

/* The maximum number of iterations: can be set beyond the billions */ 
#define MaxCount		0x1000000LL

#define NumPartitions	(MaxCount>=0x10000000000000LL?0x10000000L \
			:MaxCount>0x1000000LL?MaxCount>>20L:16LL)

/* The data structure sent to the slave nodes to specify what part of the 
	problem to work on */ 
typedef struct PartitionDataStruct {
	unsigned long long iterationcount; /* 0 means quit */
	double bottom, top; /* specifies the edges of the sample range */
	long partitionindex, totalpartitions; 
	} PartitionDataStruct, *PartitionDataPtr, **PartitionDataHandle; 

/* The data structure sent back to the master containing the slave's output */ 
typedef struct FractionDataStruct {
	unsigned long long numerator; 
	unsigned long long denominator; 
	} FractionDataStruct, *FractionDataPtr, **FractionDataHandle; 
	
/* These data structures are defined above so that both the master and slave 
	know their formats */
	
/* Accepts instructions from proc 0 to work on the problem, then replies with its answer*/
void performslaveactivity(unsigned long long *numeratorOut, unsigned long long *denominatorOut);
void performslaveactivity(unsigned long long *numeratorOut, unsigned long long *denominatorOut)
	{
	int staying=1; 
	while (staying) 
		{int ierr; 
		PartitionDataStruct incoming={0,0,0}; 
		MPI_Status status; 
		
		ierr=MPI_Recv(&incoming, sizeof(incoming), MPI_BYTE, 0, 0, 
			MPI_COMM_WORLD, &status); 
		if (ierr||!incoming.iterationcount) 
			staying=0; 
		else {
			FractionDataStruct reply={0,0}; 
			printf("performing %llu iterations in [%g, %g)...\n", 
				incoming.iterationcount, incoming.bottom, incoming.top); 
			
/* In this example, only the iterationcount, bottom, and top fields are used, 
			but other fields might be appropriate for other problems */ 
			performiterations(&reply.numerator, &reply.denominator, 
				incoming.iterationcount, incoming.bottom, incoming.top); 
			
			ierr=MPI_Send(&reply, sizeof(reply), MPI_BYTE, 0, 1, MPI_COMM_WORLD); 
			
			numeratorOut[0]+=reply.numerator; 
			denominatorOut[0]+=reply.denominator; 
			}
		}
	
	}

#include <string.h>

/* The ProcActivityDataStruct is what the master uses to hold the incoming (FractionDataStruct) 
and outgoing (PartitionDataStruct) data structures and related data (MPI_Request) for efficient 
use of MPI */
typedef struct ProcActivityDataStruct {
	long state; /* keeps track of the state of this processing element */
	long partitionindex; /* keeps track of which partition was assigned */
	MPI_Request sendreq, recvreq; 
		/* sent to MPI_Test; returned by MPI_Isend and MPI_Irecv */
	FractionDataStruct fds; /* result from slave */
	PartitionDataStruct pds; /* assignment specification for slave */
	} ProcActivityDataStruct, *ProcActivityDataPtr, **ProcActivityDataHandle; 

void fprintresult(FILE *stream, unsigned long long numerator, unsigned long long denominator); 

void performmasteractivity(unsigned long long *numeratorOut, unsigned long long *denominatorOut,
	unsigned long long iterationcount, int nproc); 
void performmasteractivity(unsigned long long *numeratorOut, unsigned long long *denominatorOut,
	unsigned long long iterationcount, int nproc)
	{void *datap; 
	long allocation; 
	
	/* Set the MPI so that MPI errors are recoverable */
	MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN); 
	
	datap=malloc(allocation=(sizeof(ProcActivityDataStruct))*nproc
		+sizeof(char)*NumPartitions+4); 
	if (datap) {
/* pointer to the process activity array that keeps track of the progress by each process */
		ProcActivityDataPtr procp; 
/* pointer to the partition array that keeps track of the progress on each partition */ 
		unsigned char *partitionp; 
		long nextpartitionindex=0; 
		int staying=1, ierr=0; 
		double nextupdate=0, finaltimeout=0; 
		
		/* clear memory */
		memset(datap, 0, allocation); 
		/* set up data structures */
		procp=datap; 
		partitionp=(unsigned char *)&procp[nproc]; 
		
/* here's where the work is assigned, retrieved, and organized:
	the inside of this while loop could also be called repeatedly by, say, an event loop */
		while (staying) 
			{long index; 
			int flag, procactivity=0, denomchanged=0, allslavenodesidle=1; 
			MPI_Status status; 
			
			/* loop over all processors */
			for(index=0; index<nproc; index++) {
				ProcActivityDataPtr np=&procp[index]; 
				
/* if nextpartitionindex isn't pointing to an unassigned partition */
				if ((nextpartitionindex<0)
					||(partitionp[nextpartitionindex]>=2)) {
/* search through the partition array for the next piece of unassigned work */
					for(nextpartitionindex=0; 
						(nextpartitionindex<NumPartitions)
						&&partitionp[nextpartitionindex]; 
						nextpartitionindex++) ; 
					
					/* if all partitions have been assigned */
					if (nextpartitionindex>=NumPartitions) {
/* look for anything else that hasn't completed */ 
						for(nextpartitionindex=0; 
							(nextpartitionindex<NumPartitions)
							&&(partitionp[nextpartitionindex]>=2); 
							nextpartitionindex++) ; 
						}
					
					}
				
				if (index>0) 
				switch (np->state) {/* the state of this process */
					case 0: /* idle */
						
/* if new work is available */ 
if ((nextpartitionindex>=0)
	&&(nextpartitionindex<NumPartitions)) {
	/* fill the work assignment structure */
	np->pds.iterationcount=
		iterationcount/NumPartitions; 
	np->pds.bottom=0; 
	np->pds.top=1; 
	np->pds.partitionindex=nextpartitionindex; 
	np->pds.totalpartitions=NumPartitions; 
/* keep track of which partition we're assigning this process */
	np->partitionindex=nextpartitionindex; 
	
	ierr=MPI_Isend(&np->pds, sizeof(np->pds), 
		MPI_BYTE, index, 
		0, MPI_COMM_WORLD, &np->sendreq); 
	
	if (!ierr) {
/* mark this entry in the partition array as assigned */
		partitionp[np->partitionindex]=1; 
/* this partition is checked out; don't let it be assigned twice */
		nextpartitionindex=-1; 
		}
	
	procactivity=1; 
	
	np->state++; 
	}

						break; 
					case 1: /* waiting for the send to complete */
ierr=MPI_Test(&np->sendreq, &flag, &status); 
if (!ierr) 
if (flag) {
/* MPI_Isend completed; post a receive */
	ierr=MPI_Irecv(&np->fds, 
		sizeof(np->fds), MPI_BYTE, 
		index, 1, MPI_COMM_WORLD, 
		&np->recvreq); 
	
/* Note: if this was a variable amount of data, one would probably add a few more process 
activity states here so that we would post a fixed-sized receive on data saying how big 
the data chunk is, followed by another receive on the actual data chunk using that size data.
Memory allocation can be fixed or performed on the fly. */
	procactivity=1; 
	
	np->state++; 
	}

						break; 
					case 2: /* waiting for receive to complete 
(we could try to be more clever and perform the next Isend while waiting for the data, 
but that would be getting fancy (and less readable)) */
ierr=MPI_Test(&np->recvreq, &flag, &status); 
if (!ierr) 
if (flag) {
	/* MPI_Irecv completed; collect the data */
	
	printf("Received data for partition %d from proc #%d: %llu / %llu \n", 
		np->partitionindex, index, np->fds.numerator, np->fds.denominator); 
	
/* if this partition's work didn't already get finished */
if (partitionp[np->partitionindex]<2) {
/* incorporate this process' results in the larger result */
	numeratorOut[0]+=np->fds.numerator; 
	denominatorOut[0]+=np->fds.denominator; 
/* check it off as finished */
	partitionp[np->partitionindex]=2; 
	
	denomchanged=1; 
	}

	procactivity=1; 
	
	np->state++; 
	}
						break; 
					case 3: /* work complete; return to idle */
						procactivity=1; 
						np->state=0; 
						break; 
					
					default: /* in case something goes wrong */
						np->state=0; 
						break; 
					}
				
				if (np->state) allslavenodesidle=0; 
				
				}
			/* end loop over all processors */ 
			
			/* if no other interesting activity occurred */
			if (!procactivity) {/* might as well find myself some work */
				unsigned long long numerator=0; 
				unsigned long long denominator=0; 
				
				if ((nextpartitionindex>=0)
					&&(nextpartitionindex<NumPartitions)) {
					/* if this work has not been performed already */
					if (partitionp[nextpartitionindex]<2) {
						
						/* do the work */
						partitionp[nextpartitionindex]++; 
						performiterations(&numerator, &denominator, 
							iterationcount/NumPartitions, 0, 1); 
printf("Proc zero completing iterations for partition %d \n", nextpartitionindex); 
						
						partitionp[nextpartitionindex]++; 
						nextpartitionindex=-1; /* checked out */
						
						/* incorporate results */
						numeratorOut[0]+=numerator; 
						denominatorOut[0]+=denominator; 
						}
					}
				else if (nextpartitionindex>=NumPartitions) {
					/* time to think about finishing up */
					if (allslavenodesidle
						||(finaltimeout?MPI_Wtime()>finaltimeout
							:(finaltimeout=MPI_Wtime()+10,0))) 
						/* all done */
						staying=0; 
					}
				
				}
			
			/* if the final result has changed */
			if (denomchanged) if (MPI_Wtime()>nextupdate) {
				/* Report interim results for the user */ 
				printf("%llu iterations remaining.  Progress so far:\n", 
					iterationcount-denominatorOut[0]); 
				fprintresult(stdout, numeratorOut[0], denominatorOut[0]); 
				nextupdate=MPI_Wtime()+10; /* but not too often */
				}
			
			}
		/* end while(staying) loop */
		
		{long index; 
/* if we know we won't be using those processors any more, then tell everyone to quit */ 
		for(index=1; index<nproc; index++) {
			ProcActivityDataPtr np=&procp[index]; 
			
			printf("asking proc #%d to quit...\n", index); 
			
			np->pds.iterationcount=0; /* this is the quit signal */
			ierr=MPI_Send(&np->pds, sizeof(np->pds), MPI_BYTE, index, 
				0, MPI_COMM_WORLD); 
/* reusing the same structure because that's the data size they expect */
			
			}
		}
		
		/* filling in last little bit */
		{unsigned long long numerator=0; 
		unsigned long long denominator=0; 
		unsigned long long remainder=
			iterationcount-NumPartitions*(iterationcount/NumPartitions); 
		if (remainder>0) {
			performiterations(&numerator, &denominator, remainder, 0, 1); 
		
			numeratorOut[0]+=numerator;	
			denominatorOut[0]+=denominator; 
			}
		}
		
		free(datap); 
		}
	
	}

char *createsavefilename(int idproc, int nproc); 
char *createsavefilename(int idproc, int nproc)
	{char *savefilename=malloc(256);
	if (savefilename) {
		char formatstring[256]; 
		sprintf(formatstring, "output%%.%dd-%%.%dd", 
			(int)(log(nproc)/log(10)+1), (int)(log(nproc)/log(10)+1)); 
		sprintf(savefilename, formatstring, idproc, nproc); 
		}
	return savefilename; 
	}
		

/* determines basic cluster attributes and calls the appropriate parallel computing routines */
char *performcalculation(unsigned long long *numeratorOut, unsigned long long *denominatorOut);
char *performcalculation(unsigned long long *numeratorOut, unsigned long long *denominatorOut)
	{char *savefilename=NULL; 
	unsigned long long numerator=0; 
	unsigned long long denominator=0; 
	int ierr=0; 
	int nproc=1, idproc=0;
	ierr=ppinit(&idproc, &nproc); 
	if (!ierr) {
		
		if (idproc) 
			performslaveactivity(&numerator, &denominator); 
		else 
			performmasteractivity(&numerator, &denominator, MaxCount, nproc); 
		
		savefilename=createsavefilename(idproc, nproc); 
		
		ppexit(); 
		}
	else {
		printf("MPI_Init has returned error %d.  Assuming single-processor... \n", 
			ierr); 
		performiterations(&numerator, &denominator, MaxCount, 0, 1); 
		}
	
	numeratorOut[0]+=numerator; 
	denominatorOut[0]+=denominator; 
	return savefilename; 
	}

#pragma mark - the bulk of the parallel code is above

/* fprints the fraction of numerator over denominator */
void fprintresult(FILE *stream, unsigned long long numerator, unsigned long long denominator)
	{
	double picalculation=numerator/(double)denominator; 
	double errorestimate=sqrt((double)numerator)/(double)denominator; 
	char dashS[]="--------------------"; 
	dashS[(long)(log((double)numerator+1)/log(10))]=0; 
	
	fprintf(stream, "                     %llu\n", numerator); 
	fprintf(stream, "pi is approximately -%s-- = %.9g +/- %.5g\n", dashS, 
		picalculation, errorestimate); 
	fprintf(stream, "                     %llu\n", denominator); 
	}

/* save the fraction into a file */
void saveresult(char *filename, unsigned long long numerator, unsigned long long denominator); 
void saveresult(char *filename, unsigned long long numerator, unsigned long long denominator)
	{
	if (filename) {
		FILE *fp=fopen(filename, "a+"); 
		if (fp) {
			fprintresult(fp, numerator, denominator); 
			fclose(fp); 
			}
		}
	}

#include <time.h>

int main(void)
	{
	unsigned long long numerator=0; 
	unsigned long long denominator=0; 
	char *savefilename=NULL; 
	{
	long throwoutcount=time(NULL)&0x3ffL; 
	printf("Initializing random number generator by throwing out %d random numbers...\n", 
		throwoutcount);
	while (throwoutcount--) myrandom(); 
	}
	printf("Beginning calculation... (first random is %g)\n", myrandom()); 
	
	savefilename=performcalculation(&numerator, &denominator); 
	
	printf("Calculations complete.\n"); 
	
	fprintresult(stdout, numerator, denominator); 
	if (savefilename) {
		saveresult(savefilename, numerator, denominator); 
		free(savefilename); 
		}
	
	return 0;
	}
	
#ifdef __MWERKS__
/* only for Metrowerks CodeWarrior */
#include <SIOUX.h>
#endif 

int ppinit(int *idproc, int *nproc)
	{
/* this subroutine initializes parallel processing
   idproc = processor id
   nproc = number of real or virtual processors obtained */
   int ierr;
   *nproc=0; 

/* initialize the MPI execution environment */

   ierr = MPI_Init(&argc, &argv);
   if (!ierr) {

/* determine the rank of the calling process in the communicator */
		ierr = MPI_Comm_rank(MPI_COMM_WORLD, idproc);

/* determine the size of the group associated with the communicator */
		ierr = MPI_Comm_size(MPI_COMM_WORLD, nproc);

#ifdef __MWERKS__
/* only for Metrowerks CodeWarrior */
		SIOUXSettings.asktosaveonclose=0; 
		SIOUXSettings.autocloseonquit=1;
#endif 
		
		}
	
	return ierr;
	}

void ppexit(void)
	{
/* this subroutine terminates parallel processing */
	int ierr;

/* waits until everybody gets here */
	ierr = MPI_Barrier(MPI_COMM_WORLD); 
	
/* terminate MPI execution environment */
	ierr = MPI_Finalize();
	
	}

The changes from circlepi to parallelcirclepi:

  • mpi.h - the header file for the MPI library, required to access information about the parallel system and perform communication

  • idproc, nproc - nproc describes how many processors are currently running this job and idproc identifies this processor, numbered from 0 to nproc - 1. This information is sufficient to identify exactly which part of the problem this instance of the executable should work on. If idproc is zero then performmasteractivity is called, otherwise performslaveactivity executes.

  • NumPartitions - the code cuts the overall problem into NumPartitions pieces. The partition size should be big enough that a reasonable amount of work is performed on each processor, but not so big that processors run for long stretches without communicating. For a problem like this, pieces that each take at least a few seconds are enough. This #define is written so that the number of partitions scales with the overall problem size while staying within a reasonable range. With MaxCount set to 0x1000000, NumPartitions evaluates to 16, so each partition covers 16777216/16 = 1048576 iterations, matching the "performing 1048576 iterations" lines in the parallel output below.

  • PartitionDataStruct - This data structure defines the format of the message the master sends to the slaves to specify what part of the problem to work on. iterationcount holds how many counts the slave should make. For this example, that is as much as the slave needs to know. We need some way of letting the slave know that the calculation is finished, so a 0 in iterationcount is a sufficient quit signal, but one could add an explicit message field instead. bottom and top indicate what part of the problem space the slave should sample in, and partitionindex and totalpartitions specify which partition this slave has been assigned. These other fields could be useful for other problems, such as numerical integration.

  • FractionDataStruct - This data structure defines the format of the reply the slave sends back to the master containing its results. The master aggregates this data as the calculation is being performed.

  • performslaveactivity - performslaveactivity waits for a PartitionDataStruct message from the master by calling MPI_Recv. Assuming iterationcount is not zero, it calls performiterations with the specified number of iterations, then sends the numerator and denominator results back to the master in a FractionDataStruct reply using MPI_Send.

  • ProcActivityDataStruct - This data structure facilitates the bookkeeping the master performs in order to monitor and direct the progress of the slave processors. It keeps track of the state of the slave, which part of the work has been assigned, the MPI_Request variables for the asynchronous communication with the slaves, and the data structures for the messages sent to and received from the slaves.

  • performmasteractivity - manages the assignments sent to, and the data returned from, the slaves, and works on the problem itself when there is nothing else to do. Slaves can finish out of order and at different speeds; this routine sorts out the work and assigns more to each slave as it finishes its previous partition. It will also reassign work that a node failed to complete.

    The processes and partitions operate in a "state space", inspired by Quantum Mechanics. Each entity evolves, at the appropriate moment, from one state to the next. They are therefore allowed to evolve and operate without being in lock-step with one another. In other words, their individual evolution can differ, or even change speed, at any time. This evolution through a series of clearly defined "states" makes possible complex, yet asynchronous, behavior.

    Each partition has three states:

    • 0: unassigned, no work done
    • 1: the work in this partition has been assigned to a process
    • 2: the work in this partition is complete

    This state data is stored in the partition array.

    Each process has three important states:

    • 0: Idle, no work assigned
    • 1: work has been assigned to this process, and the send of the assignment (via MPI_Isend) has yet to be cleared
    • 2: work has been assigned to this process, and we are waiting on its results

    This state data is stored in the state field of the process activity array.

    Fancier things that could also be done:

    • After confirming that a request was sent, queue another request to that process even before the previous one has returned a reply.
    • Time and evaluate the performance of each process, so that better-performing processors could be given larger assignments.
    • Add fault tolerance, using timeouts, in case a process disappears and later reappears.

    Many of these features are implemented in the Fresnel Diffraction Explorer.

  • createsavefilename - creates the name of the file as a function of idproc and nproc, so that we don't have name collisions.

  • performcalculation - called by main, this routine calls ppinit to determine attributes of the cluster, then calls the slave or master routine as appropriate. After those routines return, it determines the name of the output file and returns the final aggregate results.

  • MPI_Recv, MPI_Send, MPI_Irecv, MPI_Isend, and MPI_Test - These elemental MPI calls perform the communication linking processors. MPI_Recv and MPI_Irecv receive a message sent using MPI_Send or MPI_Isend. With MPI_Irecv and MPI_Isend, control is returned to the caller immediately (hence the "I"), allowing the caller to do other work while the message is being handled. This behavior is called asynchronous message passing, and these messages are completed with an MPI_Test returning true or an MPI_Wait.

    Since performslaveactivity waits on every message it exchanges with the master, the blocking MPI_Recv and MPI_Send are sufficient for all of its communication with the master.

    By contrast, performmasteractivity performs much more complex message passing than the slaves because it needs to regulate work and data with the slaves as otherwise independent entities. MPI_Irecv and MPI_Isend are perfect for this purpose because they make it possible to initiate communication with one process while moving on to handle another. MPI_Test indicates whether or not the master can take the next step with a particular process. The code in the switch (np->state) statement shows the step-by-step evolution of the state of a particular process, each step regulated by an MPI call.

  • MPI_Errhandler_set - called by performmasteractivity, this routine tells the MPI library not to abort when an MPI call returns an error. By default, all MPI errors are fatal. MPI_Errhandler_set tells the MPI library on the master node that we know how to recover when a processor is lost or other problems occur. It is not necessary for the slaves to call this because we want them to exit if they cannot communicate with the master.

  • MPI_COMM_WORLD - MPI defines communicators that specify a set of processors that can communicate with each other. At initialization, one communicator, MPI_COMM_WORLD, covers all the processors in the system. Other MPI calls can define arbitrary subsets of MPI_COMM_WORLD, making it possible to confine a code to a particular processor subset just by passing it the appropriate communicator; a sketch of that appears after this list. In simpler cases such as this one, using MPI_COMM_WORLD is sufficient.

  • ppinit - performs initialization of the parallel computing environment. Part of the boilerplate for MPI codes, this routine calls:
    • MPI_Init - performs the actual initialization of MPI. It returns the error code; zero means no error.
    • MPI_Comm_size - accesses the processor count of the parallel system
    • MPI_Comm_rank - accesses the identification number of this particular processor

  • ppexit - calling MPI_Finalize terminates the parallel computing environment. Also part of the boilerplate of MPI codes, a call to MPI_Finalize is needed to properly clean up and close the connections between codes running on other processors and release control. The call to MPI_Barrier is optional; it is included here so that all processors wait at the same point, for example while the user at processor 0 presses the return key.

  • __MWERKS__ - these #ifdef blocks merely compensate for a peculiarity of the Metrowerks CodeWarrior compiler. They allow the executables on the other processors to terminate automatically. If you are using another compiler, you don't have to worry about this code.
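
As a small illustration of the communicator idea mentioned in the MPI_COMM_WORLD item above (a sketch only; parallelcirclepi.c never needs it), MPI_Comm_split can carve a subset of processors out of MPI_COMM_WORLD:

#include "mpi.h" /* MPI library */

/* Hypothetical sketch: split MPI_COMM_WORLD into two halves by rank parity. 
	Communication performed with the returned communicator then involves 
	only that half of the processors. */
int makehalfworld(int idproc, MPI_Comm *halfworldOut); 
int makehalfworld(int idproc, MPI_Comm *halfworldOut)
	{
	return MPI_Comm_split(MPI_COMM_WORLD, idproc%2 /* color */, 
		idproc /* ordering key */, halfworldOut); 
	}

The communicator returned in halfworldOut can then be passed wherever MPI_COMM_WORLD would otherwise appear, and later released with MPI_Comm_free.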

Note that the routines that perform the actual iterative calculations, throwdart and performiterations, need not be altered at all. That is possible because they were supplied just enough data to fulfill each processor's responsibility: the number of dart throws. performmasteractivity delegated work to the processors and regulated all communication. performcalculation's changes merely perform the boilerplate work of setting up and tearing down the parallel environment, plus determining the name of the output file. main was nearly unchanged.

When this code is run in parallel on three processors, with the MaxCount constant set to 0x1000000, it produces this output:

Processor 2 output
Initializing random number generator by throwing out 398 random numbers...
Beginning calculation... (first random is 0.770046)
performing 1048576 iterations in [0, 1)...
0...
performing 1048576 iterations in [0, 1)...
0...
performing 1048576 iterations in [0, 1)...
0...
performing 1048576 iterations in [0, 1)...
0...
performing 1048576 iterations in [0, 1)...
0...
Calculations complete.
                     16470391
pi is approximately ---------- = 3.14147778 +/- 0.00077407
                     5242880
Processor 1 output
Initializing random number generator by throwing out 763 random numbers...
Beginning calculation... (first random is 0.985489)
performing 1048576 iterations in [0, 1)...
0...
performing 1048576 iterations in [0, 1)...
0...
performing 1048576 iterations in [0, 1)...
0...
performing 1048576 iterations in [0, 1)...
0...
performing 1048576 iterations in [0, 1)...
0...
Calculations complete.
                     16470377
pi is approximately ---------- = 3.14147511 +/- 0.00077407
                     5242880
Processor 0 output
Initializing random number generator by throwing out 399 random numbers...
Beginning calculation... (first random is 0.955361)
0...
Proc zero completing iterations for partition 2 
Received data for partition 0 from proc #1: 3295703 / 1048576 
Received data for partition 1 from proc #2: 3295703 / 1048576 
13631488 iterations remaining.  Progress so far:
                     9887109
pi is approximately --------- = 3.14302731 +/- 0.00099957
                     3145728
0...
Proc zero completing iterations for partition 5 
Received data for partition 3 from proc #1: 3292641 / 1048576 
Received data for partition 4 from proc #2: 3292641 / 1048576 
0...
Proc zero completing iterations for partition 8 
Received data for partition 6 from proc #1: 3293709 / 1048576 
Received data for partition 7 from proc #2: 3293709 / 1048576 
0...
Proc zero completing iterations for partition 11 
Received data for partition 9 from proc #1: 3293190 / 1048576 
Received data for partition 10 from proc #2: 3293190 / 1048576 
4194304 iterations remaining.  Progress so far:
                     39525729
pi is approximately ---------- = 3.14122272 +/- 0.00049964
                     12582912
0...
Proc zero completing iterations for partition 14 
Received data for partition 12 from proc #1: 3295128 / 1048576 
Received data for partition 13 from proc #2: 3295128 / 1048576 
0...
Proc zero completing iterations for partition 15 
asking proc #1 to quit...
asking proc #2 to quit...
Calculations complete.
                     52706176
pi is approximately ---------- = 3.14153288 +/- 0.00043272
                     16777216

The output of the slave processors, #1 and #2, is quite simple. It shows that they perform their calculations, 1048576 iterations at a time, and report their results. Processor #0, however, performs the more complex task of assigning work and collecting results, even out of order. As the calculation proceeds, processor #0 reports more and more accurate estimates of pi.

When the calculation ends, each processor reports its estimate of pi based on the results it holds. The slave nodes only have the data they calculated themselves, so their estimates use fewer iterations than were performed by the cluster as a whole. Consequently the result of any one slave alone gives a lesser precision, roughly 0.000774. Processor 0, however, has the results of the entire calculation, so it reports that pi is approximately 3.14153288 with an error estimate of 0.00043272.

So we were able to perform the same-sized calculation in a shorter time and achieve a result comparable to the single-processor case. Alternatively, we may increase MaxCount, enlarging the problem, to achieve a more accurate result. Doing so with MaxCount set to 0x10000000 (268435456) gives:

                     843316970
pi is approximately ----------- = 3.14160053 +/- 0.00010818
                     268435456

As expected from Poisson statistics, where the relative error shrinks like one over the square root of the sample count, 16 times as many samples gives a result 4 times as accurate (the error estimate drops from 0.00043272 to 0.00010818).

The purpose of this discussion was to give an example of a code that automatically load balances a problem that can be divided into mostly independent work. We chose a simple problem of this category so attention could be drawn to the parallelization. A high enough iteration count can eventually exceed the precision of the floating-point representation, at which point higher-precision iteration routines would have to be substituted. Components of your code may correspond to the routines in circlepi, so this parallelization example could suggest how to parallelize your code. Alternatively, if you know your work can be divided into independent pieces, you may wish to replace the performiterations routine with a call to your existing code, as sketched below.
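
For instance, here is a hedged sketch of such a substitution, assuming your existing code exposes a routine (called yourworkroutine here, a hypothetical name) that processes one independent batch of a given size and returns a count to accumulate:

/* Hypothetical drop-in replacement for performiterations: because the 
	signature is unchanged, performmasteractivity and performslaveactivity 
	can schedule the work without modification. */
extern unsigned long long yourworkroutine(unsigned long long batchsize, 
	double rangebottom, double rangetop); 

void performiterations(unsigned long long *numeratorOut, unsigned long long *denominatorOut, 
	unsigned long long iterationcount, double bottom, double top)
	{
	/* perform one independent batch of your own work */
	unsigned long long successes=yourworkroutine(iterationcount, bottom, top); 
	
	/* report back the aggregate counts the rest of the code expects */
	numeratorOut[0]+=successes; 
	denominatorOut[0]+=iterationcount; 
	}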

This example can be a starting point for experimenting with parallel computing. For example, one could try modifying performmasteractivity to reacquire nodes that recover, or reorganize the message passing to queue a new work assignment to a processor while it is still working on the previous one. Some of this code could be incorporated into an existing code, say one with an event loop, so that regulation of the remote processors can continue alongside a user interface. Many of these features are combined in the Fresnel Diffraction Explorer. These fancier features are not shown here because they would distract from the purpose of this discussion, but the reader is welcome to explore the possibilities.

References

Using MPI by William Gropp, Ewing Lusk and Anthony Skjellum is a good reference for the MPI library. Many of the techniques and style of the above parallel code were adopted from Dr. Viktor K. Decyk. He expresses these ideas in a few references, including:

  • V. K. Decyk, "How to Write (Nearly) Portable Fortran Programs for Parallel Computers", Computers In Physics, 7, p. 418 (1993).
  • V. K. Decyk, "Skeleton PIC Codes for Parallel Computers", Computer Physics Communications, 87, p. 87 (1995).

You're welcome to read more about parallel computing via the Tutorials page.


