/* parallelcirclepi.c */ /* © Copyright 2005 Dauger Research, Inc. Our lawyers made us say this: DISCLAIMER: We provide the following on an "AS IS" basis. Use it at your own risk. */ #include #include #include /* Produces random double-precision numbers in [0,1) */ inline double myrandom(); inline double myrandom() {/* You're welcome to substitute a better random number generator */ return ((double)rand()+((double)rand())/RAND_MAX)/RAND_MAX; } /* throwdart, the heart of the calculation, throws virtual darts at a unit circle inscribed in a square and returns whether or not the dart hit inside the circle. Comparing the hits with the number of tries reveals the value of pi. */ inline unsigned long throwdart(double bottom, double top); inline unsigned long throwdart(double bottom, double top) { double y=myrandom(); double x=myrandom(); y=bottom+y*(top-bottom); return x*x+y*y<1?1:0; } /* performs iterations of throwdart. We want the value of pi, and it so happens the area of a unit circle is pi. The ratio of hits within a unit circle to the hits within its bounding square asymptotically approaches pi/4, so we need some way of multiplying by four. We could have divided the denominator (the count inside the square) by 4, but instead we are throwing four times as many darts for every count inside the square. You can think of this as dividing the area of a unit circle by the area of a square one unit on a side. Either way, it works. */ void performiterations(unsigned long long *numeratorOut, unsigned long long *denominatorOut, unsigned long long iterationcount, double bottom, double top); void performiterations(unsigned long long *numeratorOut, unsigned long long *denominatorOut, unsigned long long iterationcount, double bottom, double top) { unsigned long long numerator=0; unsigned long long denominator=0; unsigned long long index; for(index=0; index=0x10000000000000LL?0x10000000L:MaxCount>0x1000000LL?MaxCount>>20L:16LL) /*100LL*/ /* The data structure sent to the slave nodes to specify what part of the problem to work on */ typedef struct PartitionDataStruct { unsigned long long iterationcount; /* 0 means quit */ double bottom, top; /* specifies the edges of the sample range */ long partitionindex, totalpartitions; } PartitionDataStruct, *PartitionDataPtr, **PartitionDataHandle; /* The data structure sent back to the master containing the slave's output */ typedef struct FractionDataStruct { unsigned long long numerator; unsigned long long denominator; } FractionDataStruct, *FractionDataPtr, **FractionDataHandle; /* These data structures are defined above so that both the master and slave knows their formats */ /* Accepts instructions from proc 0 to work on the problem, then replies with its answer*/ void performslaveactivity(unsigned long long *numeratorOut, unsigned long long *denominatorOut); void performslaveactivity(unsigned long long *numeratorOut, unsigned long long *denominatorOut) { int staying=1; while (staying) {int ierr; PartitionDataStruct incoming={0,0,0}; MPI_Status status; ierr=MPI_Recv(&incoming, sizeof(incoming), MPI_BYTE, 0, 0, MPI_COMM_WORLD, &status); if (ierr||!incoming.iterationcount) staying=0; else { FractionDataStruct reply={0,0}; printf("performing %llu iterations in [%g, %g)...\n", incoming.iterationcount, incoming.bottom, incoming.top); /* In this example, only the iterationcount, bottom, and top fields are used, but other fields might be appropriate for other problems */ performiterations(&reply.numerator, &reply.denominator, incoming.iterationcount, incoming.bottom, incoming.top); ierr=MPI_Send(&reply, sizeof(reply), MPI_BYTE, 0, 1, MPI_COMM_WORLD); numeratorOut[0]+=reply.numerator; denominatorOut[0]+=reply.denominator; } } } #include /* The ProcActivityDataStruct is what the master uses to hold the incoming (FractionDataStruct) and outgoing (PartitionDataStruct) data structures and related data (MPI_Request) for efficient use of MPI */ typedef struct ProcActivityDataStruct { long state; /* keeps track of the state of this processing element */ long partitionindex; /* keeps track of which partition was assigned */ MPI_Request sendreq, recvreq; /* sent to MPI_Test; returned by MPI_Isend and MPI_Irecv */ FractionDataStruct fds; /* result from slave */ PartitionDataStruct pds; /* assignment specification for slave */ } ProcActivityDataStruct, *ProcActivityDataPtr, **ProcActivityDataHandle; void fprintresult(FILE *stream, unsigned long long numerator, unsigned long long denominator); /* performs the management of assignments for and data from the slaves, as well as working on the problem when there is nothing else to do. Other procs can finish out of order or at faster or slower speeds, and this routine will sort out the work and assign more work to others as they finish their previous partition. It will also reassign work that a node failed to complete. Fancier things that could also be done: 1. After confirming that a request was sent, queue another request even before the last one returned a reply. 2. Time and evaluate the performance of each process, so that one could grow the assignment size for better performing processors. 3. Enhanced fault tolerance in case another process disappears, then reappears, with timeouts. */ void performmasteractivity(unsigned long long *numeratorOut, unsigned long long *denominatorOut, unsigned long long iterationcount, int nproc); void performmasteractivity(unsigned long long *numeratorOut, unsigned long long *denominatorOut, unsigned long long iterationcount, int nproc) {void *datap; long allocation; /* Set the MPI so that MPI errors are recoverable */ MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN); datap=malloc(allocation=(sizeof(ProcActivityDataStruct))*nproc+sizeof(char)*NumPartitions+4); if (datap) { /* pointer to the process activity array that keeps track of the progress by each process */ ProcActivityDataPtr procp; /* pointer to the partition array that keeps track of the progress on each partition */ unsigned char *partitionp; long nextpartitionindex=0; int staying=1, ierr=0; double nextupdate=0, finaltimeout=0; /* clear memory */ memset(datap, 0, allocation); /* set up data structures */ procp=datap; partitionp=(unsigned char *)&procp[nproc]; /* here's where the work is assigned, retrieved, and organized: the inside of this while loop could also be called repeatedly by, say, an event loop */ while (staying) {long index; int flag, procactivity=0, denomchanged=0, allslavenodesidle=1; MPI_Status status; /* loop over all processors */ for(index=0; index=2)) { /* search through the partition array for the next piece of unassigned work */ for(nextpartitionindex=0; (nextpartitionindex=NumPartitions) { /* look for anything else that hasn't completed */ for(nextpartitionindex=0; (nextpartitionindex=2); nextpartitionindex++) ; } } /* The processes and partitions operate in a "state space", inspired by Quantum Mechanics. Each entity evolves, at the appropriate moment, from one state to the next. They are therefore allowed to evolve and operate without being in lock-step with one another. In other words, their individual evolution can differ, or even change speed, at any time. This evolution through a series of clearly defined "states" makes possible complex, yet asynchronous, behavior. Each partition has three states: 0: unassigned, no work done 1: the work in this partition has been assigned to a process 2: the work in this partition is complete This state data is stored in the partition array. Each process has three important states: 0: Idle, no work assigned 1: work has been assigned to this process, and the send of the assignment (via MPI_Isend) has yet to be cleared 2: work has been assigned to this process, and we are waiting on its results This state data is stored in the state field of the process activity array. */ if (index>0) switch (np->state) {/* the state of this process */ case 0: /* idle */ /* if new work is available */ if ((nextpartitionindex>=0)&&(nextpartitionindexpds.iterationcount=iterationcount/NumPartitions; np->pds.bottom=0; np->pds.top=1; np->pds.partitionindex=nextpartitionindex; np->pds.totalpartitions=NumPartitions; /* keep track of which partition we're assigning this process */ np->partitionindex=nextpartitionindex; ierr=MPI_Isend(&np->pds, sizeof(np->pds), MPI_BYTE, index, 0, MPI_COMM_WORLD, &np->sendreq); if (!ierr) { /* mark this entry in the partition array as assigned */ partitionp[np->partitionindex]=1; /* this partition is checked out; don't let it be assigned twice */ nextpartitionindex=-1; } procactivity=1; np->state++; } break; case 1: /* waiting for the send to complete */ ierr=MPI_Test(&np->sendreq, &flag, &status); if (!ierr) if (flag) {/* MPI_Isend completed; post a receive */ ierr=MPI_Irecv(&np->fds, sizeof(np->fds), MPI_BYTE, index, 1, MPI_COMM_WORLD, &np->recvreq); /* Note: if this was a variable amount of data, one would probably add a few more process activity states here so that we would post a fixed-sized receive on data saying how big the data chunk is, followed by another receive on the actual data chunk using that size data. Memory allocation can be fixed or performed on the fly. */ procactivity=1; np->state++; } break; case 2: /* waiting for receive to complete (we could try to be more clever and perform the next Isend while waiting for the data, but that would be getting fancy (and less readable)) */ ierr=MPI_Test(&np->recvreq, &flag, &status); if (!ierr) if (flag) {/* MPI_Irecv completed; collect the data */ printf("Received data for partition %d from proc #%d: %llu / %llu \n", np->partitionindex, index, np->fds.numerator, np->fds.denominator); /* if this partition's work didn't already get finished */ if (partitionp[np->partitionindex]<2) { /* incorporate this process' results in the larger result */ numeratorOut[0]+=np->fds.numerator; denominatorOut[0]+=np->fds.denominator; /* check it off as finished */ partitionp[np->partitionindex]=2; denomchanged=1; } procactivity=1; np->state++; } break; case 3: /* work complete; return to idle */ procactivity=1; np->state=0; break; default: /* in case something goes wrong */ np->state=0; break; } if (np->state) allslavenodesidle=0; } /* end loop over all processors */ /* if no other interesting activity occurred */ if (!procactivity) {/* might as well find myself some work */ unsigned long long numerator=0; unsigned long long denominator=0; if ((nextpartitionindex>=0)&&(nextpartitionindex=NumPartitions) { /* time to think about finishing up */ if (allslavenodesidle||(finaltimeout?MPI_Wtime()>finaltimeout:(finaltimeout=MPI_Wtime()+10,0))) /* all done */ staying=0; } } /* if the final result has changed */ if (denomchanged) if (MPI_Wtime()>nextupdate) { /* Report interim results for the user */ printf("%llu iterations remaining. Progress so far:\n", iterationcount-denominatorOut[0]); fprintresult(stdout, numeratorOut[0], denominatorOut[0]); nextupdate=MPI_Wtime()+10; /* but not too often */ } } /* end while(staying) loop */ {long index; /* if we know we won't be using those processors any more, then tell everyone to quit */ for(index=1; indexpds.iterationcount=0; /* this is the quit signal */ ierr=MPI_Send(&np->pds, sizeof(np->pds), MPI_BYTE, index, 0, MPI_COMM_WORLD); /* reusing the same structure because that's the data size they expect */ } } /* filling in last little bit */ {unsigned long long numerator=0; unsigned long long denominator=0; unsigned long long remainder=iterationcount-NumPartitions*(iterationcount/NumPartitions); if (remainder>0) { performiterations(&numerator, &denominator, remainder, 0, 1); numeratorOut[0]+=numerator; denominatorOut[0]+=denominator; } } free(datap); } } /* creates the name of the file as a function of idproc and nproc, so that we don't have name collisions */ char *createsavefilename(int idproc, int nproc); char *createsavefilename(int idproc, int nproc) {char *savefilename=malloc(256); if (savefilename) { char formatstring[256]; sprintf(formatstring, "output%%.%dd-%%.%dd", (int)(log(nproc)/log(10)+1), (int)(log(nproc)/log(10)+1)); sprintf(savefilename, formatstring, idproc, nproc); } return savefilename; } /* determines basic cluster attributes and calls the appropriate parallel computing routines */ char *performcalculation(unsigned long long *numeratorOut, unsigned long long *denominatorOut); char *performcalculation(unsigned long long *numeratorOut, unsigned long long *denominatorOut) {char *savefilename=NULL; unsigned long long numerator=0; unsigned long long denominator=0; int ierr=0; int nproc=1, idproc=0; ierr=ppinit(&idproc, &nproc); if (!ierr) { if (idproc) performslaveactivity(&numerator, &denominator); else performmasteractivity(&numerator, &denominator, MaxCount, nproc); savefilename=createsavefilename(idproc, nproc); ppexit(); } else { printf("MPI_Init has returned error %d. Assuming single-processor... \n", ierr); performiterations(&numerator, &denominator, MaxCount, 0, 1); } numeratorOut[0]+=numerator; denominatorOut[0]+=denominator; return savefilename; } #pragma mark - the bulk of the parallel code is above /* fprints the fraction of numerator over denominator */ void fprintresult(FILE *stream, unsigned long long numerator, unsigned long long denominator) { double picalculation=numerator/(double)denominator; double errorestimate=sqrt((double)numerator)/(double)denominator; char dashS[]="--------------------"; dashS[(long)(log((double)numerator+1)/log(10))]=0; fprintf(stream, " %llu\n", numerator); fprintf(stream, "pi is approximately -%s-- = %.9g +/- %.5g\n", dashS, picalculation, errorestimate); fprintf(stream, " %llu\n", denominator); } /* save the fraction into a file */ void saveresult(char *filename, unsigned long long numerator, unsigned long long denominator); void saveresult(char *filename, unsigned long long numerator, unsigned long long denominator) { if (filename) { FILE *fp=fopen(filename, "a+"); if (fp) { fprintresult(fp, numerator, denominator); fclose(fp); } } } #include int main(void) { unsigned long long numerator=0; unsigned long long denominator=0; char *savefilename=NULL; { long throwoutcount=time(NULL)&0x3ffL; printf("Initializing random number generator by throwing out %d random numbers...\n", throwoutcount); while (throwoutcount--) myrandom(); } printf("Beginning calculation... (first random is %g)\n", myrandom()); savefilename=performcalculation(&numerator, &denominator); printf("Calculations complete.\n"); fprintresult(stdout, numerator, denominator); if (savefilename) { saveresult(savefilename, numerator, denominator); free(savefilename); } return 0; } #ifdef __MWERKS__ /* only for Metrowerks CodeWarrior */ #include #endif int ppinit(int *idproc, int *nproc) { /* this subroutine initializes parallel processing idproc = processor id nproc = number of real or virtual processors obtained */ int ierr; *nproc=0; /* initialize the MPI execution environment */ ierr = MPI_Init(NULL, NULL); if (!ierr) { /* determine the rank of the calling process in the communicator */ ierr = MPI_Comm_rank(MPI_COMM_WORLD, idproc); /* determine the size of the group associated with a communicator */ ierr = MPI_Comm_size(MPI_COMM_WORLD, nproc); } #ifdef __MWERKS__ /* only for Metrowerks CodeWarrior */ SIOUXSettings.asktosaveonclose=0; SIOUXSettings.autocloseonquit=1; #endif return ierr; } void ppexit(void) { /* this subroutine terminates parallel processing */ int ierr; /* waits until everybody gets here */ ierr = MPI_Barrier(MPI_COMM_WORLD); /* terminate MPI execution environment */ ierr = MPI_Finalize(); }