mirror of
https://git.salome-platform.org/gitpub/modules/smesh.git
synced 2025-01-13 18:20:34 +05:00
Replacing ctpl by boost::thread_pool
This commit is contained in:
parent
446efab777
commit
7a915a21ea
@ -90,7 +90,6 @@ SET(SMESHimpl_HEADERS
|
|||||||
SMESH_SMESH.hxx
|
SMESH_SMESH.hxx
|
||||||
MG_ADAPT.hxx
|
MG_ADAPT.hxx
|
||||||
SMESH_Homard.hxx
|
SMESH_Homard.hxx
|
||||||
ctpl.h
|
|
||||||
DriverMesh.hxx
|
DriverMesh.hxx
|
||||||
DriverStep.hxx
|
DriverStep.hxx
|
||||||
)
|
)
|
||||||
|
@ -49,6 +49,7 @@
|
|||||||
|
|
||||||
#include "memoire.h"
|
#include "memoire.h"
|
||||||
#include <chrono>
|
#include <chrono>
|
||||||
|
#include <functional>
|
||||||
|
|
||||||
#ifdef WIN32
|
#ifdef WIN32
|
||||||
#include <windows.h>
|
#include <windows.h>
|
||||||
@ -58,6 +59,7 @@
|
|||||||
|
|
||||||
using namespace std;
|
using namespace std;
|
||||||
#include <boost/filesystem.hpp>
|
#include <boost/filesystem.hpp>
|
||||||
|
#include <boost/asio.hpp>
|
||||||
namespace fs = boost::filesystem;
|
namespace fs = boost::filesystem;
|
||||||
|
|
||||||
// Environment variable separator
|
// Environment variable separator
|
||||||
@ -247,7 +249,6 @@ bool SMESH_Gen::Compute(SMESH_Mesh & aMesh,
|
|||||||
// ===============================================
|
// ===============================================
|
||||||
|
|
||||||
TopAbs_ShapeEnum previousShapeType = TopAbs_VERTEX;
|
TopAbs_ShapeEnum previousShapeType = TopAbs_VERTEX;
|
||||||
std::vector<std::future<void>> pending;
|
|
||||||
int nbThreads = aMesh.GetNbThreads();
|
int nbThreads = aMesh.GetNbThreads();
|
||||||
auto begin = std::chrono::high_resolution_clock::now();
|
auto begin = std::chrono::high_resolution_clock::now();
|
||||||
|
|
||||||
@ -269,9 +270,7 @@ bool SMESH_Gen::Compute(SMESH_Mesh & aMesh,
|
|||||||
//DEBUG std::cout << "Shape Type" << shapeType << " previous" << previousShapeType << std::endl;
|
//DEBUG std::cout << "Shape Type" << shapeType << " previous" << previousShapeType << std::endl;
|
||||||
if ((aMesh.IsParallel()||nbThreads!=0) && shapeType != previousShapeType) {
|
if ((aMesh.IsParallel()||nbThreads!=0) && shapeType != previousShapeType) {
|
||||||
// Waiting for all threads for the previous type to end
|
// Waiting for all threads for the previous type to end
|
||||||
for(auto &it: pending){
|
aMesh.wait();
|
||||||
it.wait();
|
|
||||||
}
|
|
||||||
|
|
||||||
std::string file_name;
|
std::string file_name;
|
||||||
switch(previousShapeType){
|
switch(previousShapeType){
|
||||||
@ -298,7 +297,6 @@ bool SMESH_Gen::Compute(SMESH_Mesh & aMesh,
|
|||||||
}
|
}
|
||||||
//Resetting threaded pool info
|
//Resetting threaded pool info
|
||||||
previousShapeType = shapeType;
|
previousShapeType = shapeType;
|
||||||
pending.clear();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// check for preview dimension limitations
|
// check for preview dimension limitations
|
||||||
@ -311,9 +309,11 @@ bool SMESH_Gen::Compute(SMESH_Mesh & aMesh,
|
|||||||
}
|
}
|
||||||
if(aMesh.IsParallel())
|
if(aMesh.IsParallel())
|
||||||
{
|
{
|
||||||
pending.push_back(aMesh._pool->push(compute_function, smToCompute, computeEvent,
|
std::cout << "Submitting thread function " << std::endl;
|
||||||
shapeSM, aShapeOnly, allowedSubShapes,
|
boost::asio::post(*(aMesh._pool), [](){std::cerr<< "In Here" << std::endl;});
|
||||||
aShapesId));
|
boost::asio::post(*(aMesh._pool), std::bind(compute_function, 1, smToCompute, computeEvent,
|
||||||
|
shapeSM, aShapeOnly, allowedSubShapes,
|
||||||
|
aShapesId));
|
||||||
} else {
|
} else {
|
||||||
auto begin2 = std::chrono::high_resolution_clock::now();
|
auto begin2 = std::chrono::high_resolution_clock::now();
|
||||||
|
|
||||||
@ -334,10 +334,7 @@ bool SMESH_Gen::Compute(SMESH_Mesh & aMesh,
|
|||||||
// TODO: Check error handling in parallel mode
|
// TODO: Check error handling in parallel mode
|
||||||
if(aMesh.IsParallel()){
|
if(aMesh.IsParallel()){
|
||||||
// Waiting for the thread for Solids to finish
|
// Waiting for the thread for Solids to finish
|
||||||
for(auto &it:pending){
|
aMesh.wait();
|
||||||
it.wait();
|
|
||||||
}
|
|
||||||
pending.clear();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
aMesh.GetMeshDS()->Modified();
|
aMesh.GetMeshDS()->Modified();
|
||||||
|
@ -34,8 +34,6 @@
|
|||||||
|
|
||||||
#include "SMESH_Algo.hxx"
|
#include "SMESH_Algo.hxx"
|
||||||
#include "SMESH_ComputeError.hxx"
|
#include "SMESH_ComputeError.hxx"
|
||||||
#include "ctpl.h"
|
|
||||||
|
|
||||||
|
|
||||||
#include <map>
|
#include <map>
|
||||||
#include <list>
|
#include <list>
|
||||||
|
@ -43,13 +43,12 @@
|
|||||||
|
|
||||||
#include "MEDCouplingMemArray.hxx"
|
#include "MEDCouplingMemArray.hxx"
|
||||||
|
|
||||||
#include "ctpl.h"
|
|
||||||
|
|
||||||
#include <map>
|
#include <map>
|
||||||
#include <list>
|
#include <list>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
#include <ostream>
|
#include <ostream>
|
||||||
#include <boost/filesystem.hpp>
|
#include <boost/filesystem.hpp>
|
||||||
|
#include <boost/asio/thread_pool.hpp>
|
||||||
|
|
||||||
#ifdef WIN32
|
#ifdef WIN32
|
||||||
#pragma warning(disable:4251) // Warning DLL Interface ...
|
#pragma warning(disable:4251) // Warning DLL Interface ...
|
||||||
@ -385,7 +384,7 @@ class SMESH_EXPORT SMESH_Mesh
|
|||||||
|
|
||||||
std::ostream& Dump(std::ostream & save);
|
std::ostream& Dump(std::ostream & save);
|
||||||
|
|
||||||
// Data for parallel computation
|
// Parallel computation functions
|
||||||
|
|
||||||
void Lock() {_my_lock.lock();};
|
void Lock() {_my_lock.lock();};
|
||||||
void Unlock() {_my_lock.unlock();};
|
void Unlock() {_my_lock.unlock();};
|
||||||
@ -393,15 +392,16 @@ class SMESH_EXPORT SMESH_Mesh
|
|||||||
int GetNbThreads(){return _NbThreads;};
|
int GetNbThreads(){return _NbThreads;};
|
||||||
void SetNbThreads(int nbThreads){_NbThreads=nbThreads;};
|
void SetNbThreads(int nbThreads){_NbThreads=nbThreads;};
|
||||||
|
|
||||||
void InitPoolThreads(){_pool = new ctpl::thread_pool(_NbThreads);};
|
void InitPoolThreads(){_pool = new boost::asio::thread_pool(_NbThreads);};
|
||||||
void DeletePoolThreads(){delete _pool;};
|
void DeletePoolThreads(){delete _pool;};
|
||||||
|
|
||||||
|
void wait(){_pool->join(); DeletePoolThreads(); InitPoolThreads(); }
|
||||||
|
|
||||||
bool IsParallel(){return _NbThreads > 0;}
|
bool IsParallel(){return _NbThreads > 0;}
|
||||||
|
|
||||||
// Temporary folder used during parallel Computation
|
// Temporary folder used during parallel Computation
|
||||||
boost::filesystem::path tmp_folder;
|
boost::filesystem::path tmp_folder;
|
||||||
// TODO: Replace by number of thread
|
boost::asio::thread_pool * _pool = nullptr; //thread pool for computation
|
||||||
ctpl::thread_pool * _pool = nullptr; //thread pool for computation
|
|
||||||
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
240
src/SMESH/ctpl.h
240
src/SMESH/ctpl.h
@ -1,240 +0,0 @@
|
|||||||
|
|
||||||
/*********************************************************
|
|
||||||
*
|
|
||||||
* Copyright (C) 2014 by Vitaliy Vitsentiy
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*
|
|
||||||
*********************************************************/
|
|
||||||
|
|
||||||
|
|
||||||
#ifndef __ctpl_thread_pool_H__
|
|
||||||
#define __ctpl_thread_pool_H__
|
|
||||||
|
|
||||||
#include <functional>
|
|
||||||
#include <thread>
|
|
||||||
#include <atomic>
|
|
||||||
#include <vector>
|
|
||||||
#include <memory>
|
|
||||||
#include <exception>
|
|
||||||
#include <future>
|
|
||||||
#include <mutex>
|
|
||||||
#include <boost/lockfree/queue.hpp>
|
|
||||||
|
|
||||||
|
|
||||||
#ifndef _ctplThreadPoolLength_
|
|
||||||
#define _ctplThreadPoolLength_ 100
|
|
||||||
#endif
|
|
||||||
|
|
||||||
|
|
||||||
// thread pool to run user's functors with signature
|
|
||||||
// ret func(int id, other_params)
|
|
||||||
// where id is the index of the thread that runs the functor
|
|
||||||
// ret is some return type
|
|
||||||
|
|
||||||
|
|
||||||
namespace ctpl {
|
|
||||||
|
|
||||||
class thread_pool {
|
|
||||||
|
|
||||||
public:
|
|
||||||
|
|
||||||
thread_pool() : q(_ctplThreadPoolLength_) { this->init(); }
|
|
||||||
thread_pool(int nThreads, int queueSize = _ctplThreadPoolLength_) : q(queueSize) { this->init(); this->resize(nThreads); }
|
|
||||||
|
|
||||||
// the destructor waits for all the functions in the queue to be finished
|
|
||||||
~thread_pool() {
|
|
||||||
this->stop(true);
|
|
||||||
}
|
|
||||||
|
|
||||||
// get the number of running threads in the pool
|
|
||||||
int size() { return static_cast<int>(this->threads.size()); }
|
|
||||||
|
|
||||||
// number of idle threads
|
|
||||||
int n_idle() { return this->nWaiting; }
|
|
||||||
std::thread & get_thread(int i) { return *this->threads[i]; }
|
|
||||||
|
|
||||||
// change the number of threads in the pool
|
|
||||||
// should be called from one thread, otherwise be careful to not interleave, also with this->stop()
|
|
||||||
// nThreads must be >= 0
|
|
||||||
void resize(int nThreads) {
|
|
||||||
if (!this->isStop && !this->isDone) {
|
|
||||||
int oldNThreads = static_cast<int>(this->threads.size());
|
|
||||||
if (oldNThreads <= nThreads) { // if the number of threads is increased
|
|
||||||
this->threads.resize(nThreads);
|
|
||||||
this->flags.resize(nThreads);
|
|
||||||
|
|
||||||
for (int i = oldNThreads; i < nThreads; ++i) {
|
|
||||||
this->flags[i] = std::make_shared<std::atomic<bool>>(false);
|
|
||||||
this->set_thread(i);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else { // the number of threads is decreased
|
|
||||||
for (int i = oldNThreads - 1; i >= nThreads; --i) {
|
|
||||||
*this->flags[i] = true; // this thread will finish
|
|
||||||
this->threads[i]->detach();
|
|
||||||
}
|
|
||||||
{
|
|
||||||
// stop the detached threads that were waiting
|
|
||||||
std::unique_lock<std::mutex> lock(this->mutex);
|
|
||||||
this->cv.notify_all();
|
|
||||||
}
|
|
||||||
this->threads.resize(nThreads); // safe to delete because the threads are detached
|
|
||||||
this->flags.resize(nThreads); // safe to delete because the threads have copies of shared_ptr of the flags, not originals
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// empty the queue
|
|
||||||
void clear_queue() {
|
|
||||||
std::function<void(int id)> * _f;
|
|
||||||
while (this->q.pop(_f))
|
|
||||||
delete _f; // empty the queue
|
|
||||||
}
|
|
||||||
|
|
||||||
// pops a functional wraper to the original function
|
|
||||||
std::function<void(int)> pop() {
|
|
||||||
std::function<void(int id)> * _f = nullptr;
|
|
||||||
this->q.pop(_f);
|
|
||||||
std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred
|
|
||||||
|
|
||||||
std::function<void(int)> f;
|
|
||||||
if (_f)
|
|
||||||
f = *_f;
|
|
||||||
return f;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// wait for all computing threads to finish and stop all threads
|
|
||||||
// may be called asyncronously to not pause the calling thread while waiting
|
|
||||||
// if isWait == true, all the functions in the queue are run, otherwise the queue is cleared without running the functions
|
|
||||||
void stop(bool isWait = false) {
|
|
||||||
if (!isWait) {
|
|
||||||
if (this->isStop)
|
|
||||||
return;
|
|
||||||
this->isStop = true;
|
|
||||||
for (int i = 0, n = this->size(); i < n; ++i) {
|
|
||||||
*this->flags[i] = true; // command the threads to stop
|
|
||||||
}
|
|
||||||
this->clear_queue(); // empty the queue
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
if (this->isDone || this->isStop)
|
|
||||||
return;
|
|
||||||
this->isDone = true; // give the waiting threads a command to finish
|
|
||||||
}
|
|
||||||
{
|
|
||||||
std::unique_lock<std::mutex> lock(this->mutex);
|
|
||||||
this->cv.notify_all(); // stop all waiting threads
|
|
||||||
}
|
|
||||||
for (int i = 0; i < static_cast<int>(this->threads.size()); ++i) { // wait for the computing threads to finish
|
|
||||||
if (this->threads[i]->joinable())
|
|
||||||
this->threads[i]->join();
|
|
||||||
}
|
|
||||||
// if there were no threads in the pool but some functors in the queue, the functors are not deleted by the threads
|
|
||||||
// therefore delete them here
|
|
||||||
this->clear_queue();
|
|
||||||
this->threads.clear();
|
|
||||||
this->flags.clear();
|
|
||||||
}
|
|
||||||
|
|
||||||
template<typename F, typename... Rest>
|
|
||||||
auto push(F && f, Rest&&... rest) ->std::future<decltype(f(0, rest...))> {
|
|
||||||
auto pck = std::make_shared<std::packaged_task<decltype(f(0, rest...))(int)>>(
|
|
||||||
std::bind(std::forward<F>(f), std::placeholders::_1, std::forward<Rest>(rest)...)
|
|
||||||
);
|
|
||||||
|
|
||||||
auto _f = new std::function<void(int id)>([pck](int id) {
|
|
||||||
(*pck)(id);
|
|
||||||
});
|
|
||||||
this->q.push(_f);
|
|
||||||
|
|
||||||
std::unique_lock<std::mutex> lock(this->mutex);
|
|
||||||
this->cv.notify_one();
|
|
||||||
|
|
||||||
return pck->get_future();
|
|
||||||
}
|
|
||||||
|
|
||||||
// run the user's function that excepts argument int - id of the running thread. returned value is templatized
|
|
||||||
// operator returns std::future, where the user can get the result and rethrow the catched exceptins
|
|
||||||
template<typename F>
|
|
||||||
auto push(F && f) ->std::future<decltype(f(0))> {
|
|
||||||
auto pck = std::make_shared<std::packaged_task<decltype(f(0))(int)>>(std::forward<F>(f));
|
|
||||||
|
|
||||||
auto _f = new std::function<void(int id)>([pck](int id) {
|
|
||||||
(*pck)(id);
|
|
||||||
});
|
|
||||||
this->q.push(_f);
|
|
||||||
|
|
||||||
std::unique_lock<std::mutex> lock(this->mutex);
|
|
||||||
this->cv.notify_one();
|
|
||||||
|
|
||||||
return pck->get_future();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
private:
|
|
||||||
|
|
||||||
// deleted
|
|
||||||
thread_pool(const thread_pool &);// = delete;
|
|
||||||
thread_pool(thread_pool &&);// = delete;
|
|
||||||
thread_pool & operator=(const thread_pool &);// = delete;
|
|
||||||
thread_pool & operator=(thread_pool &&);// = delete;
|
|
||||||
|
|
||||||
void set_thread(int i) {
|
|
||||||
std::shared_ptr<std::atomic<bool>> flag(this->flags[i]); // a copy of the shared ptr to the flag
|
|
||||||
auto f = [this, i, flag/* a copy of the shared ptr to the flag */]() {
|
|
||||||
std::atomic<bool> & _flag = *flag;
|
|
||||||
std::function<void(int id)> * _f;
|
|
||||||
bool isPop = this->q.pop(_f);
|
|
||||||
while (true) {
|
|
||||||
while (isPop) { // if there is anything in the queue
|
|
||||||
std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred
|
|
||||||
(*_f)(i);
|
|
||||||
|
|
||||||
if (_flag)
|
|
||||||
return; // the thread is wanted to stop, return even if the queue is not empty yet
|
|
||||||
else
|
|
||||||
isPop = this->q.pop(_f);
|
|
||||||
}
|
|
||||||
|
|
||||||
// the queue is empty here, wait for the next command
|
|
||||||
std::unique_lock<std::mutex> lock(this->mutex);
|
|
||||||
++this->nWaiting;
|
|
||||||
this->cv.wait(lock, [this, &_f, &isPop, &_flag](){ isPop = this->q.pop(_f); return isPop || this->isDone || _flag; });
|
|
||||||
--this->nWaiting;
|
|
||||||
|
|
||||||
if (!isPop)
|
|
||||||
return; // if the queue is empty and this->isDone == true or *flag then return
|
|
||||||
}
|
|
||||||
};
|
|
||||||
this->threads[i].reset(new std::thread(f)); // compiler may not support std::make_unique()
|
|
||||||
}
|
|
||||||
|
|
||||||
void init() { this->nWaiting = 0; this->isStop = false; this->isDone = false; }
|
|
||||||
|
|
||||||
std::vector<std::unique_ptr<std::thread>> threads;
|
|
||||||
std::vector<std::shared_ptr<std::atomic<bool>>> flags;
|
|
||||||
mutable boost::lockfree::queue<std::function<void(int id)> *> q;
|
|
||||||
std::atomic<bool> isDone;
|
|
||||||
std::atomic<bool> isStop;
|
|
||||||
std::atomic<int> nWaiting; // how many threads are waiting
|
|
||||||
|
|
||||||
std::mutex mutex;
|
|
||||||
std::condition_variable cv;
|
|
||||||
};
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
#endif // __ctpl_thread_pool_H__
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user