/*
 * Program:     server_client_r.c
 * Programmer:  Alice K. Yuen (akyuen@unm.edu)
 * Purpose:     To demonstrate the parallel "Server-Client" model in the
 *              multiplication of a matrix (A) and a vector (x).
 *
 *              Both A and x are read by the "server" (processor 0) from a
 *              file specified on the command line.  Let N be the number of
 *              "client" processors.  The server sends a row of A to each
 *              of the N clients.  Upon receipt of a "result" from any
 *              client, the server sends an unprocessed row to that same
 *              client.  If no rows remain to be processed, the server
 *              sends a message with tag=0.
 *
 *              In this particular implementation the server does not
 *              perform any calculations; it only places the results from
 *              the clients into the appropriate locations.  That is, the
 *              server performs only result placement, "bookkeeping", and
 *              input/output duties.  For a different implementation, see
 *              also server_client_c.c.
 *
 *              WE NOTE THAT MEMORY HAS NOT BEEN SPARED TO CREATE MORE
 *              READABLE CODE.
 *
 * Notes on the vector/matrix file:
 *
 *   The data file (named on the command line with -f, e.g. "matrix-vector")
 *   is expected to be of the form:
 *
 *     m n
 *     x1 ... xn
 *     a11 a12 ... a1n
 *     ...
 *     am1 am2 ... amn
 *
 *   where the matrix A is m by n and the vector x is n long.
 *
 * Notes on implementation:
 *
 *   The "server" processor parses the arguments from the command line,
 *   reads the size of A from the data file, allocates memory to store
 *   both x and A, reads the vector x and the matrix A from the data
 *   file, distributes rows of A, receives the "processed" results from
 *   the clients, places these results in the appropriate locations, and
 *   prints the output.
 *
 *   The "client" processors receive rows of matrix A, "process" these
 *   rows by taking the dot product of x with each row, and send the
 *   results back to the server.
 *
 *   Because the implementation works on rows of A, the matrix is stored
 *   row-wise in memory.  That is, A is stored in (m*n) consecutive memory
 *   locations, where the first n correspond to the first row of A, the
 *   second n correspond to the second row of A, and so on.  The following
 *   loop is used to read and store the matrix A:
 *
 *     for (i = 0 ; i < (m*n) ; i++) {
 *       fscanf (f_ptr,"%f",&a[i]);
 *     }
 *
 *   Since the server receives results from all of the client processors,
 *   we use the MPI call:
 *
 *     MPI_Recv (buffer,m,MPI_DATA,MPI_ANY_SOURCE,MPI_ANY_TAG,
 *               MPI_COMM_WORLD,&status);
 *
 *   and then query the structure 'status' for the key information:
 *
 *     typedef struct {
 *       int count;
 *       int MPI_SOURCE;
 *       int MPI_TAG;
 *       int MPI_ERROR;
 *     } MPI_Status;
 *
 * Pseudo code:
 *
 *   server:
 *     1. broadcast (vector) x to all client processors
 *     2. send a row of A to each processor with tag=row
 *     3. while (i < m OR expected receives > 0)
 *          receive a result and send the next unprocessed row
 *     4. print result
 *
 *   client:
 *     1. receive (vector) x
 *     2. receive a row of A with tag=row number
 *     3. sum the product of each element of (vector) a with the
 *        respective element of (vector) x to produce (scalar) result
 *     4. send result back to server
 *
 * RCS: $Revision: 1.2 $
 */

/* includes ******************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "mpi.h"

/* macros ********************************************************************/
#define DATA     float
#define MPI_DATA MPI_FLOAT

/* prototypes ****************************************************************/
int  parse_args (int argc, char *argv[], FILE **f_ptr);
void usage      (char *argv[]);

int main (int argc, char *argv[])
{
  int i, j,
      m = 0,
      n = 0,
      rank,
      server = 0,
      num_processors = 0,
      receives = 0,
      exit_val = 0;
  DATA buffer = 0,
       *a = NULL,
       *x = NULL,
       *result = NULL;
  FILE *f_ptr = NULL;
  MPI_Status status;

  /* initialize MPI */
  MPI_Init (&argc, &argv);
  MPI_Comm_size (MPI_COMM_WORLD, &num_processors);
  MPI_Comm_rank (MPI_COMM_WORLD, &rank);

/* server ********************************************************************/
  if (rank == server) {
    fprintf (stderr,"%d processors used\n",num_processors);

    if (parse_args (argc,argv,&f_ptr)) {
      fscanf (f_ptr,"%d %d",&m,&n);
      MPI_Bcast (&m,1,MPI_INT,server,MPI_COMM_WORLD);
      MPI_Bcast (&n,1,MPI_INT,server,MPI_COMM_WORLD);

      if (m > num_processors) {
        if ((m && n) &&
            ((a      = (DATA *) malloc (m * n * sizeof (DATA))) != NULL) &&
            ((x      = (DATA *) malloc (n * sizeof (DATA)))     != NULL) &&
            ((result = (DATA *) calloc (m, sizeof (DATA)))      != NULL)) {

          /* read vector x[n] */
          for (j = 0 ; j < n ; j++) {
            fscanf (f_ptr,"%f",&x[j]);
          }

          /* 1. server pseudo-code */
          MPI_Bcast (x,n,MPI_DATA,server,MPI_COMM_WORLD);

          /* read matrix a[m,n] */
          for (i = 0 ; i < (m*n) ; i++) {
            /* note: we're storing the matrix row-wise */
            fscanf (f_ptr,"%f",&a[i]);
          }

          /* 2. server pseudo-code */
          for (i = 0 ; (i < m) && ((i+1) < num_processors) ; i++) {
            /* send row i to processor i+1          */
            /* processors are 1-up, rows are 0-up   */
            /* send &a[i*n] with tag=i+1            */
            MPI_Send (&a[i*n],n,MPI_DATA,i+1,i+1,MPI_COMM_WORLD);
            receives++;
          }

          /* 3. server pseudo-code */
          while (receives || (i < m)) {
            /* receive from MPI_ANY_SOURCE / MPI_ANY_TAG */
            /* test TAG - place result into result[TAG-1] */
            MPI_Recv (&buffer,1,MPI_DATA,MPI_ANY_SOURCE,MPI_ANY_TAG,
                      MPI_COMM_WORLD,&status);
            receives--;
            result[status.MPI_TAG-1] = buffer;

            /* if there is more data, send the next row to the
               source of the previous result */
            if (i < m) {
              MPI_Send (&a[i*n],n,MPI_DATA,status.MPI_SOURCE,i+1,
                        MPI_COMM_WORLD);
              receives++;
              i++;
            }
            /* send a tag of zero to indicate end */
            else {
              MPI_Send (a,n,MPI_DATA,status.MPI_SOURCE,0,MPI_COMM_WORLD);
            }
          }

          /* 4. server pseudo-code */
          for (i = 0 ; i < m ; i++) {
            fprintf (stdout,"%f\n",result[i]);
          }
        }
        else {
          fprintf (stderr,"server can't allocate memory\n");
          /* the clients are already blocked in the MPI_Bcast of x,
             which the server can no longer supply, so individual
             tag=0 termination messages would never be received;
             abort the whole job instead */
          MPI_Abort (MPI_COMM_WORLD,1);
        }
      } /* if (m > num_processors) */
      else {
        fprintf (stderr,"processor %d error: too many processors used\n",
                 rank);
        exit_val = 1;
      }
    }
    else {
      MPI_Bcast (&m,1,MPI_INT,server,MPI_COMM_WORLD);
      MPI_Bcast (&n,1,MPI_INT,server,MPI_COMM_WORLD);
      usage (argv);
      exit_val = 2;
    }
  }

/* client ********************************************************************/
  else {
    MPI_Bcast (&m,1,MPI_INT,server,MPI_COMM_WORLD);
    MPI_Bcast (&n,1,MPI_INT,server,MPI_COMM_WORLD);

    if (m && n) {
      if (m > num_processors) {
        /* note that clients only receive one row of A at a time */
        if (((a = (DATA *) malloc (n * sizeof (DATA))) != NULL) &&
            ((x = (DATA *) malloc (n * sizeof (DATA))) != NULL)) {

          /* 1. client pseudo-code */
          MPI_Bcast (x,n,MPI_DATA,server,MPI_COMM_WORLD);

          /* 2. client pseudo-code */
          /* initial recv */
          MPI_Recv (a,n,MPI_DATA,server,MPI_ANY_TAG,MPI_COMM_WORLD,&status);

          while (status.MPI_TAG) {
            /* 3. client pseudo-code */
            for (j = 0 , buffer = 0 ; j < n ; j++) {
              buffer += a[j] * x[j];
            }

            /* 4. client pseudo-code:              */
            /* send the result back to the server, */
            /* then receive a new row              */
            MPI_Send (&buffer,1,MPI_DATA,server,status.MPI_TAG,
                      MPI_COMM_WORLD);
            MPI_Recv (a,n,MPI_DATA,server,MPI_ANY_TAG,MPI_COMM_WORLD,
                      &status);
          }
        }
        else {
          fprintf (stderr,"processor %d cannot allocate memory\n",rank);
        }
      } /* if (m > num_processors) */
      else {
        fprintf (stderr,"processor %d error: too many processors used\n",
                 rank);
        exit_val = 1;
      }
    }
    else {
      exit_val = 2;
    }
  }

  MPI_Finalize();
  exit (exit_val);
}

/******************************************************************************
 * parse_args()
 *****************************************************************************/
int parse_args (int argc, char *argv[], FILE **f_ptr)
{
  int i,
      retval = 1;

  if (argc > 1) {
    for (i = 1 ; i < argc ; i++) {
      if (strcmp (argv[i],"-f") == 0) {
        /* make sure -f is actually followed by a file name */
        if (((i+1) < argc) &&
            ((*f_ptr = fopen (argv[++i],"r")) != NULL)) {
          fprintf (stderr,"%s: reading file %s\n",argv[0],argv[i]);
        }
        else {
          fprintf (stderr,"%s error: error opening %s for reading\n",
                   argv[0],argv[i]);
          retval = 0;
        }
      }
      else {
        fprintf (stderr,"%s error: %s option unknown\n",argv[0],argv[i]);
      }
    }
    /* fail if no file was opened, e.g. only unknown options were given */
    if (*f_ptr == NULL) {
      retval = 0;
    }
  }
  else {
    retval = 0;
  }

  return (retval);
}

/******************************************************************************
 * usage()
 *****************************************************************************/
void usage (char *argv[])
{
  fprintf (stderr,"\n%s usage: %s -f matrix-file\n\n",argv[0],argv[0]);
}
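
/******************************************************************************
 * Example usage (a sketch, not part of the original program).  The compiler
 * wrapper "mpicc" and the launcher "mpirun" below are assumed from a
 * typical MPI installation (e.g. MPICH or Open MPI); substitute your
 * site's commands as needed:
 *
 *   mpicc -o server_client_r server_client_r.c
 *   mpirun -np 4 ./server_client_r -f matrix-vector
 *
 * With 4 processors the program requires m > 4 rows, so a data file
 * "matrix-vector" with m=5 and n=3 could look like:
 *
 *   5 3
 *   1.0 2.0 3.0
 *   1.0 0.0 0.0
 *   0.0 1.0 0.0
 *   0.0 0.0 1.0
 *   1.0 1.0 1.0
 *   2.0 2.0 2.0
 *
 * for which the expected output (A times x, one entry per line) is
 * 1.000000, 2.000000, 3.000000, 6.000000, 12.000000.
 *****************************************************************************/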