mirror of
https://github.com/NGSolve/netgen.git
synced 2024-12-25 05:20:34 +05:00
1066 lines
26 KiB
C++
1066 lines
26 KiB
C++
#ifndef NETGEN_CORE_TASKMANAGER_HPP
|
|
#define NETGEN_CORE_TASKMANAGER_HPP
|
|
|
|
/*********************************************************************/
|
|
/* File: taskmanager.hpp */
|
|
/* Author: M. Hochsterger, J. Schoeberl */
|
|
/* Date: 10. Mar. 2015 */
|
|
/*********************************************************************/
|
|
|
|
#include <atomic>
|
|
#include <functional>
|
|
#include <list>
|
|
#include <ostream>
|
|
#include <thread>
|
|
|
|
#include "array.hpp"
|
|
#include "paje_trace.hpp"
|
|
#include "taskmanager.hpp"
|
|
|
|
#ifdef USE_NUMA
|
|
#include <numa.h>
|
|
#include <sched.h>
|
|
#endif
|
|
|
|
|
|
namespace ngcore
|
|
{
|
|
using std::atomic;
|
|
using std::function;
|
|
|
|
// Context record handed to every job function executed through
// TaskManager::CreateJob (see function<void(TaskInfo&)>).
class TaskInfo
{
public:
  // index of this task and number of tasks of the job; the ParallelFor
  // helpers use the pair to split a range: r.Split (task_nr, ntasks)
  int task_nr;
  int ntasks;

  // presumably index of the executing worker thread and the number of
  // worker threads -- filled in by the task manager implementation (not
  // visible in this header)
  int thread_nr;
  int nthreads;

  // int node_nr;
  // int nnodes;
};
|
|
|
|
NGCORE_API extern class TaskManager * task_manager;
|
|
|
|
class TaskManager
|
|
{
|
|
// PajeTrace *trace;
|
|
|
|
class alignas(64) NodeData
|
|
{
|
|
public:
|
|
atomic<int> start_cnt{0};
|
|
atomic<int> participate{0};
|
|
};
|
|
|
|
NGCORE_API static const function<void(TaskInfo&)> * func;
|
|
NGCORE_API static const function<void()> * startup_function;
|
|
NGCORE_API static const function<void()> * cleanup_function;
|
|
NGCORE_API static atomic<int> ntasks;
|
|
NGCORE_API static Exception * ex;
|
|
|
|
NGCORE_API static atomic<int> jobnr;
|
|
|
|
static atomic<int> complete[8]; // max nodes
|
|
static atomic<int> done;
|
|
static atomic<int> active_workers;
|
|
static atomic<int> workers_on_node[8]; // max nodes
|
|
// Array<atomic<int>*> sync;
|
|
NGCORE_API static int sleep_usecs;
|
|
NGCORE_API static bool sleep;
|
|
|
|
static NodeData *nodedata[8];
|
|
|
|
static int num_nodes;
|
|
NGCORE_API static int num_threads;
|
|
NGCORE_API static int max_threads;
|
|
|
|
|
|
|
|
// #ifndef __clang__
|
|
static thread_local int thread_id;
|
|
// #else
|
|
// static __thread int thread_id;
|
|
// #endif
|
|
|
|
NGCORE_API static bool use_paje_trace;
|
|
public:
|
|
|
|
NGCORE_API TaskManager();
|
|
NGCORE_API ~TaskManager();
|
|
|
|
|
|
NGCORE_API void StartWorkers();
|
|
NGCORE_API void StopWorkers();
|
|
|
|
void SuspendWorkers(int asleep_usecs = 1000 )
|
|
{
|
|
sleep_usecs = asleep_usecs;
|
|
sleep = true;
|
|
}
|
|
void ResumeWorkers() { sleep = false; }
|
|
|
|
NGCORE_API static void SetNumThreads(int amax_threads);
|
|
NGCORE_API static int GetMaxThreads() { return max_threads; }
|
|
// static int GetNumThreads() { return task_manager ? task_manager->num_threads : 1; }
|
|
NGCORE_API static int GetNumThreads() { return num_threads; }
|
|
NGCORE_API static int GetThreadId();
|
|
NGCORE_API int GetNumNodes() const { return num_nodes; }
|
|
|
|
static void SetPajeTrace (bool use) { use_paje_trace = use; }
|
|
|
|
NGCORE_API static bool ProcessTask();
|
|
|
|
NGCORE_API static void CreateJob (const function<void(TaskInfo&)> & afunc,
|
|
int antasks = task_manager->GetNumThreads());
|
|
|
|
static void SetStartupFunction (const function<void()> & func) { startup_function = &func; }
|
|
static void SetStartupFunction () { startup_function = nullptr; }
|
|
static void SetCleanupFunction (const function<void()> & func) { cleanup_function = &func; }
|
|
static void SetCleanupFunction () { cleanup_function = nullptr; }
|
|
|
|
void Done() { done = true; }
|
|
NGCORE_API void Loop(int thread_num);
|
|
|
|
NGCORE_API static std::list<std::tuple<std::string,double>> Timing ();
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Runs 'alg' with a task manager; defined in the implementation file.
NGCORE_API void RunWithTaskManager (function<void()> alg);

// For Python context manager
// (also used by RegionTaskManager: the value returned by EnterTaskManager
// is handed back to ExitTaskManager)
NGCORE_API int EnterTaskManager ();
NGCORE_API void ExitTaskManager (int num_threads);
|
|
|
|
class RegionTaskManager
|
|
{
|
|
int nthreads_before;
|
|
int nthreads;
|
|
bool started_taskmanager;
|
|
|
|
public:
|
|
RegionTaskManager(int anthreads=TaskManager::GetMaxThreads())
|
|
: nthreads(anthreads)
|
|
{
|
|
if(task_manager || nthreads==0)
|
|
{
|
|
// already running, no need to do anything
|
|
started_taskmanager = false;
|
|
return;
|
|
}
|
|
else
|
|
{
|
|
nthreads_before = TaskManager::GetMaxThreads();
|
|
TaskManager::SetNumThreads(nthreads);
|
|
nthreads = EnterTaskManager();
|
|
started_taskmanager = true;
|
|
}
|
|
}
|
|
|
|
~RegionTaskManager()
|
|
{
|
|
if(started_taskmanager)
|
|
{
|
|
ExitTaskManager(nthreads);
|
|
TaskManager::SetNumThreads(nthreads_before);
|
|
}
|
|
}
|
|
};
|
|
|
|
// Converts a "tasks per thread" factor into an absolute task count:
// tpt times the current number of threads.
NETGEN_INLINE int TasksPerThread (int tpt)
{
  // return task_manager ? tpt*task_manager->GetNumThreads() : 1;
  return tpt*TaskManager::GetNumThreads();
}
|
|
|
|
|
|
// Thin wrapper around a cost estimate. The constructor is intentionally
// implicit: the parallel helpers use 'TotalCosts costs = 1000' as default
// argument.
class TotalCosts
{
  size_t cost;
public:
  TotalCosts (size_t _cost) : cost(_cost) { ; }

  // returns the stored cost; const so it can be queried on const objects
  size_t operator ()() const { return cost; }
};
|
|
|
|
template <typename TR, typename TFUNC>
|
|
NETGEN_INLINE void ParallelFor (T_Range<TR> r, TFUNC f,
|
|
int antasks = TaskManager::GetNumThreads(),
|
|
TotalCosts costs = 1000)
|
|
{
|
|
// if (task_manager && costs() >= 1000)
|
|
|
|
TaskManager::CreateJob
|
|
([r, f] (TaskInfo & ti)
|
|
{
|
|
auto myrange = r.Split (ti.task_nr, ti.ntasks);
|
|
for (auto i : myrange) f(i);
|
|
},
|
|
antasks);
|
|
|
|
/*
|
|
else
|
|
for (auto i : r) f(i);
|
|
*/
|
|
}
|
|
|
|
/*
|
|
template <typename TFUNC>
|
|
NETGEN_INLINE void ParallelFor (size_t n, TFUNC f,
|
|
int antasks = task_manager ? task_manager->GetNumThreads() : 0)
|
|
{
|
|
ParallelFor (IntRange (n), f, antasks);
|
|
}
|
|
*/
|
|
template <typename ...Args>
|
|
NETGEN_INLINE void ParallelFor (size_t n, Args...args)
|
|
{
|
|
ParallelFor (IntRange (n), args...);
|
|
}
|
|
|
|
template <typename TR, typename TFUNC>
|
|
NETGEN_INLINE void ParallelForRange (T_Range<TR> r, TFUNC f,
|
|
int antasks = TaskManager::GetNumThreads(),
|
|
TotalCosts costs = 1000)
|
|
{
|
|
// if (task_manager && costs() >= 1000)
|
|
|
|
TaskManager::CreateJob
|
|
([r, f] (TaskInfo & ti)
|
|
{
|
|
auto myrange = r.Split (ti.task_nr, ti.ntasks);
|
|
f(myrange);
|
|
},
|
|
antasks);
|
|
/*
|
|
else
|
|
f(r);
|
|
*/
|
|
}
|
|
|
|
/*
|
|
template <typename TFUNC>
|
|
NETGEN_INLINE void ParallelForRange (size_t n, TFUNC f,
|
|
int antasks = task_manager ? task_manager->GetNumThreads() : 0)
|
|
{
|
|
ParallelForRange (IntRange(n), f, antasks);
|
|
}
|
|
*/
|
|
template <typename ...Args>
|
|
NETGEN_INLINE void ParallelForRange (size_t n, Args...args)
|
|
{
|
|
ParallelForRange (IntRange(n), args...);
|
|
}
|
|
|
|
template <typename TFUNC>
|
|
NETGEN_INLINE void ParallelJob (TFUNC f,
|
|
int antasks = TaskManager::GetNumThreads())
|
|
{
|
|
TaskManager::CreateJob (f, antasks);
|
|
}
|
|
|
|
|
|
/*
|
|
Usage example:
|
|
|
|
SharedLoop myloop(100);
|
|
task_manager->CreateJob ([]()
|
|
{
|
|
for (int i : myloop)
|
|
cout << "i = " << i << endl;
|
|
});
|
|
|
|
*/
|
|
|
|
class SharedLoop
|
|
{
|
|
atomic<int> cnt;
|
|
IntRange r;
|
|
|
|
|
|
class SharedIterator
|
|
{
|
|
atomic<int> & cnt;
|
|
int myval;
|
|
int endval;
|
|
public:
|
|
SharedIterator (atomic<int> & acnt, int aendval, bool begin_iterator)
|
|
: cnt (acnt)
|
|
{
|
|
endval = aendval;
|
|
myval = begin_iterator ? cnt++ : endval;
|
|
if (myval > endval) myval = endval;
|
|
}
|
|
|
|
SharedIterator & operator++ ()
|
|
{
|
|
myval = cnt++;
|
|
if (myval > endval) myval = endval;
|
|
return *this;
|
|
}
|
|
|
|
int operator* () const { return myval; }
|
|
bool operator!= (const SharedIterator & it2) const { return myval != it2.myval; }
|
|
};
|
|
|
|
|
|
public:
|
|
SharedLoop (IntRange ar) : r(ar) { cnt = r.begin(); }
|
|
SharedIterator begin() { return SharedIterator (cnt, r.end(), true); }
|
|
SharedIterator end() { return SharedIterator (cnt, r.end(), false); }
|
|
};
|
|
|
|
|
|
/*
|
|
class alignas(4096) AtomicRange
|
|
{
|
|
mutex lock;
|
|
int begin;
|
|
int end;
|
|
public:
|
|
|
|
void Set (IntRange r)
|
|
{
|
|
lock_guard<mutex> guard(lock);
|
|
begin = r.begin();
|
|
end = r.end();
|
|
}
|
|
|
|
IntRange Get()
|
|
{
|
|
lock_guard<mutex> guard(lock);
|
|
return IntRange(begin, end);
|
|
}
|
|
|
|
bool PopFirst (int & first)
|
|
{
|
|
lock_guard<mutex> guard(lock);
|
|
bool non_empty = end > begin;
|
|
first = begin;
|
|
if (non_empty) begin++;
|
|
return non_empty;
|
|
}
|
|
|
|
bool PopHalf (IntRange & r)
|
|
{
|
|
lock_guard<mutex> guard(lock);
|
|
bool non_empty = end > begin;
|
|
if (non_empty)
|
|
{
|
|
int mid = (begin+end+1)/2;
|
|
r = IntRange(begin, mid);
|
|
begin = mid;
|
|
}
|
|
return non_empty;
|
|
}
|
|
};
|
|
*/
|
|
|
|
|
|
|
|
// lock free popfirst
|
|
// faster for large loops, but slower for small loops (~1000) ????
|
|
/*
|
|
class alignas(4096) AtomicRange
|
|
{
|
|
mutex lock;
|
|
atomic<int> begin;
|
|
int end;
|
|
public:
|
|
|
|
void Set (IntRange r)
|
|
{
|
|
lock_guard<mutex> guard(lock);
|
|
// begin = r.begin();
|
|
begin.store(r.begin(), std::memory_order_relaxed);
|
|
end = r.end();
|
|
}
|
|
|
|
void SetNoLock (IntRange r)
|
|
{
|
|
begin.store(r.begin(), std::memory_order_relaxed);
|
|
end = r.end();
|
|
}
|
|
|
|
// IntRange Get()
|
|
// {
|
|
// lock_guard<mutex> guard(lock);
|
|
// return IntRange(begin, end);
|
|
// }
|
|
|
|
bool PopFirst (int & first)
|
|
{
|
|
// int oldbegin = begin;
|
|
int oldbegin = begin.load(std::memory_order_relaxed);
|
|
if (oldbegin >= end) return false;
|
|
while (!begin.compare_exchange_weak (oldbegin, oldbegin+1,
|
|
std::memory_order_relaxed, std::memory_order_relaxed))
|
|
if (oldbegin >= end) return false;
|
|
|
|
first = oldbegin;
|
|
return true;
|
|
}
|
|
|
|
bool PopHalf (IntRange & r)
|
|
{
|
|
// int oldbegin = begin;
|
|
int oldbegin = begin.load(std::memory_order_relaxed);
|
|
if (oldbegin >= end) return false;
|
|
|
|
lock_guard<mutex> guard(lock);
|
|
while (!begin.compare_exchange_weak (oldbegin, (oldbegin+end+1)/2,
|
|
std::memory_order_relaxed, std::memory_order_relaxed))
|
|
if (oldbegin >= end) return false;
|
|
|
|
r = IntRange(oldbegin, (oldbegin+end+1)/2);
|
|
return true;
|
|
}
|
|
};
|
|
|
|
|
|
// inline ostream & operator<< (ostream & ost, AtomicRange & r)
|
|
// {
|
|
// ost << r.Get();
|
|
// return ost;
|
|
// }
|
|
*/
|
|
|
|
|
|
|
|
class alignas(4096) AtomicRange
|
|
{
|
|
atomic<size_t> begin;
|
|
atomic<size_t> end;
|
|
public:
|
|
|
|
void Set (IntRange r)
|
|
{
|
|
begin.store(std::numeric_limits<size_t>::max(), std::memory_order_release);
|
|
end.store(r.end(), std::memory_order_release);
|
|
begin.store(r.begin(), std::memory_order_release);
|
|
}
|
|
|
|
void SetNoLock (IntRange r)
|
|
{
|
|
end.store(r.end(), std::memory_order_release);
|
|
begin.store(r.begin(), std::memory_order_release);
|
|
}
|
|
|
|
// IntRange Get()
|
|
// {
|
|
// lock_guard<mutex> guard(lock);
|
|
// return IntRange(begin, end);
|
|
// }
|
|
|
|
bool PopFirst (size_t & first)
|
|
{
|
|
first = begin++;
|
|
return first < end;
|
|
/*
|
|
// int oldbegin = begin;
|
|
size_t oldbegin = begin.load(std::memory_order_acquire);
|
|
if (oldbegin >= end) return false;
|
|
while (!begin.compare_exchange_weak (oldbegin, oldbegin+1,
|
|
std::memory_order_relaxed, std::memory_order_relaxed))
|
|
if (oldbegin >= end) return false;
|
|
|
|
first = oldbegin;
|
|
return true;
|
|
*/
|
|
}
|
|
|
|
bool PopHalf (IntRange & r)
|
|
{
|
|
// int oldbegin = begin;
|
|
size_t oldbegin = begin.load(std::memory_order_acquire);
|
|
size_t oldend = end.load(std::memory_order_acquire);
|
|
if (oldbegin >= oldend) return false;
|
|
|
|
// lock_guard<mutex> guard(lock);
|
|
while (!begin.compare_exchange_weak (oldbegin, (oldbegin+oldend+1)/2,
|
|
std::memory_order_relaxed, std::memory_order_relaxed))
|
|
{
|
|
oldend = end.load(std::memory_order_acquire);
|
|
if (oldbegin >= oldend) return false;
|
|
}
|
|
|
|
r = IntRange(oldbegin, (oldbegin+oldend+1)/2);
|
|
return true;
|
|
}
|
|
};
|
|
|
|
|
|
|
|
|
|
class SharedLoop2
|
|
{
|
|
Array<AtomicRange> ranges;
|
|
atomic<size_t> processed;
|
|
atomic<size_t> total;
|
|
atomic<int> participants;
|
|
|
|
class SharedIterator
|
|
{
|
|
FlatArray<AtomicRange> ranges;
|
|
atomic<size_t> & processed;
|
|
size_t total;
|
|
size_t myval;
|
|
size_t processed_by_me = 0;
|
|
int me;
|
|
int steal_from;
|
|
public:
|
|
SharedIterator (FlatArray<AtomicRange> _ranges, atomic<size_t> & _processed, size_t _total,
|
|
int _me, bool begin_it)
|
|
: ranges(_ranges), processed(_processed), total(_total)
|
|
{
|
|
if (begin_it)
|
|
{
|
|
// me = TaskManager::GetThreadId();
|
|
me = _me;
|
|
steal_from = me;
|
|
GetNext();
|
|
}
|
|
}
|
|
~SharedIterator()
|
|
{
|
|
if (processed_by_me)
|
|
processed += processed_by_me;
|
|
}
|
|
|
|
SharedIterator & operator++ () { GetNext(); return *this;}
|
|
|
|
void GetNext()
|
|
{
|
|
size_t nr;
|
|
if (ranges[me].PopFirst(nr))
|
|
{
|
|
processed_by_me++;
|
|
myval = nr;
|
|
return;
|
|
}
|
|
GetNext2();
|
|
}
|
|
|
|
void GetNext2()
|
|
{
|
|
processed += processed_by_me;
|
|
processed_by_me = 0;
|
|
|
|
// done with my work, going to steal ...
|
|
while (1)
|
|
{
|
|
if (processed >= total) return;
|
|
|
|
steal_from++;
|
|
if (steal_from == ranges.Size()) steal_from = 0;
|
|
|
|
// steal half of the work reserved for 'from':
|
|
IntRange steal;
|
|
if (ranges[steal_from].PopHalf(steal))
|
|
{
|
|
myval = steal.First();
|
|
processed_by_me++;
|
|
if (myval+1 < steal.Next())
|
|
ranges[me].Set (IntRange(myval+1, steal.Next()));
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
|
|
size_t operator* () const { return myval; }
|
|
bool operator!= (const SharedIterator & it2) const { return processed < total; }
|
|
};
|
|
|
|
|
|
public:
|
|
SharedLoop2 ()
|
|
: ranges(TaskManager::GetNumThreads())
|
|
{ ; }
|
|
|
|
SharedLoop2 (IntRange r)
|
|
: ranges(TaskManager::GetNumThreads())
|
|
{
|
|
Reset (r);
|
|
}
|
|
|
|
void Reset (IntRange r)
|
|
{
|
|
for (size_t i = 0; i < ranges.Size(); i++)
|
|
ranges[i].SetNoLock (r.Split(i,ranges.Size()));
|
|
|
|
total.store(r.Size(), std::memory_order_relaxed);
|
|
participants.store(0, std::memory_order_relaxed);
|
|
processed.store(0, std::memory_order_release);
|
|
}
|
|
|
|
SharedIterator begin()
|
|
{
|
|
/*
|
|
int me = participants++;
|
|
if (me < ranges.Size())
|
|
return SharedIterator (ranges, processed, total, me, true);
|
|
else
|
|
// more participants than buckets. set processed to total, and the loop is terminated immediately
|
|
return SharedIterator (ranges, total, total, me, true);
|
|
*/
|
|
return SharedIterator (ranges, processed, total, TaskManager::GetThreadId(), true);
|
|
}
|
|
|
|
SharedIterator end() { return SharedIterator (ranges, processed, total, -1, false); }
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
class Partitioning
|
|
{
|
|
Array<size_t> part;
|
|
size_t total_costs;
|
|
public:
|
|
Partitioning () { ; }
|
|
|
|
template <typename T>
|
|
Partitioning (const Array<T> & apart) { part = apart; }
|
|
|
|
template <typename T>
|
|
Partitioning & operator= (const Array<T> & apart) { part = apart; return *this; }
|
|
|
|
size_t GetTotalCosts() const { return total_costs; }
|
|
|
|
template <typename TFUNC>
|
|
void Calc (size_t n, TFUNC costs, int size = task_manager ? task_manager->GetNumThreads() : 1)
|
|
{
|
|
Array<size_t> prefix (n);
|
|
|
|
/*
|
|
size_t sum = 0;
|
|
for (auto i : ngstd::Range(n))
|
|
{
|
|
sum += costs(i);
|
|
prefix[i] = sum;
|
|
}
|
|
total_costs = sum;
|
|
*/
|
|
|
|
Array<size_t> partial_sums(TaskManager::GetNumThreads()+1);
|
|
partial_sums[0] = 0;
|
|
ParallelJob
|
|
([&] (TaskInfo ti)
|
|
{
|
|
IntRange r = IntRange(n).Split(ti.task_nr, ti.ntasks);
|
|
size_t mysum = 0;
|
|
for (size_t i : r)
|
|
{
|
|
size_t c = costs(i);
|
|
mysum += c;
|
|
prefix[i] = c;
|
|
}
|
|
partial_sums[ti.task_nr+1] = mysum;
|
|
});
|
|
|
|
for (size_t i = 1; i < partial_sums.Size(); i++)
|
|
partial_sums[i] += partial_sums[i-1];
|
|
total_costs = partial_sums.Last();
|
|
|
|
ParallelJob
|
|
([&] (TaskInfo ti)
|
|
{
|
|
IntRange r = IntRange(n).Split(ti.task_nr, ti.ntasks);
|
|
size_t mysum = partial_sums[ti.task_nr];
|
|
for (size_t i : r)
|
|
{
|
|
mysum += prefix[i];
|
|
prefix[i] = mysum;
|
|
}
|
|
});
|
|
|
|
|
|
part.SetSize (size+1);
|
|
part[0] = 0;
|
|
|
|
for (int i = 1; i <= size; i++)
|
|
part[i] = BinSearch (prefix, total_costs*i/size);
|
|
}
|
|
|
|
size_t Size() const { return part.Size()-1; }
|
|
IntRange operator[] (size_t i) const { return ngcore::Range(part[i], part[i+1]); }
|
|
IntRange Range() const { return ngcore::Range(part[0], part[Size()]); }
|
|
|
|
|
|
|
|
|
|
private:
|
|
template <typename Tarray>
|
|
int BinSearch(const Tarray & v, size_t i) {
|
|
int n = v.Size();
|
|
if (n == 0) return 0;
|
|
|
|
int first = 0;
|
|
int last = n-1;
|
|
if(v[0]>i) return 0;
|
|
if(v[n-1] <= i) return n;
|
|
while(last-first>1) {
|
|
int m = (first+last)/2;
|
|
if(v[m]<i)
|
|
first = m;
|
|
else
|
|
last = m;
|
|
}
|
|
return first;
|
|
}
|
|
};
|
|
|
|
|
|
inline std::ostream & operator<< (std::ostream & ost, const Partitioning & part)
|
|
{
|
|
for (int i : Range(part.Size()))
|
|
ost << part[i] << " ";
|
|
return ost;
|
|
}
|
|
|
|
|
|
// tasks must be a multiple of part.size
|
|
template <typename TFUNC>
|
|
NETGEN_INLINE void ParallelFor (const Partitioning & part, TFUNC f, int tasks_per_thread = 1)
|
|
{
|
|
if (task_manager)
|
|
{
|
|
int ntasks = tasks_per_thread * task_manager->GetNumThreads();
|
|
if (ntasks % part.Size() != 0)
|
|
throw Exception ("tasks must be a multiple of part.size");
|
|
|
|
task_manager -> CreateJob
|
|
([&] (TaskInfo & ti)
|
|
{
|
|
int tasks_per_part = ti.ntasks / part.Size();
|
|
int mypart = ti.task_nr / tasks_per_part;
|
|
int num_in_part = ti.task_nr % tasks_per_part;
|
|
|
|
auto myrange = part[mypart].Split (num_in_part, tasks_per_part);
|
|
for (auto i : myrange) f(i);
|
|
}, ntasks);
|
|
}
|
|
else
|
|
{
|
|
for (auto i : part.Range())
|
|
f(i);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
template <typename TFUNC>
|
|
NETGEN_INLINE void ParallelForRange (const Partitioning & part, TFUNC f,
|
|
int tasks_per_thread = 1, TotalCosts costs = 1000)
|
|
{
|
|
if (task_manager && costs() >= 1000)
|
|
{
|
|
int ntasks = tasks_per_thread * task_manager->GetNumThreads();
|
|
if (ntasks % part.Size() != 0)
|
|
throw Exception ("tasks must be a multiple of part.size");
|
|
|
|
task_manager -> CreateJob
|
|
([&] (TaskInfo & ti)
|
|
{
|
|
int tasks_per_part = ti.ntasks / part.Size();
|
|
int mypart = ti.task_nr / tasks_per_part;
|
|
int num_in_part = ti.task_nr % tasks_per_part;
|
|
|
|
auto myrange = part[mypart].Split (num_in_part, tasks_per_part);
|
|
f(myrange);
|
|
}, ntasks);
|
|
}
|
|
else
|
|
{
|
|
f(part.Range());
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
template <typename FUNC, typename OP, typename T>
|
|
auto ParallelReduce (size_t n, FUNC f, OP op, T initial1)
|
|
{
|
|
typedef decltype (op(initial1,initial1)) TRES;
|
|
TRES initial(initial1);
|
|
/*
|
|
for (size_t i = 0; i < n; i++)
|
|
initial = op(initial, f(i));
|
|
*/
|
|
Array<TRES> part_reduce(TaskManager::GetNumThreads());
|
|
ParallelJob ([&] (TaskInfo ti)
|
|
{
|
|
auto r = Range(n).Split(ti.task_nr, ti.ntasks);
|
|
auto var = initial;
|
|
for (auto i : r)
|
|
var = op(var, f(i));
|
|
part_reduce[ti.task_nr] = var;
|
|
});
|
|
for (auto v : part_reduce)
|
|
initial = op(initial, v);
|
|
return initial;
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// // some sugar for working with arrays
|
|
//
|
|
// template <typename T> template <typename T2>
|
|
// const FlatArray<T> FlatArray<T>::operator= (ParallelValue<T2> val)
|
|
// {
|
|
// ParallelForRange (Size(),
|
|
// [this, val] (IntRange r)
|
|
// {
|
|
// for (auto i : r)
|
|
// (*this)[i] = val;
|
|
// });
|
|
// return *this;
|
|
// }
|
|
//
|
|
// template <typename T> template <typename T2>
|
|
// const FlatArray<T> FlatArray<T>::operator= (ParallelFunction<T2> func)
|
|
// {
|
|
// ParallelForRange (Size(),
|
|
// [this, func] (IntRange r)
|
|
// {
|
|
// for (auto i : r)
|
|
// (*this)[i] = func(i);
|
|
// });
|
|
// return *this;
|
|
// }
|
|
|
|
class Tasks
|
|
{
|
|
size_t num;
|
|
public:
|
|
explicit Tasks (size_t _num = TaskManager::GetNumThreads()) : num(_num) { ; }
|
|
auto GetNum() const { return num; }
|
|
};
|
|
|
|
/* currently not used, plus causing problems on MSVC 2017
|
|
template <typename T, typename std::enable_if<ngstd::has_call_operator<T>::value, int>::type = 0>
|
|
inline ParallelFunction<T> operator| (const T & func, Tasks tasks)
|
|
{
|
|
return func;
|
|
}
|
|
|
|
template <typename T, typename std::enable_if<!ngstd::has_call_operator<T>::value, int>::type = 0>
|
|
inline ParallelValue<T> operator| (const T & obj, Tasks tasks)
|
|
{
|
|
return obj;
|
|
}
|
|
|
|
inline Tasks operator "" _tasks_per_thread (unsigned long long n)
|
|
{
|
|
return Tasks(n * TaskManager::GetNumThreads());
|
|
}
|
|
*/
|
|
|
|
/*
|
|
thought to be used as: array = 1 | tasks
|
|
class DefaultTasks
|
|
{
|
|
public:
|
|
operator Tasks () const { return TaskManager::GetNumThreads(); }
|
|
};
|
|
static DefaultTasks tasks;
|
|
*/
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#ifdef USE_NUMA
|
|
|
|
template <typename T>
|
|
class NumaInterleavedArray : public Array<T>
|
|
{
|
|
T * numa_ptr;
|
|
size_t numa_size;
|
|
public:
|
|
NumaInterleavedArray () { numa_size = 0; numa_ptr = nullptr; }
|
|
NumaInterleavedArray (size_t s)
|
|
: Array<T> (s, (T*)numa_alloc_interleaved(s*sizeof(T)))
|
|
{
|
|
numa_ptr = this->data;
|
|
numa_size = s;
|
|
}
|
|
|
|
~NumaInterleavedArray ()
|
|
{
|
|
numa_free (numa_ptr, numa_size*sizeof(T));
|
|
}
|
|
|
|
NumaInterleavedArray & operator= (T val)
|
|
{
|
|
Array<T>::operator= (val);
|
|
return *this;
|
|
}
|
|
|
|
NumaInterleavedArray & operator= (NumaInterleavedArray && a2)
|
|
{
|
|
Array<T>::operator= ((Array<T>&&)a2);
|
|
ngcore::Swap (numa_ptr, a2.numa_ptr);
|
|
ngcore::Swap (numa_size, a2.numa_size);
|
|
return *this;
|
|
}
|
|
|
|
void Swap (NumaInterleavedArray & b)
|
|
{
|
|
Array<T>::Swap(b);
|
|
ngcore::Swap (numa_ptr, b.numa_ptr);
|
|
ngcore::Swap (numa_size, b.numa_size);
|
|
}
|
|
|
|
void SetSize (size_t size)
|
|
{
|
|
std::cerr << "************************* NumaDistArray::SetSize not overloaded" << std::endl;
|
|
Array<T>::SetSize(size);
|
|
}
|
|
};
|
|
|
|
template <typename T>
|
|
class NumaDistributedArray : public Array<T>
|
|
{
|
|
T * numa_ptr;
|
|
size_t numa_size;
|
|
public:
|
|
NumaDistributedArray () { numa_size = 0; numa_ptr = nullptr; }
|
|
NumaDistributedArray (size_t s)
|
|
: Array<T> (s, (T*)numa_alloc_local(s*sizeof(T)))
|
|
{
|
|
numa_ptr = this->data;
|
|
numa_size = s;
|
|
|
|
/* int avail = */ numa_available(); // initialize libnuma
|
|
int num_nodes = numa_num_configured_nodes();
|
|
size_t pagesize = numa_pagesize();
|
|
|
|
int npages = ceil ( double(s)*sizeof(T) / pagesize );
|
|
|
|
// cout << "size = " << numa_size << endl;
|
|
// cout << "npages = " << npages << endl;
|
|
|
|
for (int i = 0; i < num_nodes; i++)
|
|
{
|
|
int beg = (i * npages) / num_nodes;
|
|
int end = ( (i+1) * npages) / num_nodes;
|
|
// cout << "node " << i << " : [" << beg << "-" << end << ")" << endl;
|
|
numa_tonode_memory(numa_ptr+beg*pagesize/sizeof(T), (end-beg)*pagesize, i);
|
|
}
|
|
}
|
|
|
|
~NumaDistributedArray ()
|
|
{
|
|
numa_free (numa_ptr, numa_size*sizeof(T));
|
|
}
|
|
|
|
NumaDistributedArray & operator= (NumaDistributedArray && a2)
|
|
{
|
|
Array<T>::operator= ((Array<T>&&)a2);
|
|
ngcore::Swap (numa_ptr, a2.numa_ptr);
|
|
ngcore::Swap (numa_size, a2.numa_size);
|
|
return *this;
|
|
}
|
|
|
|
void Swap (NumaDistributedArray & b)
|
|
{
|
|
Array<T>::Swap(b);
|
|
ngcore::Swap (numa_ptr, b.numa_ptr);
|
|
ngcore::Swap (numa_size, b.numa_size);
|
|
}
|
|
|
|
void SetSize (size_t size)
|
|
{
|
|
std::cerr << "************************* NumaDistArray::SetSize not overloaded" << std::endl;
|
|
Array<T>::SetSize(size);
|
|
}
|
|
};
|
|
|
|
|
|
|
|
template <typename T>
|
|
class NumaLocalArray : public Array<T>
|
|
{
|
|
T * numa_ptr;
|
|
size_t numa_size;
|
|
public:
|
|
NumaLocalArray () { numa_size = 0; numa_ptr = nullptr; }
|
|
NumaLocalArray (size_t s)
|
|
: Array<T> (s, (T*)numa_alloc_local(s*sizeof(T)))
|
|
{
|
|
numa_ptr = this->data;
|
|
numa_size = s;
|
|
}
|
|
|
|
~NumaLocalArray ()
|
|
{
|
|
numa_free (numa_ptr, numa_size*sizeof(T));
|
|
}
|
|
|
|
NumaLocalArray & operator= (T val)
|
|
{
|
|
Array<T>::operator= (val);
|
|
return *this;
|
|
}
|
|
|
|
NumaLocalArray & operator= (NumaLocalArray && a2)
|
|
{
|
|
Array<T>::operator= ((Array<T>&&)a2);
|
|
ngcore::Swap (numa_ptr, a2.numa_ptr);
|
|
ngcore::Swap (numa_size, a2.numa_size);
|
|
return *this;
|
|
}
|
|
|
|
void Swap (NumaLocalArray & b)
|
|
{
|
|
Array<T>::Swap(b);
|
|
ngcore::Swap (numa_ptr, b.numa_ptr);
|
|
ngcore::Swap (numa_size, b.numa_size);
|
|
}
|
|
|
|
void SetSize (size_t size)
|
|
{
|
|
std::cerr << "************************* NumaDistArray::SetSize not overloaded" << std::endl;
|
|
Array<T>::SetSize(size);
|
|
}
|
|
};
|
|
|
|
|
|
#else // USE_NUMA
|
|
|
|
template <typename T>
|
|
using NumaDistributedArray = Array<T>;
|
|
|
|
template <typename T>
|
|
using NumaInterleavedArray = Array<T>;
|
|
|
|
template <typename T>
|
|
using NumaLocalArray = Array<T>;
|
|
|
|
#endif // USE_NUMA
|
|
|
|
}
|
|
|
|
|
|
|
|
#endif // NETGEN_CORE_TASKMANAGER_HPP
|