MPI unification

This commit is contained in:
Joachim Schoeberl 2022-04-26 22:00:25 +02:00
parent b62c7b30ef
commit fd77d17e2b
8 changed files with 199 additions and 35 deletions

View File

@ -7,8 +7,10 @@
#endif
#include "array.hpp"
#include "table.hpp"
#include "exception.hpp"
#include "profiler.hpp"
#include "ngstream.hpp"
namespace ngcore
{
@ -52,6 +54,23 @@ namespace ngcore
return GetMPIType<T>();
}
/// Blocks until every request in the list has completed.
/// An empty list returns immediately without calling into MPI.
inline void MyMPI_WaitAll (FlatArray<MPI_Request> requests)
{
  static Timer t("MPI - WaitAll"); RegionTimer reg(t);
  if (requests.Size() > 0)
    MPI_Waitall (requests.Size(), requests.Data(), MPI_STATUSES_IGNORE);
}
/// Waits until one request in the list completes and returns its index.
/// NOTE(review): for an empty list MPI_Waitany yields MPI_UNDEFINED — callers
/// presumably never pass an empty list; confirm before relying on the result.
inline int MyMPI_WaitAny (FlatArray<MPI_Request> requests)
{
  int index;
  MPI_Waitany (requests.Size(), requests.Data(), &index, MPI_STATUS_IGNORE);
  return index;
}
class NgMPI_Comm
{
protected:
@ -270,6 +289,20 @@ namespace ngcore
MPI_Bcast (&s, 1, GetMPIType<T>(), root, comm);
}
template <class T>
void Bcast (Array<T> & d, int root = 0)
{
if (size == 1) return;
int ds = d.Size();
Bcast (ds, root);
if (Rank() != root) d.SetSize (ds);
if (ds != 0)
MPI_Bcast (d.Data(), ds, GetMPIType<T>(), root, comm);
}
void Bcast (std::string & s, int root = 0) const
{
if (size == 1) return;
@ -335,6 +368,37 @@ namespace ngcore
}
template <typename T>
void ExchangeTable (DynamicTable<T> & send_data,
DynamicTable<T> & recv_data, int tag)
{
Array<int> send_sizes(size);
Array<int> recv_sizes(size);
for (int i = 0; i < size; i++)
send_sizes[i] = send_data[i].Size();
AllToAll (send_sizes, recv_sizes);
recv_data = DynamicTable<T> (recv_sizes, true);
Array<MPI_Request> requests;
for (int dest = 0; dest < size; dest++)
if (dest != rank && send_data[dest].Size())
requests.Append (ISend (FlatArray<T>(send_data[dest]), dest, tag));
for (int dest = 0; dest < size; dest++)
if (dest != rank && recv_data[dest].Size())
requests.Append (IRecv (FlatArray<T>(recv_data[dest]), dest, tag));
MyMPI_WaitAll (requests);
}
NgMPI_Comm SubCommunicator (FlatArray<int> procs) const
{
MPI_Comm subcomm;
@ -347,19 +411,44 @@ namespace ngcore
}; // class NgMPI_Comm
inline void MyMPI_WaitAll (FlatArray<MPI_Request> requests)
class MyMPI
{
static Timer t("MPI - WaitAll"); RegionTimer reg(t);
if (!requests.Size()) return;
MPI_Waitall (requests.Size(), requests.Data(), MPI_STATUSES_IGNORE);
bool initialized_by_me;
public:
MyMPI(int argc, char ** argv)
{
int is_init = -1;
MPI_Initialized(&is_init);
if (!is_init)
{
MPI_Init (&argc, &argv);
initialized_by_me = true;
}
else
initialized_by_me = false;
NgMPI_Comm comm(MPI_COMM_WORLD);
NGSOStream::SetGlobalActive (comm.Rank() == 0);
if (comm.Size() > 1)
TaskManager::SetNumThreads (1);
}
inline int MyMPI_WaitAny (FlatArray<MPI_Request> requests)
~MyMPI()
{
int nr;
MPI_Waitany (requests.Size(), requests.Data(), &nr, MPI_STATUS_IGNORE);
return nr;
if (initialized_by_me)
MPI_Finalize ();
}
};
#else // PARALLEL
class MPI_Comm {
@ -429,6 +518,14 @@ namespace ngcore
template <typename T>
void Bcast (T & s, int root = 0) const { ; }
template <class T>
void Bcast (Array<T> & d, int root = 0) { ; }
template <typename T>
void ExchangeTable (DynamicTable<T> & send_data,
DynamicTable<T> & recv_data, int tag) { ; }
NgMPI_Comm SubCommunicator (FlatArray<int> procs) const
{ return *this; }
};
@ -436,6 +533,13 @@ namespace ngcore
// serial (non-PARALLEL) stand-ins: WaitAll has nothing to wait for,
// WaitAny always reports index 0
inline void MyMPI_WaitAll (FlatArray<MPI_Request> requests) { ; }
inline int MyMPI_WaitAny (FlatArray<MPI_Request> requests) { return 0; }
class MyMPI
{
public:
MyMPI(int argc, char ** argv) { ; }
};
#endif // PARALLEL
} // namespace ngcore

View File

@ -465,7 +465,7 @@ namespace ngcore
}
/// Creates table with a priori fixed entry sizes.
DynamicTable (const Array<int, IndexType> & entrysizes)
DynamicTable (const Array<int, IndexType> & entrysizes, bool setentrysize=false)
: data(entrysizes.Size())
{
size_t cnt = 0;
@ -479,6 +479,9 @@ namespace ngcore
for (auto i : data.Range())
{
data[i].maxsize = entrysizes[i];
if (setentrysize)
data[i].size = entrysizes[i];
else
data[i].size = 0;
data[i].col = &oneblock[cnt];
cnt += entrysizes[i];

View File

@ -26,7 +26,10 @@
namespace ngcore
{
// MPI rank, nranks TODO: Rename
extern NGCORE_API int id, ntasks;
// [[deprecated("don't use global id/ntasks")]]
extern NGCORE_API int id;
// [[deprecated("don't use global id/ntasks")]]
extern NGCORE_API int ntasks;
NGCORE_API std::string Demangle(const char* typeinfo);

View File

@ -186,7 +186,7 @@ namespace netgen
*/
template <typename T>
// [[deprecated("do we need that ? ")]]
[[deprecated("do we need that ? ")]]
inline void MyMPI_ExchangeTable (TABLE<T> & send_data,
TABLE<T> & recv_data, int tag,
const NgMPI_Comm & comm)
@ -217,6 +217,41 @@ namespace netgen
}
/// Exchanges the rows of a DynamicTable between all ranks of comm:
/// row i of send_data is shipped to rank i, row j of recv_data is
/// filled by rank j. Deprecated in favour of comm.ExchangeTable.
template <typename T>
[[deprecated("do we need that ? ")]]
inline void MyMPI_ExchangeTable (DynamicTable<T> & send_data,
                                 DynamicTable<T> & recv_data, int tag,
                                 const NgMPI_Comm & comm)
{
  int me = comm.Rank();
  int np = comm.Size();

  // announce row lengths so receivers can pre-allocate
  Array<int> send_sizes(np);
  Array<int> recv_sizes(np);
  for (int i = 0; i < np; i++)
    send_sizes[i] = send_data[i].Size();
  comm.AllToAll (send_sizes, recv_sizes);

  // allocate receive rows at their final sizes
  recv_data = DynamicTable<T> (recv_sizes, true);

  // post non-blocking transfers for every non-empty off-rank row
  Array<MPI_Request> requests;
  for (int dest = 0; dest < np; dest++)
    if (dest != me && send_data[dest].Size())
      requests.Append (comm.ISend (FlatArray<T>(send_data[dest]), dest, tag));
  for (int dest = 0; dest < np; dest++)
    if (dest != me && recv_data[dest].Size())
      requests.Append (comm.IRecv (FlatArray<T>(recv_data[dest]), dest, tag));
  MyMPI_WaitAll (requests);
}
[[deprecated("do we still send commands?")]]
extern void MyMPI_SendCmd (const char * cmd);
[[deprecated("do we still send commands?")]]
@ -224,14 +259,14 @@ namespace netgen
/// Broadcasts a single scalar from rank 0 to all ranks of comm.
template <class T>
// [[deprecated("use comm.BCast instead")]]
[[deprecated("use comm.BCast instead")]]
inline void MyMPI_Bcast (T & s, MPI_Comm comm)
{
MPI_Bcast (&s, 1, GetMPIType<T>(), 0, comm);
}
template <class T>
// [[deprecated("use comm.BCast instead")]]
[[deprecated("use comm.BCast instead")]]
inline void MyMPI_Bcast (NgArray<T, 0> & s, NgMPI_Comm comm)
{
int size = s.Size();

View File

@ -10,7 +10,6 @@ namespace netgen
// bool rational = true;
static void ComputeGaussRule (int n, NgArray<double> & xi, NgArray<double> & wi)
{
xi.SetSize (n);
@ -640,8 +639,9 @@ namespace netgen
}
#ifdef PARALLEL
TABLE<int> send_orders(ntasks), recv_orders(ntasks);
// #ifdef PARALLEL
// TABLE<int> send_orders(ntasks), recv_orders(ntasks);
DynamicTable<int> send_orders(ntasks), recv_orders(ntasks);
if (ntasks > 1 && working)
{
@ -654,11 +654,12 @@ namespace netgen
}
if (ntasks > 1)
MyMPI_ExchangeTable (send_orders, recv_orders, MPI_TAG_CURVE, comm);
// MyMPI_ExchangeTable (send_orders, recv_orders, MPI_TAG_CURVE, comm);
comm.ExchangeTable (send_orders, recv_orders, MPI_TAG_CURVE);
if (ntasks > 1 && working)
{
NgArray<int> cnt(ntasks);
Array<int> cnt(ntasks);
cnt = 0;
for (int e = 0; e < edgeorder.Size(); e++)
for (auto proc : partop.GetDistantEdgeProcs(e))
@ -667,7 +668,7 @@ namespace netgen
for (auto proc : partop.GetDistantFaceProcs(f))
faceorder[f] = max(faceorder[f], recv_orders[proc][cnt[proc]++]);
}
#endif
// #endif
edgecoeffsindex.SetSize (nedges+1);
@ -750,7 +751,9 @@ namespace netgen
if (ntasks > 1)
{
// distribute it ...
TABLE<double> senddata(ntasks), recvdata(ntasks);
// TABLE<double> senddata(ntasks), recvdata(ntasks);
DynamicTable<double> senddata(ntasks), recvdata(ntasks);
if (working)
for (int e = 0; e < nedges; e++)
for (int proc : partop.GetDistantEdgeProcs(e))
@ -767,7 +770,8 @@ namespace netgen
}
}
MyMPI_ExchangeTable (senddata, recvdata, MPI_TAG_CURVE, comm);
// MyMPI_ExchangeTable (senddata, recvdata, MPI_TAG_CURVE, comm);
comm.ExchangeTable (senddata, recvdata, MPI_TAG_CURVE);
NgArray<int> cnt(ntasks);
cnt = 0;
@ -945,7 +949,9 @@ namespace netgen
if (ntasks > 1)
{
// distribute it ...
TABLE<double> senddata(ntasks), recvdata(ntasks);
// TABLE<double> senddata(ntasks), recvdata(ntasks);
DynamicTable<double> senddata(ntasks), recvdata(ntasks);
if (working)
for (int e = 0; e < nedges; e++)
for (int proc : partop.GetDistantEdgeProcs(e))
@ -969,7 +975,9 @@ namespace netgen
}
}
MyMPI_ExchangeTable (senddata, recvdata, MPI_TAG_CURVE, comm);
// MyMPI_ExchangeTable (senddata, recvdata, MPI_TAG_CURVE, comm);
comm.ExchangeTable (senddata, recvdata, MPI_TAG_CURVE);
NgArray<int> cnt(ntasks);
cnt = 0;
if (working)
@ -1145,7 +1153,8 @@ namespace netgen
mesh.GetFaceDescriptor(mesh[i].GetIndex()).SurfNr();
#ifdef PARALLEL
TABLE<int> send_surfnr(ntasks), recv_surfnr(ntasks);
// TABLE<int> send_surfnr(ntasks), recv_surfnr(ntasks);
DynamicTable<int> send_surfnr(ntasks), recv_surfnr(ntasks);
if (ntasks > 1 && working)
{
@ -1155,7 +1164,8 @@ namespace netgen
}
if (ntasks > 1)
MyMPI_ExchangeTable (send_surfnr, recv_surfnr, MPI_TAG_CURVE, comm);
// MyMPI_ExchangeTable (send_surfnr, recv_surfnr, MPI_TAG_CURVE, comm);
comm.ExchangeTable (send_surfnr, recv_surfnr, MPI_TAG_CURVE);
if (ntasks > 1 && working)
{

View File

@ -701,7 +701,7 @@ DLL_HEADER void ExportNetgenMeshing(py::module &m)
}
istream * infile = nullptr;
NgArray<char> buf; // for distributing geometry!
Array<char> buf; // for distributing geometry!
int strs;
if( id == 0) {
@ -796,13 +796,16 @@ DLL_HEADER void ExportNetgenMeshing(py::module &m)
}
if(ntasks>1) {
#ifdef PARALLEL
// #ifdef PARALLEL
/** Scatter the geometry-string (no dummy-implementation in mpi_interface) **/
/*
int strs = buf.Size();
MyMPI_Bcast(strs, comm);
if(strs>0)
MyMPI_Bcast(buf, comm);
#endif
*/
comm.Bcast(buf);
// #endif
}
shared_ptr<NetgenGeometry> geo;

View File

@ -13,7 +13,7 @@
#ifdef PARALLEL
#include <mpi.h>
extern void ParallelRun();
// extern void ParallelRun();
#endif
#include "../libsrc/interface/writeuser.hpp"
@ -281,7 +281,7 @@ int main(int argc, char ** argv)
#ifdef PARALLEL
else
{
ParallelRun();
// ParallelRun();
MPI_Finalize();
}

View File

@ -1,3 +1,6 @@
#ifdef OLDFILE
#ifdef PARALLEL
#include "dlfcn.h"
@ -353,4 +356,7 @@ void ParallelRun()
#endif
#endif