diff --git a/src/SampleReader.C b/src/SampleReader.C index 59beb66..c97fc3a 100644 --- a/src/SampleReader.C +++ b/src/SampleReader.C @@ -75,16 +75,20 @@ void SampleReader::readSample (Sample& s, const string uri, bool serial) catch (const XMLException& toCatch) { - cout << " Sample::readSample: Error during XML initialization :\n" + cout << " SampleReader::readSample: Error during XML initialization :\n" << StrX(toCatch.getMessage()) << endl; - s.ctxt_.abort(1); + return; } string xmlcontent; DoubleMatrix gfdata(ctxt_); XMLGFPreprocessor xmlgfp; - xmlgfp.process(uri.c_str(),gfdata,xmlcontent,serial); + if ( xmlgfp.process(uri.c_str(),gfdata,xmlcontent,serial) ) + { + cout << " SampleReader::readSample: Error in XMLGFPreprocessor" << endl; + return; + } // Each task holds a copy of xmlcontent // The distributed matrix gfdata contains the grid function data diff --git a/src/XMLGFPreprocessor.C b/src/XMLGFPreprocessor.C index f44a4df..6dc3414 100644 --- a/src/XMLGFPreprocessor.C +++ b/src/XMLGFPreprocessor.C @@ -16,17 +16,22 @@ // //////////////////////////////////////////////////////////////////////////////// -#include #include #include #include #include #include // memcpy -#include #include #include +#include + +#include #include + #include +#include +#include +#include #include "Timer.h" #include "Context.h" @@ -57,8 +62,8 @@ using namespace std; // elements expanded to include all species information. // //////////////////////////////////////////////////////////////////////////////// -void XMLGFPreprocessor::process(const char* const uri, - DoubleMatrix& gfdata, string& xmlcontent, bool serial) +int XMLGFPreprocessor::process(const char* const uri, + DoubleMatrix& gfdata, string& xmlcontent, bool serial) { cout.precision(4); @@ -104,7 +109,7 @@ void XMLGFPreprocessor::process(const char* const uri, { cout << " process " << ctxt.mype() << " could not open file " << uri << " for reading" << endl; - return; + return 1; } off_t sz = statbuf.st_size; @@ -164,24 +169,344 @@ void XMLGFPreprocessor::process(const char* const uri, << sz/(tm.real()*1024*1024) << " MB/s" << endl; } - // At this point all tasks hold a fragment of size local_size in buf - } // if (read_from_file) else { - // read from a URL - // task 0 reads the data. - // Use a parser on task 0 to access the document. Use a simple handler to - // distribute the data to all tasks. Use progressive parsing to - // catch the end of the element if reading "atoms only" and - // interrupt parsing. - - // Distribute fragements of fixed size to all tasks in round-robin - // fashion. Then reassemble the document into a single string using - // an MPI call - assert(0=="XMLGFPreprocessor::process: URL source not implemented"); + // read from a URI + // task 0 connects to the server, gets the data and distributes it + // to MPI tasks + tm.start(); + + int mype = rctxt.mype(); + int nprocs = rctxt.size(); + bool onpe0 = rctxt.onpe0(); + + // get a document from an http server + // the host name is e.g. "128.120.80.40" or "fpmd.ucdavis.edu" + // the document name is e.g. "/index.html" + // + // the document is distributed in local strings localdoc + // + // check that the uri starts with http:// + string suri(uri); + if ( suri.substr(0,7) != string("http://") ) + { + if ( onpe0 ) + { + cout << " URI must start with http://" << endl; + cout << " could not access URI: " << uri << endl; + } + // return with error code + return 2; + } + else + { + // erase "http://" from the URI + suri.erase(0,7); + } + + string localdoc; + int sockfd; + struct addrinfo hints, *servinfo, *p; + int rv; + const int blocksize = 1024*1024; + int nblocal = 0; // local number of blocks received + int total_received = 0; + + if ( onpe0 ) + { + cout << " XMLGFPreprocessor: blocksize: " << blocksize << endl; + char recvbuffer[blocksize]; + string buffer; + + // parse URI + // e.g. URI = "www.example.com/file.html" or "123.45.678.90/index.html" + // find position of first "/" in argument + string::size_type first_slash_pos = suri.find_first_of("/"); + string host = suri.substr(0,first_slash_pos); + string docname = suri.substr(first_slash_pos); + cout << " XMLGFPreprocessor: host: " << host << endl; + cout << " XMLGFPreprocessor: docname: " << docname << endl; + + memset(&hints, 0, sizeof hints); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + + if ( (rv = getaddrinfo(host.c_str(), "http", &hints, &servinfo)) != 0) + { + cout << " XMLGFPreprocessor: getaddrinfo: " + << gai_strerror(rv) << endl; + return 3; + } + + // loop through all the results and connect to the first working address + for ( p = servinfo; p != NULL; p = p->ai_next ) + { + if ( (sockfd = socket(p->ai_family, p->ai_socktype, + p->ai_protocol)) == -1 ) + { + perror("socket"); + continue; + } + + if (connect(sockfd, p->ai_addr, p->ai_addrlen) == -1) + { + close(sockfd); + perror("connect"); + continue; + } + + break; // if we get here, we must have connected successfully + } + + if (p == NULL) + { + // looped off the end of the list with no connection + cout << " XMLGFPreprocessor: failed to connect" << endl; + return 4; + } + + cout << " XMLGFPreprocessor: connected to " << host << " .. " << endl; + + // assemble request string + string req = "GET " + docname + " HTTP/1.0\r\nHOST:" + host + "\r\n\r\n"; + // cout << " req = " << req << endl; + + // send request + if ( send(sockfd, req.c_str(), req.size(), 0) == -1) + { + perror("send"); + return 5; + } + + cout << " XMLGFPreprocessor: waiting for response..." << endl; + + // read packets and append to document + int ibglobal = 0; + int len = 0; + do + { + // read a block into buffer + do + { + len = recv(sockfd, recvbuffer, blocksize, 0); + total_received += len; + if ( len > 0 ) + { + buffer.append(recvbuffer,len); + } + } while ( len > 0 && buffer.size() < blocksize ); + + if ( len > 0 ) + { + // buffer contains at least blocksize characters + // send first blocksize chars of buffer + int dest = ibglobal%nprocs; + if ( dest == 0 ) + { + localdoc.append(buffer.c_str(),blocksize); + nblocal++; + } + else + { + int tag = 0; + MPI_Send((void*) buffer.c_str(),blocksize, + MPI_CHAR,dest,tag,rctxt.comm()); + } + ibglobal++; + // remove first blocksize chars from localdoc + buffer.erase(0,blocksize); + } + else + { + // len = 0: reached end of file + // send remaining chars in buffer + int dest = ibglobal%nprocs; + if ( dest == 0 ) + { + localdoc.append(buffer); + nblocal++; + } + else + { + int tag = 0; + MPI_Send((void*) buffer.c_str(),buffer.size(), + MPI_CHAR,dest,tag,rctxt.comm()); + } + ibglobal++; + } + } while ( len != 0 ); + cout << " XMLGFPreprocessor: received " << total_received + << " bytes" << endl; + + freeaddrinfo(servinfo); + + // send empty message to all listening tasks to signify end + for ( int i = 1; i < nprocs; i++ ) + MPI_Send(recvbuffer,0,MPI_CHAR,i,0,rctxt.comm()); + + // broadcast total number of characters received + MPI_Bcast(&total_received,1,MPI_INT,0,rctxt.comm()); + + } + else // onpe0 + { + // start listening + // cout << mype << ": listening.." << endl; + bool done = false; + while ( !done ) + { + MPI_Status status; + char rbuffer[blocksize]; + // blocking receive of message from task 0 + int ierr = MPI_Recv(rbuffer, blocksize, MPI_CHAR, 0, 0, + rctxt.comm(), &status); + if ( ierr != 0 ) + { + cout << " XMLGFPreprocessor::process: Error in MPI_Recv on node " + << mype << endl; + MPI_Abort(rctxt.comm(),2); + } + int count = -1; + MPI_Get_count(&status,MPI_CHAR,&count); + done = ( count == 0 ); + if ( !done ) + { + localdoc.append(rbuffer,count); + nblocal++; + } + } + + MPI_Bcast(&total_received,1,MPI_INT,0,rctxt.comm()); + // cout << "node " << mype << " done" << endl; + } + + MPI_Barrier(rctxt.comm()); + + if ( onpe0 ) + cout << " XMLGFPreprocessor: done reading" << endl; + // cout << mype << ": localdoc.size(): " << localdoc.size() << endl; + // cout << mype << ": localdoc: " << localdoc << endl; + // cout << mype << ": nblocal: " << nblocal << endl; + + // redistribute data to all tasks + // cyclic to cyclic array redistribution + + // determine nbglobal: total number of blocks + int nbglobal; + MPI_Allreduce(&nblocal, &nbglobal, 1, MPI_INT, MPI_SUM, rctxt.comm()); + + // maximum number of messages received on a task = + // maximum number of local blocks nblocalmax + int nblocalmax = (nbglobal+nprocs-1)/nprocs; + + // Send messages + int nsend = 0; + MPI_Request *send_req = new MPI_Request[nblocal]; + for ( int iblocal = 0; iblocal < nblocal; iblocal++ ) + { + int ibglobal = mype + iblocal * nprocs; + int dest = ibglobal / nblocalmax; + const char *p = localdoc.c_str(); + int len = ibglobal < nbglobal-1 ? blocksize : total_received % blocksize; + MPI_Isend((void*) &p[iblocal*blocksize],len,MPI_CHAR,dest,ibglobal, + rctxt.comm(),&send_req[iblocal]); + // cout << mype << ": sending block " << ibglobal + // << " to " << dest << endl; + nsend++; + } + int tmpnsend; + MPI_Reduce(&nsend,&tmpnsend,1,MPI_INT,MPI_SUM,0,rctxt.comm()); + nsend = tmpnsend; + + int nrecv = 0; + MPI_Request *recv_req = new MPI_Request[nblocalmax]; + valarray rbuf(nblocalmax*blocksize); + int ireq = 0; + int localsize = 0; + // post non-blocking receives + for ( int ibglobal = 0; ibglobal < nbglobal; ibglobal++ ) + { + // coordinates of block ibglobal in final configuration: (i,j) + int j = ibglobal / nblocalmax; + // check if block (i,j) will be received on this task + if ( j == mype ) + { + // determine task sending block (i,j) + int src = ibglobal % nprocs; + int iblocal = ibglobal % nblocalmax; + int len = ibglobal declaration" + << endl; + xml_decl_error = true; + } + else + { + cout << " XMLGFPreprocessor: HTTP header:" << endl; + cout << buf.substr(0,xml_start); + cout << " XMLGFPreprocessor: declaration found at position " + << xml_start << endl; + } + } + MPI_Bcast(&xml_decl_error,1,MPI_INT,0,rctxt.comm()); + if ( xml_decl_error ) + return 6; + + // An declaration was found + + if ( onpe0 ) + buf.erase(0,xml_start); + + ctxt.barrier(); + tm.stop(); + + if ( ctxt.onpe0() ) + { + cout << " XMLGFPreprocessor: read time: " << tm.real() << endl; + cout << " XMLGFPreprocessor: read rate: " + << total_received/(tm.real()*1024*1024) << " MB/s" << endl; + } } + // At this point all tasks hold a fragment of size local_size in buf + tm.reset(); tm.start(); @@ -1149,4 +1474,5 @@ void XMLGFPreprocessor::process(const char* const uri, { cout << " XMLGFPreprocessor: total time: " << ttm.real() << endl; } + return 0; } diff --git a/src/XMLGFPreprocessor.h b/src/XMLGFPreprocessor.h index 1f6c86a..76b0215 100644 --- a/src/XMLGFPreprocessor.h +++ b/src/XMLGFPreprocessor.h @@ -36,6 +36,6 @@ class XMLGFPreprocessor { public: - void process(const char* const filename, + int process(const char* const filename, DoubleMatrix& gfdata, std::string& xmlcontent, bool serial); };