From e27c3c22cf0afbdd7eeb417625a766d91051680a Mon Sep 17 00:00:00 2001 From: Francois Gygi Date: Fri, 18 Apr 2008 03:40:03 +0000 Subject: [PATCH] Reimplemented parallel write using less memory. git-svn-id: http://qboxcode.org/svn/qb/trunk@607 cba15fb0-1239-40c8-b417-11db7ca47a34 --- src/SlaterDet.C | 108 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------------------------------------- 1 file changed, 68 insertions(+), 40 deletions(-) diff --git a/src/SlaterDet.C b/src/SlaterDet.C index 5f1a48d..e9711a1 100644 --- a/src/SlaterDet.C +++ b/src/SlaterDet.C @@ -3,13 +3,14 @@ // SlaterDet.C // //////////////////////////////////////////////////////////////////////////////// -// $Id: SlaterDet.C,v 1.48 2008-04-15 01:36:44 fgygi Exp $ +// $Id: SlaterDet.C,v 1.49 2008-04-18 03:40:03 fgygi Exp $ #include "SlaterDet.h" #include "FourierTransform.h" #include "Context.h" #include "blas.h" // daxpy #include "Base64Transcoder.h" +#include "SharedFilePtr.h" #include "Timer.h" #include @@ -1332,42 +1333,56 @@ void SlaterDet::write(SharedFilePtr& sfp, string encoding, double weight, int is FourierTransform ft(*basis_,basis_->np(0),basis_->np(1),basis_->np(2)); vector > wftmp(ft.np012loc()); const bool real_basis = basis_->real(); - const int wftmpr_size = real_basis ? ft.np012() : 2*ft.np012(); const int wftmpr_loc_size = real_basis ? ft.np012loc() : 2*ft.np012loc(); - vector wftmpr(wftmpr_size); + vector wftmpr(wftmpr_loc_size); Base64Transcoder xcdr; - ostringstream ostr; + char* wbuf = 0; + size_t wbufsize = 0; + + // Segment n on process iprow is sent to row (n*nprow+iprow)/(nprow) + const Context& colctxt = basis_->context(); + const int nprow = ctxt_.nprow(); + vector scounts(nprow), sdispl(nprow), rcounts(nprow), rdispl(nprow); + + string header; if ( ctxt_.onpe0() ) { + ostringstream ostr_hdr; string spin = (ispin > 0) ? "down" : "up"; - ostr << "kpoint() << "\"\n" + ostr_hdr << " spin=\"" << spin << "\""; + ostr_hdr << " kpoint=\"" << basis_->kpoint() << "\"\n" << " weight=\"" << weight << "\"" << " size=\"" << nst() << "\">" << endl; - ostr << "" + ostr_hdr << "" << endl; - ostr.setf(ios::fixed,ios::floatfield); - ostr.setf(ios::right,ios::adjustfield); + ostr_hdr.setf(ios::fixed,ios::floatfield); + ostr_hdr.setf(ios::right,ios::adjustfield); for ( int i = 0; i < nst(); i++ ) { - ostr << " " << setprecision(8) << occ_[i]; + ostr_hdr << " " << setprecision(8) << occ_[i]; if ( i%10 == 9 ) - ostr << endl; + ostr_hdr << endl; } if ( nst()%10 != 0 ) - ostr << endl; - ostr << "" << endl; + ostr_hdr << endl; + ostr_hdr << "" << endl; + header = ostr_hdr.str(); } // serialize all local columns of c and store in segments seg[n] - vector seg(nstloc()); + string seg; for ( int n = 0; n < nstloc(); n++ ) { + seg.clear(); + if ( n == 0 && ctxt_.myrow() == 0 ) + seg = header; + + ostringstream ostr; //cout << " state " << n << " is stored on column " // << ctxt_.mycol() << " local index: " << c_.y(n) << endl; ft.backward(c_.cvalptr(c_.mloc()*n),&wftmp[0]); @@ -1583,26 +1598,14 @@ void SlaterDet::write(SharedFilePtr& sfp, string encoding, double weight, int is ostr << "\n"; } // copy contents of ostr stringstream to segment - seg[n] += ostr.str(); - // cout << ctxt_.mype() << ": segment " << n << " size: " << seg[n].size() + seg += ostr.str(); + // cout << ctxt_.mype() << ": segment " << n << " size: " << seg.size() // << endl; - ostr.str(""); - } // for n - - // All segments are defined - // redistribute segments to tasks within each process column - string wbuf; + // seg is defined - // There are nprow*nstloc segments in the process column - // Determine the destination of each segment - // Segment nloc on process iprow is sent to row (nloc*nprow+iprow)/(nprow) - const Context& colctxt = basis_->context(); - const int nprow = ctxt_.nprow(); - vector scounts(nprow), sdispl(nprow), rcounts(nprow), rdispl(nprow); + // redistribute segments to tasks within each process column - for ( int nloc = 0; nloc < nstloc(); nloc++ ) - { for ( int i = 0; i < nprow; i++ ) { scounts[i] = 0; @@ -1611,8 +1614,8 @@ void SlaterDet::write(SharedFilePtr& sfp, string encoding, double weight, int is rdispl[i] = 0; } - int idest = (nloc*nprow+ctxt_.myrow())/nstloc(); - scounts[idest] = seg[nloc].size(); + int idest = (n*nprow+ctxt_.myrow())/nstloc(); + scounts[idest] = seg.size(); // send sendcounts to all procs MPI_Alltoall(&scounts[0],1,MPI_INT,&rcounts[0],1,MPI_INT,colctxt.comm()); @@ -1627,12 +1630,35 @@ void SlaterDet::write(SharedFilePtr& sfp, string encoding, double weight, int is } char* rbuf = new char[rbufsize]; - int err = MPI_Alltoallv((void*)seg[nloc].c_str(),&scounts[0],&sdispl[0], + int err = MPI_Alltoallv((void*)seg.data(),&scounts[0],&sdispl[0], MPI_CHAR,rbuf,&rcounts[0],&rdispl[0],MPI_CHAR,colctxt.comm()); - wbuf.append(rbuf,rbufsize); + if ( err != 0 ) + cout << ctxt_.mype() + << " SlaterDet::write: error in MPI_Alltoallv" << endl; + + if ( rbufsize > 0 ) + { + // append rbuf to wbuf + char* tmp; + try + { + tmp = new char[wbufsize+rbufsize]; + } + catch ( bad_alloc ) + { + cout << ctxt_.mype() << " bad_alloc in wbuf append " + << " n=" << n + << " rbufsize=" << rbufsize + << " wbufsize=" << wbufsize << endl; + } + memcpy(tmp,wbuf,wbufsize); + memcpy(tmp+wbufsize,rbuf,rbufsize); + delete [] wbuf; + wbuf = tmp; + wbufsize += rbufsize; + } delete [] rbuf; - seg[nloc].clear(); } // wbuf now contains the data to be written in the correct order @@ -1646,7 +1672,7 @@ void SlaterDet::write(SharedFilePtr& sfp, string encoding, double weight, int is current_offset = sfp.offset(); // compute local offset of next write - long long int local_size = wbuf.size(); + long long int local_size = wbufsize; MPI_Scan(&local_size, &local_offset, 1, MPI_LONG_LONG, MPI_SUM, ctxt_.comm()); // add base and correct for inclusive scan by subtracting local_size @@ -1656,8 +1682,8 @@ void SlaterDet::write(SharedFilePtr& sfp, string encoding, double weight, int is MPI_Status status; // write wbuf from all tasks using computed offset - int len = wbuf.size(); - int err = MPI_File_write_at_all(sfp.file(),off,(void*)wbuf.c_str(),len, + int len = wbufsize; + int err = MPI_File_write_at_all(sfp.file(),off,(void*)wbuf,len, MPI_CHAR,&status); if ( err != 0 ) cout << ctxt_.mype() @@ -1666,10 +1692,12 @@ void SlaterDet::write(SharedFilePtr& sfp, string encoding, double weight, int is sfp.sync(); + delete [] wbuf; + if ( ctxt_.onpe0() ) { string s("\n"); - int err = MPI_File_write_at(sfp.file(),sfp.mpi_offset(),(void*) s.c_str(), + int err = MPI_File_write_at(sfp.file(),sfp.mpi_offset(),(void*) s.data(), s.size(),MPI_CHAR,&status); if ( err != 0 ) cout << ctxt_.mype() -- libgit2 0.26.0