/* * Program: server_client_c.c * Programmer: Alice K. Yuen (akyuen@unm.edu) * Purpose: To demonstrate the parallel "Server-Client" model in the * multiplication of a matrix (A) and a vector (x). * * Both A and x are read by the "server" (processor 0) from a * file that may be specified on the command-line. Let N be * the number of "client" processors. Then the server sends * a column of A to each of the N clients. Upon the receipt * of "results" from any client, the server sends an unprocessed * column to the same client. If there are no remaining columns * to be processed, the server sends a message with the tag=0. * * In this particular implementation, the server actually * performs some of the calculations (sums the "results" returned * by the clients), in addition to "bookkeeping" and output * duties. This is desirable if these processing/calculations * are minimal. However, for more complicated functions and * processing, a different algorithm may be desired. See also * server_client_r.c. * * WE NOTE THAT MEMORY HAVE NOT BEEN SPARED TO CREATE A * MORE READABLE CODE. * * Notes on vector/matrix file: * * data file (with fixed name "matrix-vector" is expected to be of the form: * * m n * x1 ... xn * a11 a12 ... a1n * ... * am1 am2 ... amn * * where the matrix A is m by n, the vector x is n long * * Notes on implementation: * * The "server" processor parse the arguements from the command line * reads the size of A from the data file, * allocate memory to store both x and A, * reads the data file which contains the vector * x and matrix A, * distributes columns of A, * receives the "processed" columns from clients, * sums the processed columns, and * prints the output. * * The "client" processors receives columns of matrix A, * "processes" these columns: * multiplying each column by the corresponding * element of vector x, and * sends the processed column back to the server. * * Because the implementation works on columns of A. It is stored in * column-wise in memory. That is, the matrix A is stored in consecutive * (mn) memory locations where the first m corresponds to the first column * of A, the second m corresponds to the second column of A, etc. The * following is the nested loop used to read and store matrix, A: * * for (i = 0 ; i < m ; i++) { * for (j = 0 ; j < n ; j++) { * fscanf (f_ptr,"%f",&a[i + j*m]); * } * } * * Since the server receives results from all of the client processors, * we use the MPI call: * * MPI_Recv (buffer,m,MPI_DATA,MPI_ANY_SOURCE,MPI_ANY_TAG, * MPI_COMM_WORLD,&status); * * then query the structure 'status' for key information. * * typedef struct { * int count; * int MPI_SOURCE; * int MPI_TAG; * int MPI_ERROR; * } MPI_Status; * * Pseudo code: * server: * * 1. broadcast (vector) x to all client processors * 2. send a column of A to each processor with tag=column * 3. while (j < n OR * expected receives > 0 ) * receive results and send next unprocessed column * 4. print result * * client: * * 1. receive (vector) x * 2. receive a column of A with tag=column number * 3. multiply respective element of (vector) x (equal to * tag to produce (vector) result * 4. send result back to server * * RCS: $Revision: 1.2 $ */ /* includes ******************************************************************/ #include #include /* macros ********************************************************************/ #define DATA float #define MPI_DATA MPI_FLOAT /* prototypes ****************************************************************/ int parse_args (int argc, char *argv[], FILE **f_ptr); void usage (char *argv[]); int main (int argc, char *argv[]) { int i,j, m = 0, n = 0, rank, server = 0, num_processors = 0, receives = 0, source, tag, exit_val = 0; DATA *x = NULL, *a = NULL, *buffer = NULL, *result = NULL; FILE *f_ptr = NULL; MPI_Status status; /* initialize MPI */ MPI_Init (&argc, &argv); MPI_Comm_size (MPI_COMM_WORLD, &num_processors); MPI_Comm_rank (MPI_COMM_WORLD, &rank); /* server ********************************************************************/ if (rank == server) { fprintf (stderr,"%d processors used\n",num_processors); if (parse_args (argc,argv,&f_ptr)) { fscanf (f_ptr,"%d %d",&m,&n); MPI_Bcast (&m,1,MPI_INT,server,MPI_COMM_WORLD); MPI_Bcast (&n,1,MPI_INT,server,MPI_COMM_WORLD); if (n > num_processors) { if ((m && n) && ((a = (DATA *) malloc (m * n * sizeof (DATA)))!=NULL) && ((x = (DATA *) malloc (n * sizeof(DATA)))!=NULL) && ((buffer = (DATA *) malloc (m * sizeof(DATA)))!=NULL) && ((result = (DATA *) calloc (m, sizeof(DATA)))!=NULL) ) { /* read vector x[n] */ for (j = 0 ; j < n ; j++) { fscanf (f_ptr,"%f",&x[j]); } /* 1. server pseudo-code */ MPI_Bcast (x,n,MPI_DATA,server,MPI_COMM_WORLD); /* read matrix a[m,n] */ for (i = 0 ; i < m ; i++) { for (j = 0 ; j < n ; j++) { /* note: we're storing the matrix column-wise */ fscanf (f_ptr,"%f",&a[i + j*m]); } } /* 2. server pseudo-code */ for (j = 0 ; (j < n) && ((j+1) < num_processors) ; j++) { /* send column j to processor j+1 */ /* processors are 1-up, columns are 0-up */ /* send &a[j*m] with tag=j+1 */ MPI_Send (&a[j*m],m,MPI_DATA,j+1,j+1,MPI_COMM_WORLD); receives++; } /* 3. server pseudo-code */ while (receives || (j < n)) { /* receive from MPI_ANY_SOURCE / MPI_ANY_TAG */ /* test TAG - place into column number TAG */ MPI_Recv (buffer,m,MPI_DATA,MPI_ANY_SOURCE,MPI_ANY_TAG, MPI_COMM_WORLD,&status); receives--; /* result was initialized from calloc */ for (i = 0 ; i < m ; i++) { result[i] += buffer[i]; } /* if there is more data, send to status.MPI_SOURCE */ if (j < n) { MPI_Send (&a[j*m],m,MPI_DATA,status.MPI_SOURCE,j+1, MPI_COMM_WORLD); receives++; j++; } /* send a tag of zero to indicate end */ else { MPI_Send (a,m,MPI_DATA,status.MPI_SOURCE,0,MPI_COMM_WORLD); } } } else { fprintf (stderr,"server can't allocate memory\n"); /* send message with tag=0 to all processors */ for (j = 0 ; (j < n) && ((j+1) < num_processors) ; j++) { MPI_Send (a,m,MPI_DATA,j+1,0,MPI_COMM_WORLD); receives++; } } /* 4. server pseudo-code */ for (i = 0 ; i < m ; i++) { fprintf (stdout,"%f\n",result[i]); } } /* if (n > num_processors) */ else { fprintf (stderr,"procesor %d error: too many processors used\n",rank); exit_val = 1; } } else { MPI_Bcast (&m,1,MPI_INT,server,MPI_COMM_WORLD); MPI_Bcast (&n,1,MPI_INT,server,MPI_COMM_WORLD); usage (argv); exit_val = 2; } } /* client ********************************************************************/ else { MPI_Bcast (&m,1,MPI_INT,server,MPI_COMM_WORLD); MPI_Bcast (&n,1,MPI_INT,server,MPI_COMM_WORLD); if (m && n) { if (n > num_processors) { /* note that processors only receive one column of A at a time */ if ((a = (DATA *) malloc (m * sizeof (DATA))) && (x = (DATA *) malloc (n * sizeof(DATA))) && (result = (DATA *) malloc (1 * sizeof(DATA))) ) { /* 1. client pseudo-code */ MPI_Bcast (x,n,MPI_DATA,server,MPI_COMM_WORLD); /* 2. client pseudo-code */ /* initial recv */ MPI_Recv (a,m,MPI_DATA,server,MPI_ANY_TAG,MPI_COMM_WORLD,&status); while (status.MPI_TAG) { /* 3. client pseudo-code */ /* process data: tag-1 equals the column number*/ for (i = 0 ;i < m ; i++) { a[i] *= x[status.MPI_TAG-1]; } /* send results back to server */ /* receive new row */ /* 4. client pseudo-code */ MPI_Send ( a,m,MPI_DATA,server,status.MPI_TAG,MPI_COMM_WORLD); MPI_Recv (a,m,MPI_DATA,server,MPI_ANY_TAG,MPI_COMM_WORLD,&status); } } else { fprintf (stderr,"processor %d cannot allocate memory\n",rank); } } /* if (n > num_processors) */ else { fprintf (stderr,"procesor %d error: too many processors used\n",rank); exit_val = 1; } } else { exit_val = 2; } } MPI_Finalize(); exit (exit_val); } /****************************************************************************** parse_args() ***********************************************************/ int parse_args (int argc, char *argv[], FILE **f_ptr) { int i, retval = 1; if (argc > 1) { for (i = 1 ; i < argc ; i++) { if (strcmp(argv[i],"-f") == 0) { if (*f_ptr = fopen (argv[++i],"r")) { fprintf (stderr,"%s: reading file %s\n",argv[0],argv[i]); } else { fprintf (stderr,"%s error: error opening %s for reading\n", argv[0],argv[i]); retval = 0; } } else { fprintf (stderr,"%s error: %s option unknown\n",argv[0], argv[i]); } } } else { retval = 0; } return (retval); } /****************************************************************************** usage() ****************************************************************/ void usage (char *argv[]) { fprintf (stderr,"\n%s usage: %s -f matrix-file\n\n",argv[0],argv[0]); }