Fix paje trace with MPI and TaskManager

Matthias Hochsteger 2020-08-10 12:20:17 +02:00
parent 87c2901e32
commit 72447a51d5


@@ -422,44 +422,48 @@ namespace ngcore
     NgMPI_Comm comm(MPI_COMM_WORLD);
     auto rank = comm.Rank();
     auto nranks = comm.Size();
-    nthreads = nranks;
-    thread_aliases.reserve(nthreads);
+    if(nranks>1)
+      {
+        nthreads = nranks;
+        thread_aliases.reserve(nthreads);
 
         std::array<char, MPI_MAX_PROCESSOR_NAME+1> ahostname;
         int len;
         MPI_Get_processor_name(ahostname.data(), &len);
         std::string hostname = ahostname.data();
 
         std::map<std::string, int> host_map;
         std::string name;
         for(auto i : IntRange(0, nranks))
           {
             if(i!=MPI_PAJE_WRITER)
               comm.Recv(name, i, 0);
             else
               name = hostname;
             if(host_map.count(name)==0)
               {
                 host_map[name] = container_nodes.size();
                 container_nodes.emplace_back( paje.CreateContainer( container_type_node, container_task_manager, name) );
               }
             thread_aliases.emplace_back( paje.CreateContainer( container_type_thread, container_nodes[host_map[name]], "Rank " + ToString(i) ) );
           }
-#else // PARALLEL
-    container_nodes.reserve(num_nodes);
-    for(int i=0; i<num_nodes; i++)
+      }
+    else
+#endif // PARALLEL
+      {
+        container_nodes.reserve(num_nodes);
+        for(int i=0; i<num_nodes; i++)
           container_nodes.emplace_back( paje.CreateContainer( container_type_node, container_task_manager, "Node " + ToString(i)) );
         thread_aliases.reserve(nthreads);
         if(trace_threads)
           for (int i=0; i<nthreads; i++)
             {
               auto name = "Thread " + ToString(i);
               thread_aliases.emplace_back( paje.CreateContainer( container_type_thread, container_nodes[i*num_nodes/nthreads], name ) );
             }
-#endif // PARALLEL
+      }
 
     std::map<const std::type_info *, int> job_map;
     std::map<const std::type_info *, int> job_task_map;
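
The hunk above turns the old compile-time #ifdef/#else split into a run-time check: with more than one MPI rank, one trace container is created per rank and grouped under a node container per host; otherwise the code falls through to the original per-thread containers, which is what a PARALLEL build driven only by the TaskManager needs. The following stand-alone sketch illustrates that layout decision with the plain MPI C API; the writer rank, the host map, and the printed container names are simplified, assumed stand-ins for MPI_PAJE_WRITER, NgMPI_Comm and the PajeFile container calls, not part of the commit.

// sketch_containers.cpp -- minimal sketch, not the ngcore implementation
#include <mpi.h>
#include <cstdio>
#include <map>
#include <string>
#include <vector>

int main(int argc, char** argv)
{
  MPI_Init(&argc, &argv);
  int rank, nranks;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &nranks);

  char hostname[MPI_MAX_PROCESSOR_NAME+1] = {};
  int len;
  MPI_Get_processor_name(hostname, &len);

  const int writer = 0;                           // assumed stand-in for MPI_PAJE_WRITER
  if(nranks > 1)
  {
    // every non-writer rank reports its host name to the writer
    if(rank != writer)
      MPI_Send(hostname, MPI_MAX_PROCESSOR_NAME, MPI_CHAR, writer, 0, MPI_COMM_WORLD);
    else
    {
      std::map<std::string, int> host_map;        // host name -> node container index
      std::vector<std::string> rank_containers;   // one "thread" container per rank
      for(int i = 0; i < nranks; i++)
      {
        char buf[MPI_MAX_PROCESSOR_NAME+1] = {};
        if(i != writer)
          MPI_Recv(buf, MPI_MAX_PROCESSOR_NAME, MPI_CHAR, i, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        std::string name = (i == writer) ? hostname : buf;
        if(host_map.count(name) == 0)
          host_map.emplace(name, (int)host_map.size());   // first occurrence: new node container
        rank_containers.push_back("Rank " + std::to_string(i) +
                                  " on node " + std::to_string(host_map[name]));
      }
      for(const auto & c : rank_containers)
        std::puts(c.c_str());
    }
  }
  else
  {
    // single rank (or no MPI): fall back to one container per worker thread
    int nthreads = 4;                             // assumed stand-in for the TaskManager thread count
    for(int i = 0; i < nthreads; i++)
      std::printf("Thread %d\n", i);
  }

  MPI_Finalize();
  return 0;
}
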
@@ -496,10 +500,12 @@ namespace ngcore
       timer_names[id] = NgProfiler::GetName(id);
 
 #ifdef PARALLEL
-    for(auto src : IntRange(0, nranks))
+    if(nranks>1)
       {
+        for(auto src : IntRange(0, nranks))
+          {
             if(src==MPI_PAJE_WRITER)
               continue;
 
             size_t n_timers;
             comm.Recv (n_timers, src, 0);
@@ -507,13 +513,14 @@ namespace ngcore
             int id;
             std::string name;
             for(auto i : IntRange(n_timers))
               {
                 comm.Recv (id, src, 0);
                 comm.Recv (name, src, 0);
                 timer_ids.insert(id);
                 timer_names[id] = name;
               }
           }
+      }
 #endif // PARALLEL
 
     for(auto id : timer_ids)
@ -583,7 +590,9 @@ namespace ngcore
} }
#ifdef PARALLEL #ifdef PARALLEL
for(auto & event : timer_events) if(nranks>1)
{
for(auto & event : timer_events)
{ {
if(event.is_start) if(event.is_start)
paje.PushState( event.time, state_type_timer, thread_aliases[MPI_PAJE_WRITER], timer_aliases[event.timer_id] ); paje.PushState( event.time, state_type_timer, thread_aliases[MPI_PAJE_WRITER], timer_aliases[event.timer_id] );
@@ -591,16 +600,16 @@ namespace ngcore
               paje.PopState( event.time, state_type_timer, thread_aliases[MPI_PAJE_WRITER] );
           }
 
         // Timer events
         Array<int> timer_id;
         Array<TTimePoint> time;
         Array<bool> is_start;
         Array<int> thread_id;
 
         for(auto src : IntRange(0, nranks))
           {
             if(src==MPI_PAJE_WRITER)
               continue;
 
             comm.Recv (timer_id, src, 0);
             comm.Recv (time, src, 0);
@@ -608,19 +617,20 @@ namespace ngcore
             comm.Recv (thread_id, src, 0);
 
             for(auto i : Range(timer_id.Size()))
               {
                 TimerEvent event;
                 event.timer_id = timer_id[i];
                 event.time = time[i];
                 event.is_start = is_start[i];
                 event.thread_id = thread_id[i];
                 if(event.is_start)
                   paje.PushState( event.time, state_type_timer, thread_aliases[src], timer_aliases[event.timer_id] );
                 else
                   paje.PopState( event.time, state_type_timer, thread_aliases[src] );
               }
           }
+      }
 
 #endif // PARALLEL
 
     // Merge link event
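
The hunks above apply the same nranks>1 guard to the merging of timer events: the writer rank first replays its own events, then receives each other rank's timer_id/time/is_start/thread_id arrays and replays them onto that rank's container. Every start event becomes a PushState and every stop event a PopState on the corresponding thread container. The snippet below is a minimal sketch of that replay step only, with a plain text output format assumed in place of the actual Paje records:

// sketch_replay.cpp -- minimal sketch of the PushState/PopState replay, assumed output format
#include <cstdio>
#include <string>
#include <vector>

struct TimerEvent
{
  double time;      // time stamp of the event
  int timer_id;     // which timer started or stopped
  bool is_start;    // true -> PushState, false -> PopState
};

int main()
{
  std::string container = "Rank 1";   // trace container the events are replayed onto
  std::vector<TimerEvent> events = {
    { 0.0, 1, true  },                // timer 1 starts
    { 0.5, 2, true  },                // timer 2 starts (nested inside timer 1)
    { 1.5, 2, false },                // timer 2 stops
    { 2.0, 1, false } };              // timer 1 stops

  // nested start/stop pairs map onto a state stack of the container
  for(const auto & e : events)
  {
    if(e.is_start)
      std::printf("PushState t=%.3f container=%s state=Timer%d\n",
                  e.time, container.c_str(), e.timer_id);
    else
      std::printf("PopState  t=%.3f container=%s\n", e.time, container.c_str());
  }
  return 0;
}
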
@@ -818,7 +828,7 @@ namespace ngcore
     std::sort (events.begin(), events.end());
 
-    root.time = 1000.0*static_cast<double>(stop_time-start_time) * seconds_per_tick;
+    root.time = 1000.0*static_cast<double>(stop_time) * seconds_per_tick;
 
     for(auto & event : events)
       {