Commit 152ac2a3 authored by Francois Gygi
Browse files

loading of URI using http connection

git-svn-id: http://qboxcode.org/svn/qb/trunk@1363 cba15fb0-1239-40c8-b417-11db7ca47a34
parent ba933bc4
......@@ -75,16 +75,20 @@ void SampleReader::readSample (Sample& s, const string uri, bool serial)
catch (const XMLException& toCatch)
{
cout << " Sample::readSample: Error during XML initialization :\n"
cout << " SampleReader::readSample: Error during XML initialization :\n"
<< StrX(toCatch.getMessage()) << endl;
s.ctxt_.abort(1);
return;
}
string xmlcontent;
DoubleMatrix gfdata(ctxt_);
XMLGFPreprocessor xmlgfp;
xmlgfp.process(uri.c_str(),gfdata,xmlcontent,serial);
if ( xmlgfp.process(uri.c_str(),gfdata,xmlcontent,serial) )
{
cout << " SampleReader::readSample: Error in XMLGFPreprocessor" << endl;
return;
}
// Each task holds a copy of xmlcontent
// The distributed matrix gfdata contains the grid function data
......
......@@ -16,17 +16,22 @@
//
////////////////////////////////////////////////////////////////////////////////
#include <cassert>
#include <iostream>
#include <fstream>
#include <sstream>
#include <string>
#include <cstring> // memcpy
#include <vector>
#include <cstdlib>
#include <cstdio>
#include <cassert>
#include <vector>
#include <valarray>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include "Timer.h"
#include "Context.h"
......@@ -57,8 +62,8 @@ using namespace std;
// elements expanded to include all species information.
//
////////////////////////////////////////////////////////////////////////////////
void XMLGFPreprocessor::process(const char* const uri,
DoubleMatrix& gfdata, string& xmlcontent, bool serial)
int XMLGFPreprocessor::process(const char* const uri,
DoubleMatrix& gfdata, string& xmlcontent, bool serial)
{
cout.precision(4);
......@@ -104,7 +109,7 @@ void XMLGFPreprocessor::process(const char* const uri,
{
cout << " process " << ctxt.mype() << " could not open file "
<< uri << " for reading" << endl;
return;
return 1;
}
off_t sz = statbuf.st_size;
......@@ -164,24 +169,344 @@ void XMLGFPreprocessor::process(const char* const uri,
<< sz/(tm.real()*1024*1024) << " MB/s" << endl;
}
// At this point all tasks hold a fragment of size local_size in buf
} // if (read_from_file)
else
{
// read from a URL
// task 0 reads the data.
// Use a parser on task 0 to access the document. Use a simple handler to
// distribute the data to all tasks. Use progressive parsing to
// catch the end of the <atomset> element if reading "atoms only" and
// interrupt parsing.
// Distribute fragments of fixed size to all tasks in round-robin
// fashion. Then reassemble the document into a single string using
// an MPI call
assert(0=="XMLGFPreprocessor::process: URL source not implemented");
// read from a URI
// task 0 connects to the server, gets the data and distributes it
// to MPI tasks
tm.start();
int mype = rctxt.mype();
int nprocs = rctxt.size();
bool onpe0 = rctxt.onpe0();
// get a document from an http server
// the host name is e.g. "128.120.80.40" or "fpmd.ucdavis.edu"
// the document name is e.g. "/index.html"
//
// the document is distributed in local strings localdoc
//
// check that the uri starts with http://
string suri(uri);
if ( suri.substr(0,7) != string("http://") )
{
if ( onpe0 )
{
cout << " URI must start with http://" << endl;
cout << " could not access URI: " << uri << endl;
}
// return with error code
return 2;
}
else
{
// erase "http://" from the URI
suri.erase(0,7);
}
string localdoc;
int sockfd;
struct addrinfo hints, *servinfo, *p;
int rv;
const int blocksize = 1024*1024;
int nblocal = 0; // local number of blocks received
int total_received = 0;
if ( onpe0 )
{
cout << " XMLGFPreprocessor: blocksize: " << blocksize << endl;
char recvbuffer[blocksize];
string buffer;
// parse URI
// e.g. URI = "www.example.com/file.html" or "123.45.67.89/index.html"
// find position of first "/" in argument
string::size_type first_slash_pos = suri.find_first_of("/");
string host = suri.substr(0,first_slash_pos);
string docname = suri.substr(first_slash_pos);
cout << " XMLGFPreprocessor: host: " << host << endl;
cout << " XMLGFPreprocessor: docname: " << docname << endl;
memset(&hints, 0, sizeof hints);
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
if ( (rv = getaddrinfo(host.c_str(), "http", &hints, &servinfo)) != 0)
{
cout << " XMLGFPreprocessor: getaddrinfo: "
<< gai_strerror(rv) << endl;
return 3;
}
// loop through all the results and connect to the first working address
for ( p = servinfo; p != NULL; p = p->ai_next )
{
if ( (sockfd = socket(p->ai_family, p->ai_socktype,
p->ai_protocol)) == -1 )
{
perror("socket");
continue;
}
if (connect(sockfd, p->ai_addr, p->ai_addrlen) == -1)
{
close(sockfd);
perror("connect");
continue;
}
break; // if we get here, we must have connected successfully
}
if (p == NULL)
{
// looped off the end of the list with no connection
cout << " XMLGFPreprocessor: failed to connect" << endl;
return 4;
}
cout << " XMLGFPreprocessor: connected to " << host << " .. " << endl;
// assemble request string
string req = "GET " + docname + " HTTP/1.0\r\nHOST:" + host + "\r\n\r\n";
// cout << " req = " << req << endl;
// send request
if ( send(sockfd, req.c_str(), req.size(), 0) == -1)
{
perror("send");
return 5;
}
cout << " XMLGFPreprocessor: waiting for response..." << endl;
// read packets and append to document
int ibglobal = 0;
int len = 0;
do
{
// read a block into buffer
do
{
len = recv(sockfd, recvbuffer, blocksize, 0);
total_received += len;
if ( len > 0 )
{
buffer.append(recvbuffer,len);
}
} while ( len > 0 && buffer.size() < blocksize );
if ( len > 0 )
{
// buffer contains at least blocksize characters
// send first blocksize chars of buffer
int dest = ibglobal%nprocs;
if ( dest == 0 )
{
localdoc.append(buffer.c_str(),blocksize);
nblocal++;
}
else
{
int tag = 0;
MPI_Send((void*) buffer.c_str(),blocksize,
MPI_CHAR,dest,tag,rctxt.comm());
}
ibglobal++;
// remove first blocksize chars from localdoc
buffer.erase(0,blocksize);
}
else
{
// len = 0: reached end of file
// send remaining chars in buffer
int dest = ibglobal%nprocs;
if ( dest == 0 )
{
localdoc.append(buffer);
nblocal++;
}
else
{
int tag = 0;
MPI_Send((void*) buffer.c_str(),buffer.size(),
MPI_CHAR,dest,tag,rctxt.comm());
}
ibglobal++;
}
} while ( len != 0 );
cout << " XMLGFPreprocessor: received " << total_received
<< " bytes" << endl;
freeaddrinfo(servinfo);
// send empty message to all listening tasks to signify end
for ( int i = 1; i < nprocs; i++ )
MPI_Send(recvbuffer,0,MPI_CHAR,i,0,rctxt.comm());
// broadcast total number of characters received
MPI_Bcast(&total_received,1,MPI_INT,0,rctxt.comm());
}
else // onpe0
{
// start listening
// cout << mype << ": listening.." << endl;
bool done = false;
while ( !done )
{
MPI_Status status;
char rbuffer[blocksize];
// blocking receive of message from task 0
int ierr = MPI_Recv(rbuffer, blocksize, MPI_CHAR, 0, 0,
rctxt.comm(), &status);
if ( ierr != 0 )
{
cout << " XMLGFPreprocessor::process: Error in MPI_Recv on node "
<< mype << endl;
MPI_Abort(rctxt.comm(),2);
}
int count = -1;
MPI_Get_count(&status,MPI_CHAR,&count);
done = ( count == 0 );
if ( !done )
{
localdoc.append(rbuffer,count);
nblocal++;
}
}
MPI_Bcast(&total_received,1,MPI_INT,0,rctxt.comm());
// cout << "node " << mype << " done" << endl;
}
MPI_Barrier(rctxt.comm());
if ( onpe0 )
cout << " XMLGFPreprocessor: done reading" << endl;
// cout << mype << ": localdoc.size(): " << localdoc.size() << endl;
// cout << mype << ": localdoc: " << localdoc << endl;
// cout << mype << ": nblocal: " << nblocal << endl;
// redistribute data to all tasks
// cyclic to cyclic array redistribution
// determine nbglobal: total number of blocks
int nbglobal;
MPI_Allreduce(&nblocal, &nbglobal, 1, MPI_INT, MPI_SUM, rctxt.comm());
// maximum number of messages received on a task =
// maximum number of local blocks nblocalmax
int nblocalmax = (nbglobal+nprocs-1)/nprocs;
// Send messages
int nsend = 0;
MPI_Request *send_req = new MPI_Request[nblocal];
for ( int iblocal = 0; iblocal < nblocal; iblocal++ )
{
int ibglobal = mype + iblocal * nprocs;
int dest = ibglobal / nblocalmax;
const char *p = localdoc.c_str();
int len = ibglobal < nbglobal-1 ? blocksize : total_received % blocksize;
MPI_Isend((void*) &p[iblocal*blocksize],len,MPI_CHAR,dest,ibglobal,
rctxt.comm(),&send_req[iblocal]);
// cout << mype << ": sending block " << ibglobal
// << " to " << dest << endl;
nsend++;
}
int tmpnsend;
MPI_Reduce(&nsend,&tmpnsend,1,MPI_INT,MPI_SUM,0,rctxt.comm());
nsend = tmpnsend;
int nrecv = 0;
MPI_Request *recv_req = new MPI_Request[nblocalmax];
valarray<char> rbuf(nblocalmax*blocksize);
int ireq = 0;
int localsize = 0;
// post non-blocking receives
for ( int ibglobal = 0; ibglobal < nbglobal; ibglobal++ )
{
// coordinates of block ibglobal in final configuration: (i,j)
int j = ibglobal / nblocalmax;
// check if block (i,j) will be received on this task
if ( j == mype )
{
// determine task sending block (i,j)
int src = ibglobal % nprocs;
int iblocal = ibglobal % nblocalmax;
int len = ibglobal<nbglobal-1 ? blocksize : total_received % blocksize;
MPI_Irecv(&rbuf[iblocal*blocksize],len,MPI_CHAR,src,ibglobal,
rctxt.comm(),&recv_req[ireq]);
// cout << mype << ": receiving block " << ibglobal
// << " from " << src << endl;
nrecv++;
ireq++;
localsize += len;
}
}
int tmpnrecv;
MPI_Reduce(&nrecv,&tmpnrecv,1,MPI_INT,MPI_SUM,0,rctxt.comm());
nrecv = tmpnrecv;
// if ( onpe0 )
// cout << "total msgs sent/received: " << nsend << "/" << nrecv << endl;
// cout << mype << ": localsize: " << localsize << endl;
// wait for send calls to complete
MPI_Waitall(nblocal, send_req, MPI_STATUSES_IGNORE);
MPI_Waitall(ireq, recv_req, MPI_STATUSES_IGNORE);
delete [] recv_req;
delete [] send_req;
// the data now resides in rbuf on each task
buf.assign(&rbuf[0],localsize);
// on task 0, remove HTTP header
// erase characters before "<?xml " header
int xml_decl_error = 0;
string::size_type xml_start;
if ( onpe0 )
{
xml_start = buf.find("<?xml ");
if ( xml_start == string::npos )
{
cout << " XMLGFPreprocessor: could not find <?xml > declaration"
<< endl;
xml_decl_error = true;
}
else
{
cout << " XMLGFPreprocessor: HTTP header:" << endl;
cout << buf.substr(0,xml_start);
cout << " XMLGFPreprocessor: <?xml > declaration found at position "
<< xml_start << endl;
}
}
MPI_Bcast(&xml_decl_error,1,MPI_INT,0,rctxt.comm());
if ( xml_decl_error )
return 6;
// An <?xml > declaration was found
if ( onpe0 )
buf.erase(0,xml_start);
ctxt.barrier();
tm.stop();
if ( ctxt.onpe0() )
{
cout << " XMLGFPreprocessor: read time: " << tm.real() << endl;
cout << " XMLGFPreprocessor: read rate: "
<< total_received/(tm.real()*1024*1024) << " MB/s" << endl;
}
}
// At this point all tasks hold a fragment of size local_size in buf
tm.reset();
tm.start();
......@@ -1149,4 +1474,5 @@ void XMLGFPreprocessor::process(const char* const uri,
{
cout << " XMLGFPreprocessor: total time: " << ttm.real() << endl;
}
return 0;
}
......@@ -36,6 +36,6 @@ class XMLGFPreprocessor
{
public:
void process(const char* const filename,
int process(const char* const filename,
DoubleMatrix& gfdata, std::string& xmlcontent, bool serial);
};
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment