diff --git a/libsrc/core/paje_trace.cpp b/libsrc/core/paje_trace.cpp index 5c52a378..88601054 100644 --- a/libsrc/core/paje_trace.cpp +++ b/libsrc/core/paje_trace.cpp @@ -8,6 +8,7 @@ #include "archive.hpp" // for Demangle #include "paje_trace.hpp" #include "profiler.hpp" +#include "mpi_wrapper.hpp" extern const char *header; @@ -23,7 +24,6 @@ namespace ngcore PajeTrace :: PajeTrace(int anthreads, std::string aname) { - start_time = GetTimeCounter(); nthreads = anthreads; tracefile_name = std::move(aname); @@ -48,13 +48,40 @@ namespace ngcore jobs.reserve(reserve_size); timer_events.reserve(reserve_size); + // sync start time when running in parallel + NgMPI_Comm comm(MPI_COMM_WORLD); + for(auto i : Range(5)) + comm.Barrier(); + + start_time = GetTimeCounter(); tracing_enabled = true; } PajeTrace :: ~PajeTrace() { - if(!tracefile_name.empty()) + for(auto & ltask : tasks) + for(auto & task : ltask) + { + task.start_time -= start_time; + task.stop_time -= start_time; + } + for(auto & job : jobs) + { + job.start_time -= start_time; + job.stop_time -= start_time; + } + for(auto & event : timer_events) + event.time -= start_time; + + for(auto & llink : links) + for(auto & link : llink) + link.time -= start_time; + + NgMPI_Comm comm(MPI_COMM_WORLD); + if(comm.Rank() == 0) Write(tracefile_name); + else + SendData(); } @@ -90,7 +117,6 @@ namespace ngcore int alias_counter; FILE * ctrace_stream; - TTimePoint start_time; std::shared_ptr logger = GetLogger("PajeTrace"); @@ -98,7 +124,7 @@ namespace ngcore // return time in milliseconds as double // return std::chrono::duration(t-start_time).count()*1000.0; // return std::chrono::duration(t-start_time).count() / 2.7e3; - return 1000.0*static_cast(t-start_time) * seconds_per_tick; + return 1000.0*static_cast(t) * seconds_per_tick; } enum PType @@ -180,9 +206,8 @@ namespace ngcore void operator=(const PajeFile &) = delete; void operator=(PajeFile &&) = delete; - PajeFile( const std::string & filename, TTimePoint astart_time ) + PajeFile( const std::string & filename) { - start_time = astart_time; ctrace_stream = fopen (filename.c_str(),"w"); // NOLINT fprintf(ctrace_stream, "%s", header ); // NOLINT alias_counter = 0; @@ -365,7 +390,7 @@ namespace ngcore logger->warn("Tracing stopped during computation due to tracefile size limit of {} megabytes.", max_tracefile_size/1024/1024); } - PajeFile paje(filename, start_time); + PajeFile paje(filename); const int container_type_task_manager = paje.DefineContainerType( 0, "Task Manager" ); const int container_type_node = paje.DefineContainerType( container_type_task_manager, "Node"); @@ -381,16 +406,48 @@ namespace ngcore const int container_task_manager = paje.CreateContainer( container_type_task_manager, 0, "The task manager" ); const int container_jobs = paje.CreateContainer( container_type_jobs, container_task_manager, "Jobs" ); - paje.SetVariable( start_time, variable_type_active_threads, container_jobs, 0.0 ); - - const int num_nodes = 1; //task_manager ? task_manager->GetNumNodes() : 1; + paje.SetVariable( 0, variable_type_active_threads, container_jobs, 0.0 ); + int num_nodes = 1; //task_manager ? task_manager->GetNumNodes() : 1; + std::vector thread_aliases; std::vector container_nodes; + +#ifdef PARALLEL + // Hostnames + NgMPI_Comm comm(MPI_COMM_WORLD); + auto rank = comm.Rank(); + auto nranks = comm.Size(); + nthreads = nranks; + thread_aliases.reserve(nthreads); + + std::array ahostname; + int len; + MPI_Get_processor_name(ahostname.data(), &len); + std::string hostname = ahostname.data(); + + std::map host_map; + + host_map[hostname] = container_nodes.size(); + container_nodes.emplace_back( paje.CreateContainer( container_type_node, container_task_manager, hostname) ); + thread_aliases.emplace_back( paje.CreateContainer( container_type_thread, container_nodes[host_map[hostname]], "Rank 0" ) ); + + std::string name; + for(auto i : IntRange(1, nranks)) + { + comm.Recv(name, i, 0); + if(host_map.count(name)==0) + { + host_map[name] = container_nodes.size(); + container_nodes.emplace_back( paje.CreateContainer( container_type_node, container_task_manager, name) ); + } + thread_aliases.emplace_back( paje.CreateContainer( container_type_thread, container_nodes[host_map[name]], "Rank " + ToString(i) ) ); + } + +#else // PARALLEL container_nodes.reserve(num_nodes); for(int i=0; i thread_aliases; thread_aliases.reserve(nthreads); if(trace_threads) for (int i=0; i job_map; std::map job_task_map; @@ -418,18 +476,41 @@ namespace ngcore std::set timer_ids; std::map timer_aliases; + std::map timer_names; for(auto & event : timer_events) - timer_ids.insert(event.timer_id); + timer_ids.insert(event.timer_id); + // Timer names for(auto & vtasks : tasks) - for (Task & t : vtasks) - if(t.id_type==Task::ID_TIMER) - timer_ids.insert(t.id); + for (Task & t : vtasks) + if(t.id_type==Task::ID_TIMER) + timer_ids.insert(t.id); for(auto id : timer_ids) - timer_aliases[id] = paje.DefineEntityValue( state_type_timer, NgProfiler::GetName(id), -1 ); + timer_names[id] = NgProfiler::GetName(id); + +#ifdef PARALLEL + for(auto src : IntRange(1, nranks)) + { + size_t n_timers; + comm.Recv (n_timers, src, 0); + + int id; + std::string name; + for(auto i : IntRange(n_timers)) + { + comm.Recv (id, src, 0); + comm.Recv (name, src, 0); + timer_ids.insert(id); + timer_names[id] = name; + } + } +#endif // PARALLEL + + for(auto id : timer_ids) + timer_aliases[id] = paje.DefineEntityValue( state_type_timer, timer_names[id], -1 ); int timerdepth = 0; int maxdepth = 0; @@ -494,6 +575,44 @@ namespace ngcore } } +#ifdef PARALLEL + for(auto & event : timer_events) + { + if(event.is_start) + paje.PushState( event.time, state_type_timer, thread_aliases[0], timer_aliases[event.timer_id] ); + else + paje.PopState( event.time, state_type_timer, thread_aliases[0] ); + } + + // Timer events + Array timer_id; + Array time; + Array is_start; + Array thread_id; + + for(auto src : IntRange(1, nranks)) + { + comm.Recv (timer_id, src, 0); + comm.Recv (time, src, 0); + comm.Recv (is_start, src, 0); + comm.Recv (thread_id, src, 0); + + for(auto i : Range(timer_id.Size())) + { + TimerEvent event; + event.timer_id = timer_id[i]; + event.time = time[i]; + event.is_start = is_start[i]; + event.thread_id = thread_id[i]; + + if(event.is_start) + paje.PushState( event.time, state_type_timer, thread_aliases[src], timer_aliases[event.timer_id] ); + else + paje.PopState( event.time, state_type_timer, thread_aliases[src] ); + } + } +#endif // PARALLEL + // Merge link event int nlinks = 0; for( auto & l : links) @@ -556,6 +675,66 @@ namespace ngcore paje.WriteEvents(); } + void PajeTrace::SendData( ) + { +#ifdef PARALLEL + // Hostname + NgMPI_Comm comm(MPI_COMM_WORLD); + auto rank = comm.Rank(); + auto nranks = comm.Size(); + + std::string hostname; + { + std::array ahostname; + int len; + MPI_Get_processor_name(ahostname.data(), &len); + hostname = ahostname.data(); + } + + comm.Send(hostname, 0, 0); + + // Timer names + std::set timer_ids; + std::map timer_names; + + for(auto & event : timer_events) + { + event.timer_id += NgProfiler::SIZE*rank; + timer_ids.insert(event.timer_id); + } + + for(auto id : timer_ids) + timer_names[id] = NgProfiler::GetName(id-NgProfiler::SIZE*rank); + size_t size = timer_ids.size(); + comm.Send(size, 0, 0); + for(auto id : timer_ids) + { + comm.Send(id, 0, 0); + comm.Send(timer_names[id], 0, 0); + } + + + // Timer events + Array timer_id; + Array time; + Array is_start; + Array thread_id; + + for(auto & event : timer_events) + { + timer_id.Append(event.timer_id); + time.Append(event.time); + is_start.Append(event.is_start); + thread_id.Append(event.thread_id); + } + + comm.Send (timer_id, 0, 0); + comm.Send (time, 0, 0); + comm.Send (is_start, 0, 0); + comm.Send (thread_id, 0, 0); +#endif // PARALLEL + } + /////////////////////////////////////////////////////////////////// // Write HTML file drawing a sunburst chart with cumulated timings struct TreeNode diff --git a/libsrc/core/paje_trace.hpp b/libsrc/core/paje_trace.hpp index 1d656ca3..95c42d4a 100644 --- a/libsrc/core/paje_trace.hpp +++ b/libsrc/core/paje_trace.hpp @@ -185,6 +185,8 @@ namespace ngcore void Write( const std::string & filename ); + void SendData(); // MPI parallel data reduction + }; } // namespace ngcore diff --git a/libsrc/core/taskmanager.cpp b/libsrc/core/taskmanager.cpp index e730e55c..ef38a277 100644 --- a/libsrc/core/taskmanager.cpp +++ b/libsrc/core/taskmanager.cpp @@ -159,22 +159,8 @@ namespace ngcore active_workers = 0; static int cnt = 0; - char buf[100]; if (use_paje_trace) - { -#ifdef PARALLEL - int is_init = -1; - MPI_Initialized(&is_init); - if (is_init) - sprintf(buf, "ng%d_rank%d.trace", cnt++, NgMPI_Comm(MPI_COMM_WORLD).Rank()); - else -#endif - sprintf(buf, "ng%d.trace", cnt++); - } - else - buf[0] = 0; - //sprintf(buf, ""); - trace = new PajeTrace(num_threads, buf); + trace = new PajeTrace(num_threads, "ng" + ToString(cnt++) + ".trace"); } @@ -349,7 +335,8 @@ namespace ngcore } - trace->StartJob(jobnr, afunc.target_type()); + if (use_paje_trace) + trace->StartJob(jobnr, afunc.target_type()); func = &afunc; @@ -419,7 +406,8 @@ namespace ngcore if (ex) throw Exception (*ex); - trace->StopJob(); + if (use_paje_trace) + trace->StopJob(); } void TaskManager :: Loop(int thd)