Mirror of https://github.com/NGSolve/netgen.git
Merge traces with MPI

commit acfe9bb606
parent 620b90fbee
@@ -8,6 +8,7 @@
 #include "archive.hpp" // for Demangle
 #include "paje_trace.hpp"
 #include "profiler.hpp"
+#include "mpi_wrapper.hpp"
 
 extern const char *header;
 
@@ -23,7 +24,6 @@ namespace ngcore
 
   PajeTrace :: PajeTrace(int anthreads, std::string aname)
   {
-    start_time = GetTimeCounter();
 
     nthreads = anthreads;
     tracefile_name = std::move(aname);
@@ -48,13 +48,40 @@ namespace ngcore
     jobs.reserve(reserve_size);
     timer_events.reserve(reserve_size);
 
+    // sync start time when running in parallel
+    NgMPI_Comm comm(MPI_COMM_WORLD);
+    for(auto i : Range(5))
+      comm.Barrier();
+
+    start_time = GetTimeCounter();
     tracing_enabled = true;
   }
 
   PajeTrace :: ~PajeTrace()
   {
-    if(!tracefile_name.empty())
+    for(auto & ltask : tasks)
+      for(auto & task : ltask)
+        {
+          task.start_time -= start_time;
+          task.stop_time -= start_time;
+        }
+    for(auto & job : jobs)
+      {
+        job.start_time -= start_time;
+        job.stop_time -= start_time;
+      }
+    for(auto & event : timer_events)
+      event.time -= start_time;
+
+    for(auto & llink : links)
+      for(auto & link : llink)
+        link.time -= start_time;
+
+    NgMPI_Comm comm(MPI_COMM_WORLD);
+    if(comm.Rank() == 0)
       Write(tracefile_name);
+    else
+      SendData();
   }
 
 
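The constructor change above has every rank pass through a short series of barriers and only then take its reference timestamp, so the per-rank event times (which the destructor stores relative to start_time before rank 0 merges everything) share a common origin. A minimal standalone sketch of that idea, using raw MPI and std::chrono instead of the NgMPI_Comm wrapper; all names here are illustrative, not netgen API:

#include <mpi.h>
#include <chrono>
#include <cstdio>

int main(int argc, char **argv)
{
  MPI_Init(&argc, &argv);
  int rank;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);

  // A few back-to-back barriers let every rank leave the last one at (nearly)
  // the same instant, so timestamps taken right afterwards are comparable.
  for (int i = 0; i < 5; i++)
    MPI_Barrier(MPI_COMM_WORLD);

  auto start_time = std::chrono::steady_clock::now();

  // Later events are stored relative to start_time, which is what the
  // destructor above does before the data is gathered on rank 0.
  auto event = std::chrono::steady_clock::now();
  double ms = std::chrono::duration<double, std::milli>(event - start_time).count();
  std::printf("rank %d: event at %.3f ms after the common start\n", rank, ms);

  MPI_Finalize();
  return 0;
}
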
@@ -90,7 +117,6 @@ namespace ngcore
     int alias_counter;
 
     FILE * ctrace_stream;
-    TTimePoint start_time;
     std::shared_ptr<Logger> logger = GetLogger("PajeTrace");
 
 
@@ -98,7 +124,7 @@ namespace ngcore
       // return time in milliseconds as double
       // return std::chrono::duration<double>(t-start_time).count()*1000.0;
       // return std::chrono::duration<double>(t-start_time).count() / 2.7e3;
-      return 1000.0*static_cast<double>(t-start_time) * seconds_per_tick;
+      return 1000.0*static_cast<double>(t) * seconds_per_tick;
     }
 
     enum PType
@@ -180,9 +206,8 @@ namespace ngcore
       void operator=(const PajeFile &) = delete;
       void operator=(PajeFile &&) = delete;
 
-      PajeFile( const std::string & filename, TTimePoint astart_time )
+      PajeFile( const std::string & filename)
       {
-        start_time = astart_time;
         ctrace_stream = fopen (filename.c_str(),"w"); // NOLINT
         fprintf(ctrace_stream, "%s", header ); // NOLINT
         alias_counter = 0;
@@ -365,7 +390,7 @@ namespace ngcore
           logger->warn("Tracing stopped during computation due to tracefile size limit of {} megabytes.", max_tracefile_size/1024/1024);
         }
 
-      PajeFile paje(filename, start_time);
+      PajeFile paje(filename);
 
      const int container_type_task_manager = paje.DefineContainerType( 0, "Task Manager" );
      const int container_type_node = paje.DefineContainerType( container_type_task_manager, "Node");
@@ -381,16 +406,48 @@ namespace ngcore
 
      const int container_task_manager = paje.CreateContainer( container_type_task_manager, 0, "The task manager" );
      const int container_jobs = paje.CreateContainer( container_type_jobs, container_task_manager, "Jobs" );
-     paje.SetVariable( start_time, variable_type_active_threads, container_jobs, 0.0 );
+     paje.SetVariable( 0, variable_type_active_threads, container_jobs, 0.0 );
 
-     const int num_nodes = 1; //task_manager ? task_manager->GetNumNodes() : 1;
-
+     int num_nodes = 1; //task_manager ? task_manager->GetNumNodes() : 1;
+     std::vector <int> thread_aliases;
      std::vector<int> container_nodes;
+
+#ifdef PARALLEL
+     // Hostnames
+     NgMPI_Comm comm(MPI_COMM_WORLD);
+     auto rank = comm.Rank();
+     auto nranks = comm.Size();
+     nthreads = nranks;
+     thread_aliases.reserve(nthreads);
+
+     std::array<char, MPI_MAX_PROCESSOR_NAME+1> ahostname;
+     int len;
+     MPI_Get_processor_name(ahostname.data(), &len);
+     std::string hostname = ahostname.data();
+
+     std::map<std::string, int> host_map;
+
+     host_map[hostname] = container_nodes.size();
+     container_nodes.emplace_back( paje.CreateContainer( container_type_node, container_task_manager, hostname) );
+     thread_aliases.emplace_back( paje.CreateContainer( container_type_thread, container_nodes[host_map[hostname]], "Rank 0" ) );
+
+     std::string name;
+     for(auto i : IntRange(1, nranks))
+       {
+         comm.Recv(name, i, 0);
+         if(host_map.count(name)==0)
+           {
+             host_map[name] = container_nodes.size();
+             container_nodes.emplace_back( paje.CreateContainer( container_type_node, container_task_manager, name) );
+           }
+         thread_aliases.emplace_back( paje.CreateContainer( container_type_thread, container_nodes[host_map[name]], "Rank " + ToString(i) ) );
+       }
+
+#else // PARALLEL
      container_nodes.reserve(num_nodes);
      for(int i=0; i<num_nodes; i++)
        container_nodes.emplace_back( paje.CreateContainer( container_type_node, container_task_manager, "Node " + ToString(i)) );
 
-     std::vector <int> thread_aliases;
      thread_aliases.reserve(nthreads);
      if(trace_threads)
        for (int i=0; i<nthreads; i++)
@@ -398,6 +455,7 @@ namespace ngcore
            auto name = "Thread " + ToString(i);
            thread_aliases.emplace_back( paje.CreateContainer( container_type_thread, container_nodes[i*num_nodes/nthreads], name ) );
          }
+#endif // PARALLEL
 
      std::map<const std::type_info *, int> job_map;
      std::map<const std::type_info *, int> job_task_map;
@@ -418,18 +476,41 @@ namespace ngcore
 
      std::set<int> timer_ids;
      std::map<int,int> timer_aliases;
+     std::map<int,std::string> timer_names;
 
      for(auto & event : timer_events)
        timer_ids.insert(event.timer_id);
 
 
+     // Timer names
      for(auto & vtasks : tasks)
        for (Task & t : vtasks)
          if(t.id_type==Task::ID_TIMER)
            timer_ids.insert(t.id);
 
      for(auto id : timer_ids)
-       timer_aliases[id] = paje.DefineEntityValue( state_type_timer, NgProfiler::GetName(id), -1 );
+       timer_names[id] = NgProfiler::GetName(id);
 
+#ifdef PARALLEL
+     for(auto src : IntRange(1, nranks))
+       {
+         size_t n_timers;
+         comm.Recv (n_timers, src, 0);
+
+         int id;
+         std::string name;
+         for(auto i : IntRange(n_timers))
+           {
+             comm.Recv (id, src, 0);
+             comm.Recv (name, src, 0);
+             timer_ids.insert(id);
+             timer_names[id] = name;
+           }
+       }
+#endif // PARALLEL
+
+     for(auto id : timer_ids)
+       timer_aliases[id] = paje.DefineEntityValue( state_type_timer, timer_names[id], -1 );
+
      int timerdepth = 0;
      int maxdepth = 0;
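The receive loop added above expects each remote rank to announce how many timers it has and then stream (id, name) pairs; it has to mirror the sends in SendData() further down. A rough standalone sketch of that handshake with raw MPI (explicit string handling instead of the NgMPI_Comm overloads; the timer ids and names below are made up for illustration):

#include <mpi.h>
#include <map>
#include <string>
#include <cstdio>

// Helper: send a std::string as length followed by characters.
static void send_string(const std::string &s, int dest)
{
  int len = static_cast<int>(s.size());
  MPI_Send(&len, 1, MPI_INT, dest, 0, MPI_COMM_WORLD);
  MPI_Send(s.data(), len, MPI_CHAR, dest, 0, MPI_COMM_WORLD);
}

// Helper: receive a std::string sent by send_string().
static std::string recv_string(int src)
{
  int len;
  MPI_Recv(&len, 1, MPI_INT, src, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
  std::string s(len, ' ');
  MPI_Recv(&s[0], len, MPI_CHAR, src, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
  return s;
}

int main(int argc, char **argv)
{
  MPI_Init(&argc, &argv);
  int rank, nranks;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &nranks);

  if (rank == 0)
    {
      // Rank 0 collects the timer tables of all other ranks.
      std::map<int, std::string> timer_names;
      for (int src = 1; src < nranks; src++)
        {
          int n_timers;
          MPI_Recv(&n_timers, 1, MPI_INT, src, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
          for (int i = 0; i < n_timers; i++)
            {
              int id;
              MPI_Recv(&id, 1, MPI_INT, src, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
              timer_names[id] = recv_string(src);
            }
        }
      std::printf("rank 0 collected %zu timer names\n", timer_names.size());
    }
  else
    {
      // Every other rank sends its count, then (id, name) pairs.
      std::map<int, std::string> my_timers = { {1, "assemble"}, {2, "solve"} };
      int n = static_cast<int>(my_timers.size());
      MPI_Send(&n, 1, MPI_INT, 0, 0, MPI_COMM_WORLD);
      for (auto &p : my_timers)
        {
          int id = p.first;
          MPI_Send(&id, 1, MPI_INT, 0, 0, MPI_COMM_WORLD);
          send_string(p.second, 0);
        }
    }

  MPI_Finalize();
  return 0;
}
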
@@ -494,6 +575,44 @@ namespace ngcore
            }
        }
 
+#ifdef PARALLEL
+     for(auto & event : timer_events)
+       {
+         if(event.is_start)
+           paje.PushState( event.time, state_type_timer, thread_aliases[0], timer_aliases[event.timer_id] );
+         else
+           paje.PopState( event.time, state_type_timer, thread_aliases[0] );
+       }
+
+     // Timer events
+     Array<int> timer_id;
+     Array<TTimePoint> time;
+     Array<bool> is_start;
+     Array<int> thread_id;
+
+     for(auto src : IntRange(1, nranks))
+       {
+         comm.Recv (timer_id, src, 0);
+         comm.Recv (time, src, 0);
+         comm.Recv (is_start, src, 0);
+         comm.Recv (thread_id, src, 0);
+
+         for(auto i : Range(timer_id.Size()))
+           {
+             TimerEvent event;
+             event.timer_id = timer_id[i];
+             event.time = time[i];
+             event.is_start = is_start[i];
+             event.thread_id = thread_id[i];
+
+             if(event.is_start)
+               paje.PushState( event.time, state_type_timer, thread_aliases[src], timer_aliases[event.timer_id] );
+             else
+               paje.PopState( event.time, state_type_timer, thread_aliases[src] );
+           }
+       }
+#endif // PARALLEL
+
      // Merge link event
      int nlinks = 0;
      for( auto & l : links)
@@ -556,6 +675,66 @@ namespace ngcore
      paje.WriteEvents();
    }
 
+  void PajeTrace::SendData( )
+  {
+#ifdef PARALLEL
+    // Hostname
+    NgMPI_Comm comm(MPI_COMM_WORLD);
+    auto rank = comm.Rank();
+    auto nranks = comm.Size();
+
+    std::string hostname;
+    {
+      std::array<char, MPI_MAX_PROCESSOR_NAME+1> ahostname;
+      int len;
+      MPI_Get_processor_name(ahostname.data(), &len);
+      hostname = ahostname.data();
+    }
+
+    comm.Send(hostname, 0, 0);
+
+    // Timer names
+    std::set<int> timer_ids;
+    std::map<int,std::string> timer_names;
+
+    for(auto & event : timer_events)
+      {
+        event.timer_id += NgProfiler::SIZE*rank;
+        timer_ids.insert(event.timer_id);
+      }
+
+    for(auto id : timer_ids)
+      timer_names[id] = NgProfiler::GetName(id-NgProfiler::SIZE*rank);
+    size_t size = timer_ids.size();
+    comm.Send(size, 0, 0);
+    for(auto id : timer_ids)
+      {
+        comm.Send(id, 0, 0);
+        comm.Send(timer_names[id], 0, 0);
+      }
+
+
+    // Timer events
+    Array<int> timer_id;
+    Array<TTimePoint> time;
+    Array<bool> is_start;
+    Array<int> thread_id;
+
+    for(auto & event : timer_events)
+      {
+        timer_id.Append(event.timer_id);
+        time.Append(event.time);
+        is_start.Append(event.is_start);
+        thread_id.Append(event.thread_id);
+      }
+
+    comm.Send (timer_id, 0, 0);
+    comm.Send (time, 0, 0);
+    comm.Send (is_start, 0, 0);
+    comm.Send (thread_id, 0, 0);
+#endif // PARALLEL
+  }
+
  ///////////////////////////////////////////////////////////////////
  // Write HTML file drawing a sunburst chart with cumulated timings
  struct TreeNode
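One detail worth noting in SendData(): each rank shifts its local timer ids by NgProfiler::SIZE*rank before sending, so ids from different ranks cannot collide in the merged trace, and the shift is undone only when looking up the human-readable name. A tiny sketch of that mapping; the SIZE value below is just a stand-in, not the actual NgProfiler constant:

#include <cstdio>

constexpr int SIZE = 8 * 1024;   // stand-in for NgProfiler::SIZE (max. timers per rank)

int to_global(int local_id, int rank)  { return local_id + SIZE * rank; }
int to_local (int global_id, int rank) { return global_id - SIZE * rank; }

int main()
{
  int rank = 2, timer = 17;
  int merged = to_global(timer, rank);   // unique across all ranks
  std::printf("local id %d on rank %d -> merged id %d -> local again: %d\n",
              timer, rank, merged, to_local(merged, rank));
  return 0;
}
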
@@ -185,6 +185,8 @@ namespace ngcore
 
    void Write( const std::string & filename );
 
+   void SendData(); // MPI parallel data reduction
+
  };
} // namespace ngcore
 
@@ -159,22 +159,8 @@ namespace ngcore
    active_workers = 0;
 
    static int cnt = 0;
-   char buf[100];
    if (use_paje_trace)
-     {
-#ifdef PARALLEL
-       int is_init = -1;
-       MPI_Initialized(&is_init);
-       if (is_init)
-         sprintf(buf, "ng%d_rank%d.trace", cnt++, NgMPI_Comm(MPI_COMM_WORLD).Rank());
-       else
-#endif
-         sprintf(buf, "ng%d.trace", cnt++);
-     }
-   else
-     buf[0] = 0;
-   //sprintf(buf, "");
-   trace = new PajeTrace(num_threads, buf);
+     trace = new PajeTrace(num_threads, "ng" + ToString(cnt++) + ".trace");
  }
 
 
@@ -349,7 +335,8 @@ namespace ngcore
      }
 
 
-    trace->StartJob(jobnr, afunc.target_type());
+    if (use_paje_trace)
+      trace->StartJob(jobnr, afunc.target_type());
 
    func = &afunc;
 
@@ -419,7 +406,8 @@ namespace ngcore
    if (ex)
      throw Exception (*ex);
 
-   trace->StopJob();
+   if (use_paje_trace)
+     trace->StopJob();
  }
 
  void TaskManager :: Loop(int thd)