This commit is contained in:
Yoann Audouin 2022-05-11 09:10:21 +02:00
parent f4fda5460f
commit 642fc469af
9 changed files with 397 additions and 85 deletions

View File

@ -90,6 +90,7 @@ SET(SMESHimpl_HEADERS
SMESH_SMESH.hxx
MG_ADAPT.hxx
SMESH_Homard.hxx
ctpl.h
)
# --- sources ---

View File

@ -155,6 +155,41 @@ SMESH_Mesh* SMESH_Gen::CreateMesh(bool theIsEmbeddedMode)
return aMesh;
}
//=============================================================================
/*
* Parallel compute of a submesh
* This function is used to pass to thread_pool
*/
//=============================================================================
const std::function<void(int,
SMESH_subMesh*,
SMESH_subMesh::compute_event,
SMESH_subMesh*,
bool,
TopTools_IndexedMapOfShape *,
TSetOfInt*)>
parallel_compute([&] (int id,
SMESH_subMesh* sm,
SMESH_subMesh::compute_event event,
SMESH_subMesh *shapeSM,
bool aShapeOnly,
TopTools_IndexedMapOfShape *allowedSubShapes,
TSetOfInt* aShapesId) -> void
{
if (sm->GetComputeState() == SMESH_subMesh::READY_TO_COMPUTE)
{
sm->SetAllowedSubShapes( fillAllowed( shapeSM, aShapeOnly, allowedSubShapes ));
//setCurrentSubMesh( sm );
sm->ComputeStateEngine(event);
//setCurrentSubMesh( nullptr );
sm->SetAllowedSubShapes( nullptr );
}
if ( aShapesId )
aShapesId->insert( sm->GetId() );
});
//=============================================================================
/*
* Compute a mesh
@ -182,6 +217,11 @@ bool SMESH_Gen::Compute(SMESH_Mesh & aMesh,
const bool complexShapeFirst = true;
const int globalAlgoDim = 100;
// Pool of thread for computation
if (!_pool){
_pool = new ctpl::thread_pool(2);
}
SMESH_subMeshIteratorPtr smIt;
// Fix of Issue 22150. Due to !BLSURF->OnlyUnaryInput(), BLSURF computes edges
@ -202,7 +242,9 @@ bool SMESH_Gen::Compute(SMESH_Mesh & aMesh,
// Mesh all the sub-shapes starting from vertices
// ===============================================
TopAbs_ShapeEnum previousShapeType = TopAbs_VERTEX;
smIt = shapeSM->getDependsOnIterator(includeSelf, !complexShapeFirst);
std::vector<std::future<void>> pending;
while ( smIt->more() )
{
SMESH_subMesh* smToCompute = smIt->next();
@ -213,6 +255,18 @@ bool SMESH_Gen::Compute(SMESH_Mesh & aMesh,
if ( !aMesh.HasShapeToMesh() && shapeType == TopAbs_VERTEX )
continue;
std::cout << "Shape Type" << shapeType << " previous" << previousShapeType << std::endl;
if (shapeType != previousShapeType) {
// Waiting for all thread for the previous type to end
for(auto it =std::begin(pending); it != std::end(pending); ++it){
std::cout << "Waiting" << std::endl;
it->wait();
}
//Resetting threaded pool info
previousShapeType = shapeType;
pending.clear();
}
// check for preview dimension limitations
if ( aShapesId && GetShapeDim( shapeType ) > (int)aDim )
{
@ -221,25 +275,18 @@ bool SMESH_Gen::Compute(SMESH_Mesh & aMesh,
smToCompute->ComputeStateEngine( SMESH_subMesh::CHECK_COMPUTE_STATE );
continue;
}
pending.push_back(_pool->push(parallel_compute, smToCompute, computeEvent,
shapeSM, aShapeOnly, allowedSubShapes,
aShapesId));
std::cout << "Launched " << smToCompute << " shape type " << shapeType << std::endl;
if (smToCompute->GetComputeState() == SMESH_subMesh::READY_TO_COMPUTE)
{
if (_compute_canceled)
return false;
smToCompute->SetAllowedSubShapes( fillAllowed( shapeSM, aShapeOnly, allowedSubShapes ));
setCurrentSubMesh( smToCompute );
smToCompute->ComputeStateEngine( computeEvent );
setCurrentSubMesh( nullptr );
smToCompute->SetAllowedSubShapes( nullptr );
}
// we check all the sub-meshes here and detect if any of them failed to compute
if (smToCompute->GetComputeState() == SMESH_subMesh::FAILED_TO_COMPUTE &&
( shapeType != TopAbs_EDGE || !SMESH_Algo::isDegenerated( TopoDS::Edge( shape ))))
ret = false;
else if ( aShapesId )
aShapesId->insert( smToCompute->GetId() );
for(auto it =std::begin(pending); it != std::end(pending); ++it){
it->wait();
}
pending.clear();
//aMesh.GetMeshDS()->Modified();
return ret;
}

View File

@ -34,6 +34,8 @@
#include "SMESH_Algo.hxx"
#include "SMESH_ComputeError.hxx"
#include "ctpl.h"
#include <map>
#include <list>
@ -41,6 +43,7 @@
#include <vector>
#include <string>
#include <TopoDS_Shape.hxx>
#include <TopTools_IndexedMapOfShape.hxx>
@ -184,6 +187,8 @@ private:
volatile bool _compute_canceled;
std::list< SMESH_subMesh* > _sm_current;
// TODO: Replace by number of thread
ctpl::thread_pool * _pool = nullptr; //thread pool for computation
};
#endif

View File

@ -43,6 +43,8 @@
#include "MEDCouplingMemArray.hxx"
#include "ctpl.h"
#include <map>
#include <list>
#include <vector>
@ -382,6 +384,9 @@ class SMESH_EXPORT SMESH_Mesh
std::ostream& Dump(std::ostream & save);
void Lock() {_my_lock.lock();};
void Unlock() {_my_lock.unlock();};
private:
void exportMEDCommmon(DriverMED_W_SMESHDS_Mesh& myWriter,
@ -428,6 +433,9 @@ protected:
// 2) to forget not loaded mesh data at hyp modification
TCallUp* _callUp;
// Mutex for multhitreading write in SMESH_Mesh
std::mutex _my_lock;
protected:
SMESH_Mesh();
SMESH_Mesh(const SMESH_Mesh&) {};

View File

@ -1392,12 +1392,14 @@ bool SMESH_subMesh::ComputeStateEngine(compute_event event)
else if (( event == COMPUTE || event == COMPUTE_SUBMESH )
&& !_alwaysComputed )
{
_father->Lock();
const TopoDS_Vertex & V = TopoDS::Vertex( _subShape );
gp_Pnt P = BRep_Tool::Pnt(V);
if ( SMDS_MeshNode * n = _father->GetMeshDS()->AddNode(P.X(), P.Y(), P.Z()) ) {
_father->GetMeshDS()->SetNodeOnVertex(n,_Id);
_computeState = COMPUTE_OK;
}
_father->Unlock();
}
if ( event == MODIF_ALGO_STATE )
cleanDependants();
@ -1500,6 +1502,7 @@ bool SMESH_subMesh::ComputeStateEngine(compute_event event)
case COMPUTE:
case COMPUTE_SUBMESH:
{
_father->Lock();
algo = GetAlgo();
ASSERT(algo);
ret = algo->CheckHypothesis((*_father), _subShape, hyp_status);
@ -1541,6 +1544,7 @@ bool SMESH_subMesh::ComputeStateEngine(compute_event event)
break; // goto exit
}
}
_father->Unlock();
// Compute
// to restore cout that may be redirected by algo
@ -1569,6 +1573,7 @@ bool SMESH_subMesh::ComputeStateEngine(compute_event event)
}
else
{
std::cout<<"Running compute for " << _father << " of shape type " << shape.ShapeType() << std::endl;
ret = algo->Compute((*_father), shape);
}
// algo can set _computeError of submesh
@ -1729,7 +1734,9 @@ bool SMESH_subMesh::ComputeStateEngine(compute_event event)
updateDependantsState( SUBMESH_COMPUTED );
}
// let algo clear its data gathered while algo->Compute()
_father->Lock();
algo->CheckHypothesis((*_father), _subShape, hyp_status);
_father->Unlock();
}
break;
case COMPUTE_CANCELED: // nothing to do
@ -1897,7 +1904,7 @@ bool SMESH_subMesh::ComputeStateEngine(compute_event event)
break;
}
notifyListenersOnEvent( event, COMPUTE_EVENT );
//notifyListenersOnEvent( event, COMPUTE_EVENT );
return ret;
}

240
src/SMESH/ctpl.h Normal file
View File

@ -0,0 +1,240 @@
/*********************************************************
*
* Copyright (C) 2014 by Vitaliy Vitsentiy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*********************************************************/
#ifndef __ctpl_thread_pool_H__
#define __ctpl_thread_pool_H__
#include <functional>
#include <thread>
#include <atomic>
#include <vector>
#include <memory>
#include <exception>
#include <future>
#include <mutex>
#include <boost/lockfree/queue.hpp>
#ifndef _ctplThreadPoolLength_
#define _ctplThreadPoolLength_ 100
#endif
// thread pool to run user's functors with signature
// ret func(int id, other_params)
// where id is the index of the thread that runs the functor
// ret is some return type
namespace ctpl {
class thread_pool {
public:
thread_pool() : q(_ctplThreadPoolLength_) { this->init(); }
thread_pool(int nThreads, int queueSize = _ctplThreadPoolLength_) : q(queueSize) { this->init(); this->resize(nThreads); }
// the destructor waits for all the functions in the queue to be finished
~thread_pool() {
this->stop(true);
}
// get the number of running threads in the pool
int size() { return static_cast<int>(this->threads.size()); }
// number of idle threads
int n_idle() { return this->nWaiting; }
std::thread & get_thread(int i) { return *this->threads[i]; }
// change the number of threads in the pool
// should be called from one thread, otherwise be careful to not interleave, also with this->stop()
// nThreads must be >= 0
void resize(int nThreads) {
if (!this->isStop && !this->isDone) {
int oldNThreads = static_cast<int>(this->threads.size());
if (oldNThreads <= nThreads) { // if the number of threads is increased
this->threads.resize(nThreads);
this->flags.resize(nThreads);
for (int i = oldNThreads; i < nThreads; ++i) {
this->flags[i] = std::make_shared<std::atomic<bool>>(false);
this->set_thread(i);
}
}
else { // the number of threads is decreased
for (int i = oldNThreads - 1; i >= nThreads; --i) {
*this->flags[i] = true; // this thread will finish
this->threads[i]->detach();
}
{
// stop the detached threads that were waiting
std::unique_lock<std::mutex> lock(this->mutex);
this->cv.notify_all();
}
this->threads.resize(nThreads); // safe to delete because the threads are detached
this->flags.resize(nThreads); // safe to delete because the threads have copies of shared_ptr of the flags, not originals
}
}
}
// empty the queue
void clear_queue() {
std::function<void(int id)> * _f;
while (this->q.pop(_f))
delete _f; // empty the queue
}
// pops a functional wraper to the original function
std::function<void(int)> pop() {
std::function<void(int id)> * _f = nullptr;
this->q.pop(_f);
std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred
std::function<void(int)> f;
if (_f)
f = *_f;
return f;
}
// wait for all computing threads to finish and stop all threads
// may be called asyncronously to not pause the calling thread while waiting
// if isWait == true, all the functions in the queue are run, otherwise the queue is cleared without running the functions
void stop(bool isWait = false) {
if (!isWait) {
if (this->isStop)
return;
this->isStop = true;
for (int i = 0, n = this->size(); i < n; ++i) {
*this->flags[i] = true; // command the threads to stop
}
this->clear_queue(); // empty the queue
}
else {
if (this->isDone || this->isStop)
return;
this->isDone = true; // give the waiting threads a command to finish
}
{
std::unique_lock<std::mutex> lock(this->mutex);
this->cv.notify_all(); // stop all waiting threads
}
for (int i = 0; i < static_cast<int>(this->threads.size()); ++i) { // wait for the computing threads to finish
if (this->threads[i]->joinable())
this->threads[i]->join();
}
// if there were no threads in the pool but some functors in the queue, the functors are not deleted by the threads
// therefore delete them here
this->clear_queue();
this->threads.clear();
this->flags.clear();
}
template<typename F, typename... Rest>
auto push(F && f, Rest&&... rest) ->std::future<decltype(f(0, rest...))> {
auto pck = std::make_shared<std::packaged_task<decltype(f(0, rest...))(int)>>(
std::bind(std::forward<F>(f), std::placeholders::_1, std::forward<Rest>(rest)...)
);
auto _f = new std::function<void(int id)>([pck](int id) {
(*pck)(id);
});
this->q.push(_f);
std::unique_lock<std::mutex> lock(this->mutex);
this->cv.notify_one();
return pck->get_future();
}
// run the user's function that excepts argument int - id of the running thread. returned value is templatized
// operator returns std::future, where the user can get the result and rethrow the catched exceptins
template<typename F>
auto push(F && f) ->std::future<decltype(f(0))> {
auto pck = std::make_shared<std::packaged_task<decltype(f(0))(int)>>(std::forward<F>(f));
auto _f = new std::function<void(int id)>([pck](int id) {
(*pck)(id);
});
this->q.push(_f);
std::unique_lock<std::mutex> lock(this->mutex);
this->cv.notify_one();
return pck->get_future();
}
private:
// deleted
thread_pool(const thread_pool &);// = delete;
thread_pool(thread_pool &&);// = delete;
thread_pool & operator=(const thread_pool &);// = delete;
thread_pool & operator=(thread_pool &&);// = delete;
void set_thread(int i) {
std::shared_ptr<std::atomic<bool>> flag(this->flags[i]); // a copy of the shared ptr to the flag
auto f = [this, i, flag/* a copy of the shared ptr to the flag */]() {
std::atomic<bool> & _flag = *flag;
std::function<void(int id)> * _f;
bool isPop = this->q.pop(_f);
while (true) {
while (isPop) { // if there is anything in the queue
std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred
(*_f)(i);
if (_flag)
return; // the thread is wanted to stop, return even if the queue is not empty yet
else
isPop = this->q.pop(_f);
}
// the queue is empty here, wait for the next command
std::unique_lock<std::mutex> lock(this->mutex);
++this->nWaiting;
this->cv.wait(lock, [this, &_f, &isPop, &_flag](){ isPop = this->q.pop(_f); return isPop || this->isDone || _flag; });
--this->nWaiting;
if (!isPop)
return; // if the queue is empty and this->isDone == true or *flag then return
}
};
this->threads[i].reset(new std::thread(f)); // compiler may not support std::make_unique()
}
void init() { this->nWaiting = 0; this->isStop = false; this->isDone = false; }
std::vector<std::unique_ptr<std::thread>> threads;
std::vector<std::shared_ptr<std::atomic<bool>>> flags;
mutable boost::lockfree::queue<std::function<void(int id)> *> q;
std::atomic<bool> isDone;
std::atomic<bool> isStop;
std::atomic<int> nWaiting; // how many threads are waiting
std::mutex mutex;
std::condition_variable cv;
};
}
#endif // __ctpl_thread_pool_H__

View File

@ -1182,6 +1182,7 @@ bool StdMeshers_Regular_1D::Compute(SMESH_Mesh & theMesh, const TopoDS_Shape & t
SMESHDS_Mesh * meshDS = theMesh.GetMeshDS();
theMesh.Lock();
const TopoDS_Edge & EE = TopoDS::Edge(theShape);
TopoDS_Edge E = TopoDS::Edge(EE.Oriented(TopAbs_FORWARD));
int shapeID = meshDS->ShapeToIndex( E );
@ -1196,9 +1197,10 @@ bool StdMeshers_Regular_1D::Compute(SMESH_Mesh & theMesh, const TopoDS_Shape & t
ASSERT(!VLast.IsNull());
const SMDS_MeshNode * nFirst = SMESH_Algo::VertexNode( VFirst, meshDS );
const SMDS_MeshNode * nLast = SMESH_Algo::VertexNode( VLast, meshDS );
if ( !nFirst || !nLast )
if ( !nFirst || !nLast ){
theMesh.Unlock();
return error( COMPERR_BAD_INPUT_MESH, "No node on vertex");
}
// remove elements created by e.g. pattern mapping (PAL21999)
// CLEAN event is incorrectly ptopagated seemingly due to Propagation hyp
// so TEMPORARY solution is to clean the submesh manually
@ -1242,6 +1244,7 @@ bool StdMeshers_Regular_1D::Compute(SMESH_Mesh & theMesh, const TopoDS_Shape & t
BRepAdaptor_Curve C3d( E );
if ( ! computeInternalParameters( theMesh, C3d, length, f, l, params, reversed, true )) {
theMesh.Unlock();
return false;
}
redistributeNearVertices( theMesh, C3d, length, params, VFirst, VLast );
@ -1332,6 +1335,7 @@ bool StdMeshers_Regular_1D::Compute(SMESH_Mesh & theMesh, const TopoDS_Shape & t
meshDS->SetMeshElementOnShape(edge, shapeID);
}
}
theMesh.Unlock();
return true;
}