mirror of
https://github.com/NGSolve/netgen.git
synced 2025-01-11 21:50:34 +05:00
Add Array, TaskManager and concurrentqueue from NGSolve
Array and TaskManager was mainly developed by Joachim Schöberl. For complete version history, check NGSolve: https://github.com/NGSolve/ngsolve The concurrentqueue is from https://github.com/cameron314/concurrentqueue revision dea078cf5b6e742cd67a0d725e36f872feca4de4
This commit is contained in:
parent
7f7b386388
commit
e1d4cc0410
@ -1,5 +1,5 @@
|
||||
|
||||
add_library(ngcore SHARED archive.cpp logging.cpp paje_trace.cpp utils.cpp profiler.cpp)
|
||||
add_library(ngcore SHARED archive.cpp logging.cpp paje_trace.cpp utils.cpp profiler.cpp taskmanager.cpp)
|
||||
|
||||
target_compile_definitions(ngcore PRIVATE NGCORE_EXPORTS)
|
||||
if(NOT WIN32)
|
||||
@ -34,6 +34,7 @@ endif(USE_PYTHON)
|
||||
|
||||
install(FILES ngcore.hpp archive.hpp type_traits.hpp version.hpp ngcore_api.hpp logging.hpp
|
||||
exception.hpp symboltable.hpp paje_trace.hpp utils.hpp profiler.hpp mpi_wrapper.hpp
|
||||
array.hpp taskmanager.hpp concurrentqueue.h
|
||||
DESTINATION ${NG_INSTALL_DIR_INCLUDE}/core COMPONENT netgen_devel)
|
||||
|
||||
if(ENABLE_CPP_CORE_GUIDELINES_CHECK)
|
||||
|
1475
libsrc/core/array.hpp
Normal file
1475
libsrc/core/array.hpp
Normal file
File diff suppressed because it is too large
Load Diff
3619
libsrc/core/concurrentqueue.h
Normal file
3619
libsrc/core/concurrentqueue.h
Normal file
File diff suppressed because it is too large
Load Diff
@ -2,11 +2,13 @@
|
||||
#define NETGEN_CORE_NGCORE_HPP
|
||||
|
||||
#include "archive.hpp"
|
||||
#include "array.hpp"
|
||||
#include "exception.hpp"
|
||||
#include "logging.hpp"
|
||||
#include "mpi_wrapper.hpp"
|
||||
#include "profiler.hpp"
|
||||
#include "symboltable.hpp"
|
||||
#include "taskmanager.hpp"
|
||||
#include "version.hpp"
|
||||
#include "mpi_wrapper.hpp"
|
||||
|
||||
#endif // NETGEN_CORE_NGCORE_HPP
|
||||
|
812
libsrc/core/taskmanager.cpp
Normal file
812
libsrc/core/taskmanager.cpp
Normal file
@ -0,0 +1,812 @@
|
||||
/********************************************************************/
|
||||
/* File: taskmanager.cpp */
|
||||
/* Author: M. Hochsterger, J. Schoeberl */
|
||||
/* Date: 10. Mar. 2015 */
|
||||
/********************************************************************/
|
||||
|
||||
#include <thread>
|
||||
#include <atomic>
|
||||
#include <mutex>
|
||||
#include <chrono>
|
||||
|
||||
#include "concurrentqueue.h"
|
||||
#include "paje_trace.hpp"
|
||||
#include "profiler.hpp"
|
||||
#include "taskmanager.hpp"
|
||||
|
||||
#ifdef USE_MKL
|
||||
#include <mkl.h>
|
||||
#endif
|
||||
|
||||
|
||||
|
||||
namespace ngcore
|
||||
{
|
||||
using std::mutex;
|
||||
using std::lock_guard;
|
||||
using std::memory_order_release;
|
||||
using std::memory_order_relaxed;
|
||||
using std::make_tuple;
|
||||
|
||||
TaskManager * task_manager = nullptr;
|
||||
bool TaskManager :: use_paje_trace = false;
|
||||
int TaskManager :: max_threads = getenv("NGS_NUM_THREADS") ? atoi(getenv("NGS_NUM_THREADS")) : std::thread::hardware_concurrency();
|
||||
int TaskManager :: num_threads = 1;
|
||||
|
||||
|
||||
#ifndef __clang__
|
||||
thread_local int TaskManager :: thread_id = 0;
|
||||
#else
|
||||
__thread int TaskManager :: thread_id;
|
||||
#endif
|
||||
|
||||
thread_local int thread_id = 0;
|
||||
|
||||
const function<void(TaskInfo&)> * TaskManager::func;
|
||||
const function<void()> * TaskManager::startup_function = nullptr;
|
||||
const function<void()> * TaskManager::cleanup_function = nullptr;
|
||||
|
||||
atomic<int> TaskManager::ntasks;
|
||||
Exception * TaskManager::ex;
|
||||
|
||||
atomic<int> TaskManager::jobnr;
|
||||
|
||||
atomic<int> TaskManager::complete[8]; // max nodes
|
||||
atomic<int> TaskManager::done;
|
||||
atomic<int> TaskManager::active_workers;
|
||||
atomic<int> TaskManager::workers_on_node[8]; // max nodes
|
||||
|
||||
|
||||
int TaskManager::sleep_usecs = 1000;
|
||||
bool TaskManager::sleep = false;
|
||||
|
||||
TaskManager::NodeData *TaskManager::nodedata[8];
|
||||
int TaskManager::num_nodes;
|
||||
|
||||
static mutex copyex_mutex;
|
||||
|
||||
int EnterTaskManager ()
|
||||
{
|
||||
if (task_manager)
|
||||
{
|
||||
// no task manager started
|
||||
return 0;
|
||||
}
|
||||
|
||||
task_manager = new TaskManager();
|
||||
|
||||
// TODO: use logger for output
|
||||
std::cout << "task-based parallelization (C++11 threads) using "<< task_manager->GetNumThreads() << " threads" << std::endl;
|
||||
|
||||
#ifdef USE_NUMA
|
||||
numa_run_on_node (0);
|
||||
#endif
|
||||
|
||||
#ifndef WIN32
|
||||
// master has maximal priority !
|
||||
int policy;
|
||||
struct sched_param param;
|
||||
pthread_getschedparam(pthread_self(), &policy, ¶m);
|
||||
param.sched_priority = sched_get_priority_max(policy);
|
||||
pthread_setschedparam(pthread_self(), policy, ¶m);
|
||||
#endif // WIN32
|
||||
|
||||
|
||||
task_manager->StartWorkers();
|
||||
|
||||
ParallelFor (Range(100), [&] (int i) { ; }); // startup
|
||||
return task_manager->GetNumThreads();
|
||||
}
|
||||
|
||||
|
||||
void ExitTaskManager (int num_threads)
|
||||
{
|
||||
if(num_threads > 0)
|
||||
{
|
||||
task_manager->StopWorkers();
|
||||
delete task_manager;
|
||||
task_manager = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
void RunWithTaskManager (function<void()> alg)
|
||||
{
|
||||
int num_threads = EnterTaskManager();
|
||||
alg();
|
||||
ExitTaskManager(num_threads);
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
void TaskManager :: SetNumThreads(int amax_threads)
|
||||
{
|
||||
if(task_manager && task_manager->active_workers>0)
|
||||
{
|
||||
std::cerr << "Warning: can't change number of threads while TaskManager active!" << std::endl;
|
||||
return;
|
||||
}
|
||||
max_threads = amax_threads;
|
||||
}
|
||||
|
||||
|
||||
TaskManager :: TaskManager()
|
||||
{
|
||||
num_threads = GetMaxThreads();
|
||||
// if (MyMPI_GetNTasks() > 1) num_threads = 1;
|
||||
|
||||
#ifdef USE_NUMA
|
||||
numa_available();
|
||||
num_nodes = numa_max_node() + 1;
|
||||
if (num_nodes > num_threads) num_nodes = num_threads;
|
||||
|
||||
for (int j = 0; j < num_nodes; j++)
|
||||
{
|
||||
void * mem = numa_alloc_onnode (sizeof(NodeData), j);
|
||||
nodedata[j] = new (mem) NodeData;
|
||||
complete[j] = -1;
|
||||
workers_on_node[j] = 0;
|
||||
}
|
||||
#else
|
||||
num_nodes = 1;
|
||||
nodedata[0] = new NodeData;
|
||||
complete[0] = -1;
|
||||
workers_on_node[0] = 0;
|
||||
#endif
|
||||
|
||||
jobnr = 0;
|
||||
done = 0;
|
||||
sleep = false;
|
||||
sleep_usecs = 1000;
|
||||
active_workers = 0;
|
||||
|
||||
static int cnt = 0;
|
||||
char buf[100];
|
||||
if (use_paje_trace)
|
||||
{
|
||||
#ifdef PARALLEL
|
||||
int is_init = -1;
|
||||
MPI_Initialized(&is_init);
|
||||
if (is_init)
|
||||
sprintf(buf, "ng%d_rank%d.trace", cnt++, NgMPI_Comm(MPI_COMM_WORLD).Rank());
|
||||
else
|
||||
#endif
|
||||
sprintf(buf, "ng%d.trace", cnt++);
|
||||
}
|
||||
else
|
||||
buf[0] = 0;
|
||||
//sprintf(buf, "");
|
||||
trace = new PajeTrace(num_threads, buf);
|
||||
}
|
||||
|
||||
|
||||
TaskManager :: ~TaskManager ()
|
||||
{
|
||||
delete trace;
|
||||
trace = nullptr;
|
||||
num_threads = 1;
|
||||
}
|
||||
|
||||
/*
|
||||
int TaskManager :: GetThreadId()
|
||||
{
|
||||
return thread_id;
|
||||
}
|
||||
*/
|
||||
|
||||
void TaskManager :: StartWorkers()
|
||||
{
|
||||
done = false;
|
||||
|
||||
for (int i = 1; i < num_threads; i++)
|
||||
{
|
||||
std::thread([this,i]() { this->Loop(i); }).detach();
|
||||
}
|
||||
thread_id = 0;
|
||||
|
||||
size_t alloc_size = num_threads*NgProfiler::SIZE;
|
||||
NgProfiler::thread_times = new size_t[alloc_size];
|
||||
for (size_t i = 0; i < alloc_size; i++)
|
||||
NgProfiler::thread_times[i] = 0;
|
||||
NgProfiler::thread_flops = new size_t[alloc_size];
|
||||
for (size_t i = 0; i < alloc_size; i++)
|
||||
NgProfiler::thread_flops[i] = 0;
|
||||
|
||||
while (active_workers < num_threads-1)
|
||||
;
|
||||
}
|
||||
|
||||
static size_t calibrate_init_tsc = __rdtsc();
|
||||
typedef std::chrono::system_clock TClock;
|
||||
static TClock::time_point calibrate_init_clock = TClock::now();
|
||||
|
||||
void TaskManager :: StopWorkers()
|
||||
{
|
||||
done = true;
|
||||
double delta_tsc = __rdtsc()-calibrate_init_tsc;
|
||||
double delta_sec = std::chrono::duration<double>(TClock::now()-calibrate_init_clock).count();
|
||||
double frequ = (delta_sec != 0) ? delta_tsc/delta_sec : 2.7e9;
|
||||
|
||||
// cout << "cpu frequ = " << frequ << endl;
|
||||
// collect timings
|
||||
for (size_t i = 0; i < num_threads; i++)
|
||||
for (size_t j = NgProfiler::SIZE; j-- > 0; )
|
||||
{
|
||||
if (!NgProfiler::timers[j].usedcounter) break;
|
||||
NgProfiler::timers[j].tottime += 1.0/frequ * NgProfiler::thread_times[i*NgProfiler::SIZE+j];
|
||||
NgProfiler::timers[j].flops += NgProfiler::thread_flops[i*NgProfiler::SIZE+j];
|
||||
}
|
||||
delete [] NgProfiler::thread_times;
|
||||
NgProfiler::thread_times = NgProfiler::dummy_thread_times.data();
|
||||
delete [] NgProfiler::thread_flops;
|
||||
NgProfiler::thread_flops = NgProfiler::dummy_thread_flops.data();
|
||||
|
||||
while (active_workers)
|
||||
;
|
||||
}
|
||||
|
||||
/////////////////////// NEW: nested tasks using concurrent queue
|
||||
|
||||
struct TNestedTask
|
||||
{
|
||||
const function<void(TaskInfo&)> * func;
|
||||
atomic<int> * endcnt;
|
||||
int mynr;
|
||||
int total;
|
||||
|
||||
TNestedTask () { ; }
|
||||
TNestedTask (const function<void(TaskInfo&)> & _func,
|
||||
int _mynr, int _total,
|
||||
atomic<int> & _endcnt)
|
||||
: func(&_func), mynr(_mynr), total(_total), endcnt(&_endcnt)
|
||||
{
|
||||
;
|
||||
}
|
||||
};
|
||||
|
||||
typedef moodycamel::ConcurrentQueue<TNestedTask> TQueue;
|
||||
typedef moodycamel::ProducerToken TPToken;
|
||||
typedef moodycamel::ConsumerToken TCToken;
|
||||
|
||||
static TQueue taskqueue;
|
||||
|
||||
void AddTask (const function<void(TaskInfo&)> & afunc,
|
||||
atomic<int> & endcnt)
|
||||
|
||||
{
|
||||
TPToken ptoken(taskqueue);
|
||||
|
||||
int num = endcnt;
|
||||
for (int i = 0; i < num; i++)
|
||||
taskqueue.enqueue (ptoken, { afunc, i, num, endcnt });
|
||||
}
|
||||
|
||||
mutex m;
|
||||
bool ProcessTask()
|
||||
{
|
||||
TNestedTask task;
|
||||
TCToken ctoken(taskqueue);
|
||||
|
||||
if (taskqueue.try_dequeue(ctoken, task))
|
||||
{
|
||||
TaskInfo ti;
|
||||
ti.task_nr = task.mynr;
|
||||
ti.ntasks = task.total;
|
||||
ti.thread_nr = thread_id;
|
||||
ti.nthreads = TaskManager::GetNumThreads();
|
||||
/*
|
||||
{
|
||||
lock_guard<mutex> guard(m);
|
||||
cout << "process nested, nr = " << ti.task_nr << "/" << ti.ntasks << endl;
|
||||
}
|
||||
*/
|
||||
(*task.func)(ti);
|
||||
--*task.endcnt;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
void TaskManager :: CreateJob (const function<void(TaskInfo&)> & afunc,
|
||||
int antasks)
|
||||
{
|
||||
if (num_threads == 1 || !task_manager) // || func)
|
||||
{
|
||||
if (startup_function) (*startup_function)();
|
||||
|
||||
TaskInfo ti;
|
||||
ti.ntasks = antasks;
|
||||
ti.thread_nr = 0; ti.nthreads = 1;
|
||||
// ti.node_nr = 0; ti.nnodes = 1;
|
||||
for (ti.task_nr = 0; ti.task_nr < antasks; ti.task_nr++)
|
||||
afunc(ti);
|
||||
|
||||
if (cleanup_function) (*cleanup_function)();
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
if (func)
|
||||
{ // we are already parallel, use nested tasks
|
||||
// startup for inner function not supported ...
|
||||
// if (startup_function) (*startup_function)();
|
||||
|
||||
if (antasks == 1)
|
||||
{
|
||||
TaskInfo ti;
|
||||
ti.task_nr = 0;
|
||||
ti.ntasks = 1;
|
||||
ti.thread_nr = 0; ti.nthreads = 1;
|
||||
afunc(ti);
|
||||
return;
|
||||
}
|
||||
|
||||
atomic<int> endcnt(antasks);
|
||||
AddTask (afunc, endcnt);
|
||||
while (endcnt > 0)
|
||||
{
|
||||
ProcessTask();
|
||||
}
|
||||
|
||||
// if (cleanup_function) (*cleanup_function)();
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
trace->StartJob(jobnr, afunc.target_type());
|
||||
|
||||
func = &afunc;
|
||||
|
||||
ntasks.store (antasks); // , memory_order_relaxed);
|
||||
ex = nullptr;
|
||||
|
||||
|
||||
nodedata[0]->start_cnt.store (0, memory_order_relaxed);
|
||||
|
||||
jobnr++;
|
||||
|
||||
for (int j = 0; j < num_nodes; j++)
|
||||
nodedata[j]->participate |= 1;
|
||||
|
||||
if (startup_function) (*startup_function)();
|
||||
|
||||
int thd = 0;
|
||||
int thds = GetNumThreads();
|
||||
int mynode = num_nodes * thd/thds;
|
||||
|
||||
IntRange mytasks = Range(int(ntasks)).Split (mynode, num_nodes);
|
||||
NodeData & mynode_data = *(nodedata[mynode]);
|
||||
|
||||
TaskInfo ti;
|
||||
ti.nthreads = thds;
|
||||
ti.thread_nr = thd;
|
||||
// ti.nnodes = num_nodes;
|
||||
// ti.node_nr = mynode;
|
||||
|
||||
try
|
||||
{
|
||||
while (1)
|
||||
{
|
||||
int mytask = mynode_data.start_cnt++;
|
||||
if (mytask >= mytasks.Size()) break;
|
||||
|
||||
ti.task_nr = mytasks.First()+mytask;
|
||||
ti.ntasks = ntasks;
|
||||
|
||||
{
|
||||
RegionTracer t(ti.thread_nr, jobnr, RegionTracer::ID_JOB, ti.task_nr);
|
||||
(*func)(ti);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
{
|
||||
lock_guard<mutex> guard(copyex_mutex);
|
||||
delete ex;
|
||||
ex = new Exception (e);
|
||||
mynode_data.start_cnt = mytasks.Size();
|
||||
}
|
||||
}
|
||||
|
||||
if (cleanup_function) (*cleanup_function)();
|
||||
|
||||
for (int j = 0; j < num_nodes; j++)
|
||||
if (workers_on_node[j])
|
||||
{
|
||||
while (complete[j] != jobnr)
|
||||
_mm_pause();
|
||||
}
|
||||
|
||||
func = nullptr;
|
||||
if (ex)
|
||||
throw Exception (*ex);
|
||||
|
||||
trace->StopJob();
|
||||
}
|
||||
|
||||
void TaskManager :: Loop(int thd)
|
||||
{
|
||||
/*
|
||||
static Timer tADD("add entry counter");
|
||||
static Timer tCASready1("spin-CAS ready tick1");
|
||||
static Timer tCASready2("spin-CAS ready tick2");
|
||||
static Timer tCASyield("spin-CAS yield");
|
||||
static Timer tCAS1("spin-CAS wait");
|
||||
static Timer texit("exit zone");
|
||||
static Timer tdec("decrement");
|
||||
*/
|
||||
thread_id = thd;
|
||||
|
||||
int thds = GetNumThreads();
|
||||
|
||||
int mynode = num_nodes * thd/thds;
|
||||
|
||||
NodeData & mynode_data = *(nodedata[mynode]);
|
||||
|
||||
|
||||
|
||||
TaskInfo ti;
|
||||
ti.nthreads = thds;
|
||||
ti.thread_nr = thd;
|
||||
// ti.nnodes = num_nodes;
|
||||
// ti.node_nr = mynode;
|
||||
|
||||
|
||||
#ifdef USE_NUMA
|
||||
numa_run_on_node (mynode);
|
||||
#endif
|
||||
active_workers++;
|
||||
workers_on_node[mynode]++;
|
||||
int jobdone = 0;
|
||||
|
||||
|
||||
#ifdef USE_MKL
|
||||
auto mkl_max = mkl_get_max_threads();
|
||||
mkl_set_num_threads_local(1);
|
||||
#endif
|
||||
|
||||
|
||||
while (!done)
|
||||
{
|
||||
if (complete[mynode] > jobdone)
|
||||
jobdone = complete[mynode];
|
||||
|
||||
if (jobnr == jobdone)
|
||||
{
|
||||
// RegionTracer t(ti.thread_nr, tCASyield, ti.task_nr);
|
||||
while (ProcessTask()); // do the nested tasks
|
||||
|
||||
if(sleep)
|
||||
std::this_thread::sleep_for(std::chrono::microseconds(sleep_usecs));
|
||||
else
|
||||
{
|
||||
#ifdef WIN32
|
||||
std::this_thread::yield();
|
||||
#else // WIN32
|
||||
sched_yield();
|
||||
#endif // WIN32
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
{
|
||||
// RegionTracer t(ti.thread_nr, tADD, ti.task_nr);
|
||||
|
||||
// non-atomic fast check ...
|
||||
if ( (mynode_data.participate & 1) == 0) continue;
|
||||
|
||||
int oldval = mynode_data.participate += 2;
|
||||
if ( (oldval & 1) == 0)
|
||||
{ // job not active, going out again
|
||||
mynode_data.participate -= 2;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (startup_function) (*startup_function)();
|
||||
|
||||
IntRange mytasks = Range(int(ntasks)).Split (mynode, num_nodes);
|
||||
|
||||
try
|
||||
{
|
||||
|
||||
while (1)
|
||||
{
|
||||
if (mynode_data.start_cnt >= mytasks.Size()) break;
|
||||
int mytask = mynode_data.start_cnt.fetch_add(1, memory_order_relaxed);
|
||||
if (mytask >= mytasks.Size()) break;
|
||||
|
||||
ti.task_nr = mytasks.First()+mytask;
|
||||
ti.ntasks = ntasks;
|
||||
|
||||
{
|
||||
RegionTracer t(ti.thread_nr, jobnr, RegionTracer::ID_JOB, ti.task_nr);
|
||||
(*func)(ti);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
{
|
||||
// cout << "got exception in TM" << endl;
|
||||
lock_guard<mutex> guard(copyex_mutex);
|
||||
delete ex;
|
||||
ex = new Exception (e);
|
||||
mynode_data.start_cnt = mytasks.Size();
|
||||
}
|
||||
}
|
||||
|
||||
#ifndef __MIC__
|
||||
atomic_thread_fence (memory_order_release);
|
||||
#endif // __MIC__
|
||||
|
||||
if (cleanup_function) (*cleanup_function)();
|
||||
|
||||
jobdone = jobnr;
|
||||
|
||||
mynode_data.participate-=2;
|
||||
|
||||
{
|
||||
int oldpart = 1;
|
||||
if (mynode_data.participate.compare_exchange_strong (oldpart, 0))
|
||||
{
|
||||
if (jobdone < jobnr.load())
|
||||
{ // reopen gate
|
||||
mynode_data.participate |= 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (mynode != 0)
|
||||
mynode_data.start_cnt = 0;
|
||||
complete[mynode] = jobnr.load();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#ifdef USE_MKL
|
||||
mkl_set_num_threads_local(mkl_max);
|
||||
#endif
|
||||
|
||||
workers_on_node[mynode]--;
|
||||
active_workers--;
|
||||
}
|
||||
|
||||
|
||||
std::list<std::tuple<std::string,double>> TaskManager :: Timing ()
|
||||
{
|
||||
/*
|
||||
list<tuple<string,double>>timings;
|
||||
double time =
|
||||
RunTiming
|
||||
( [&] ()
|
||||
{
|
||||
ParallelJob ( [] (TaskInfo ti) { ; } ,
|
||||
TasksPerThread(1) );
|
||||
});
|
||||
timings.push_back (make_tuple("parallel job with 1 task per thread", time*1e9));
|
||||
|
||||
time =
|
||||
RunTiming
|
||||
( [&] ()
|
||||
{
|
||||
ParallelJob ( [] (TaskInfo ti) { ; } ,
|
||||
TasksPerThread(10) );
|
||||
});
|
||||
timings.push_back (make_tuple("parallel job with 10 tasks per thread", time*1e9));
|
||||
|
||||
time =
|
||||
RunTiming
|
||||
( [&] ()
|
||||
{
|
||||
ParallelJob ( [] (TaskInfo ti) { ; } ,
|
||||
TasksPerThread(100) );
|
||||
});
|
||||
timings.push_back (make_tuple("parallel job with 100 tasks per thread", time*1e9));
|
||||
|
||||
return timings;
|
||||
*/
|
||||
|
||||
|
||||
|
||||
// this is the old function moved from the py-interface:
|
||||
std::list<std::tuple<std::string,double>>timings;
|
||||
double starttime, time;
|
||||
double maxtime = 0.5;
|
||||
size_t steps;
|
||||
|
||||
starttime = WallTime();
|
||||
steps = 0;
|
||||
do
|
||||
{
|
||||
for (size_t i = 0; i < 1000; i++)
|
||||
ParallelJob ( [] (TaskInfo ti) { ; },
|
||||
TasksPerThread(1));
|
||||
steps += 1000;
|
||||
time = WallTime()-starttime;
|
||||
}
|
||||
while (time < maxtime);
|
||||
timings.push_back(make_tuple("ParallelJob 1 task/thread", time/steps*1e9));
|
||||
|
||||
|
||||
starttime = WallTime();
|
||||
steps = 0;
|
||||
do
|
||||
{
|
||||
for (size_t i = 0; i < 1000; i++)
|
||||
ParallelJob ( [] (TaskInfo ti) { ; },
|
||||
TasksPerThread(100));
|
||||
steps += 1000;
|
||||
time = WallTime()-starttime;
|
||||
}
|
||||
while (time < maxtime);
|
||||
timings.push_back(make_tuple("ParallelJob 100 task/thread", time/steps*1e9));
|
||||
|
||||
|
||||
starttime = WallTime();
|
||||
steps = 0;
|
||||
do
|
||||
{
|
||||
for (int k = 0; k < 10000; k++)
|
||||
{
|
||||
SharedLoop2 sl(1000);
|
||||
steps += 1;
|
||||
}
|
||||
time = WallTime()-starttime;
|
||||
}
|
||||
while (time < maxtime);
|
||||
timings.push_back(make_tuple("SharedLoop init", time/steps*1e9));
|
||||
|
||||
starttime = WallTime();
|
||||
steps = 0;
|
||||
do
|
||||
{
|
||||
for (int k = 0; k < 1000; k++)
|
||||
{
|
||||
SharedLoop sl(5);
|
||||
ParallelJob ( [&sl] (TaskInfo ti)
|
||||
{
|
||||
for (auto i : sl)
|
||||
(void)i; // silence warning
|
||||
} );
|
||||
}
|
||||
steps += 1000;
|
||||
time = WallTime()-starttime;
|
||||
}
|
||||
while (time < maxtime);
|
||||
timings.push_back(make_tuple("short SharedLoop", time/steps*1e9));
|
||||
|
||||
|
||||
starttime = WallTime();
|
||||
steps = 0;
|
||||
do
|
||||
{
|
||||
for (int k = 0; k < 1000; k++)
|
||||
{
|
||||
SharedLoop sl1(5), sl2(5), sl3(5), sl4(5), sl5(5);
|
||||
ParallelJob ( [&sl1, &sl2, &sl3, &sl4, &sl5] (TaskInfo ti)
|
||||
{
|
||||
for (auto i : sl1)
|
||||
(void)i; // silence warning
|
||||
for (auto i : sl2)
|
||||
(void)i; // silence warning
|
||||
for (auto i : sl3)
|
||||
(void)i; // silence warning
|
||||
for (auto i : sl4)
|
||||
(void)i; // silence warning
|
||||
for (auto i : sl5)
|
||||
(void)i; // silence warning
|
||||
} );
|
||||
}
|
||||
steps += 1000;
|
||||
time = WallTime()-starttime;
|
||||
}
|
||||
while (time < maxtime);
|
||||
timings.push_back(make_tuple("5 short SharedLoops", time/steps*1e9));
|
||||
|
||||
|
||||
starttime = WallTime();
|
||||
steps = 0;
|
||||
SharedLoop2 sl2(5);
|
||||
do
|
||||
{
|
||||
for (int k = 0; k < 1000; k++)
|
||||
{
|
||||
sl2.Reset(5);
|
||||
ParallelJob ( [&sl2] (TaskInfo ti)
|
||||
{
|
||||
for (auto i : sl2)
|
||||
(void)i; // silence warning
|
||||
} );
|
||||
}
|
||||
steps += 1000;
|
||||
time = WallTime()-starttime;
|
||||
}
|
||||
while (time < maxtime);
|
||||
timings.push_back(make_tuple("short SharedLoop2", time/steps*1e9));
|
||||
|
||||
{
|
||||
starttime = WallTime();
|
||||
steps = 0;
|
||||
SharedLoop2 sl1(5), sl2(5), sl3(5), sl4(5), sl5(5);
|
||||
do
|
||||
{
|
||||
for (int k = 0; k < 1000; k++)
|
||||
{
|
||||
sl1.Reset(5);
|
||||
sl2.Reset(5);
|
||||
sl3.Reset(5);
|
||||
sl4.Reset(5);
|
||||
sl5.Reset(5);
|
||||
ParallelJob ( [&sl1,&sl2,&sl3,&sl4,&sl5] (TaskInfo ti)
|
||||
{
|
||||
for (auto i : sl1)
|
||||
(void)i; // silence warning
|
||||
for (auto i : sl2)
|
||||
(void)i; // silence warning
|
||||
for (auto i : sl3)
|
||||
(void)i; // silence warning
|
||||
for (auto i : sl4)
|
||||
(void)i; // silence warning
|
||||
for (auto i : sl5)
|
||||
(void)i; // silence warning
|
||||
} );
|
||||
}
|
||||
steps += 1000;
|
||||
time = WallTime()-starttime;
|
||||
}
|
||||
while (time < maxtime);
|
||||
timings.push_back(make_tuple("5 short SharedLoop2", time/steps*1e9));
|
||||
}
|
||||
|
||||
|
||||
starttime = WallTime();
|
||||
steps = 0;
|
||||
{
|
||||
SharedLoop2 sl(1000);
|
||||
do
|
||||
{
|
||||
for (int k = 0; k < 1000; k++)
|
||||
{
|
||||
sl.Reset(1000);
|
||||
ParallelJob ( [&sl] (TaskInfo ti)
|
||||
{
|
||||
for (auto i : sl)
|
||||
(void)i; // silence warning
|
||||
} );
|
||||
steps += 1000;
|
||||
}
|
||||
time = WallTime()-starttime;
|
||||
}
|
||||
while (time < maxtime);
|
||||
timings.push_back(make_tuple("SharedLoop2 1000, time per iteration", time/steps*1e9));
|
||||
}
|
||||
|
||||
{
|
||||
starttime = WallTime();
|
||||
steps = 0;
|
||||
SharedLoop2 sl(1000000);
|
||||
do
|
||||
{
|
||||
sl.Reset(1000000);
|
||||
ParallelJob ( [&sl] (TaskInfo ti)
|
||||
{
|
||||
for (auto i : sl)
|
||||
(void)i; // silence warning
|
||||
} );
|
||||
steps += 1000000;
|
||||
time = WallTime()-starttime;
|
||||
}
|
||||
while (time < maxtime);
|
||||
timings.push_back(make_tuple("SharedLoop2 1000000, time per iteration", time/steps*1e9));
|
||||
}
|
||||
|
||||
return timings;
|
||||
}
|
||||
|
||||
}
|
1021
libsrc/core/taskmanager.hpp
Normal file
1021
libsrc/core/taskmanager.hpp
Normal file
File diff suppressed because it is too large
Load Diff
@ -65,6 +65,15 @@ namespace ngcore
|
||||
ost << "\n" << val.first << ": " << val.second;
|
||||
return ost;
|
||||
}
|
||||
|
||||
template <class T>
|
||||
NETGEN_INLINE void Swap (T & a, T & b)
|
||||
{
|
||||
T temp = std::move(a);
|
||||
a = std::move(b);
|
||||
b = std::move(temp);
|
||||
}
|
||||
|
||||
} // namespace ngcore
|
||||
|
||||
#endif // NETGEN_CORE_UTILS_HPP
|
||||
|
Loading…
Reference in New Issue
Block a user