From 402e7633059552517fe41708cd9d0492fda38aa0 Mon Sep 17 00:00:00 2001 From: L-Nafaryus Date: Mon, 30 Aug 2021 21:05:11 +0500 Subject: [PATCH] Mod: working multiprocessing mode --- anisotropy/core/database.py | 35 +++++++++++++++++++++++++++-------- anisotropy/core/main.py | 3 ++- anisotropy/core/utils.py | 18 ++++++++++-------- 3 files changed, 39 insertions(+), 17 deletions(-) diff --git a/anisotropy/core/database.py b/anisotropy/core/database.py index f06616a..e953121 100644 --- a/anisotropy/core/database.py +++ b/anisotropy/core/database.py @@ -14,9 +14,27 @@ from anisotropy.core.models import ( Mesh, SubMesh, MeshResult, Flow, FlowApproximation, FlowResult ) +from peewee import OperationalError logger = logging.getLogger(env["logger_name"]) -setupLogger(logger, logging.INFO, env["LOG"]) +#setupLogger(logger, logging.INFO, env["LOG"]) + +# NOTE: Temporary solution for multiprocessing mode when database is locked +def tryUntilDone(func): + def inner(*args, **kwargs): + done = False + + while not done: + try: + ret = func(*args, **kwargs) + done = True + + except OperationalError: + pass + + return ret + + return inner class Database(object): @@ -155,20 +173,21 @@ class Database(object): ) ) - structureID = self._updateStructure(params["structure"], query) + structureID = tryUntilDone(self._updateStructure)(params["structure"], query) - meshID = self._updateMesh(params["mesh"], query, structureID) + meshID = tryUntilDone(self._updateMesh)(params["mesh"], query, structureID) for submeshParams in params.get("submesh", []): - self._updateSubMesh(submeshParams, query, meshID) + tryUntilDone(self._updateSubMesh)(submeshParams, query, meshID) - self._updateMeshResult(params.get("meshresult", {}), query, meshID) + tryUntilDone(self._updateMeshResult)(params.get("meshresult", {}), query, meshID) - flowID = self._updateFlow(params["flow"], query, structureID) + flowID = tryUntilDone(self._updateFlow)(params["flow"], query, structureID) - self._updateFlowApproximation(params.get("flowapprox", {}), query, flowID) + tryUntilDone(self._updateFlowApproximation)(params.get("flowapprox", {}), query, flowID) + + tryUntilDone(self._updateFlowResult)(params.get("flowresult", {}), query, flowID) - self._updateFlowResult(params.get("flowresult", {}), query, flowID) def _updateStructure(self, src: dict, queryMain) -> int: raw = deepcopy(src) diff --git a/anisotropy/core/main.py b/anisotropy/core/main.py index 712c80f..f4a7c09 100644 --- a/anisotropy/core/main.py +++ b/anisotropy/core/main.py @@ -250,13 +250,14 @@ class Anisotropy(object): path ] manager = salomepl.utils.SalomeManager() + casepath = self.getCasePath(path) return manager.execute( scriptpath, *salomeargs, timeout = self.env["salome_timeout"], root = self.env["ROOT"], - logpath = self.env["LOG"] + logpath = os.path.join(casepath, "logs") ) diff --git a/anisotropy/core/utils.py b/anisotropy/core/utils.py index b057153..7f1bdff 100644 --- a/anisotropy/core/utils.py +++ b/anisotropy/core/utils.py @@ -18,7 +18,7 @@ class CustomFormatter(logging.Formatter): red = "\x1b[31;21m" bold_red = "\x1b[31;1m" reset = "\x1b[0m" - format = "[ %(asctime)s ] [ %(levelname)s ] %(message)s" + format = "[ %(asctime)s ] [ %(processName)s ] [ %(levelname)s ] %(message)s" formats = { logging.DEBUG: grey + format + reset, @@ -41,12 +41,14 @@ class CustomFormatter(logging.Formatter): def setupLogger(logger, level: int, filepath: str = None): """Applies settings to logger - :param logger: Instance of :class:`logging.Logger` - :type logger: Instance of :class:`logging.Logger` - :param level: Logging level (logging.INFO, logging.WARNING, ..) - :type level: int - :param filepath: Path to directory - :type filepath: str, optional + :param logger: + Instance of :class:`logging.Logger` + + :param level: + Logging level (logging.INFO, logging.WARNING, ..) + + :param filepath: + Path to directory """ logger.handlers = [] logger.setLevel(level) @@ -215,7 +217,7 @@ def queue(cmd, qin, qout, *args): break # Execute command - res = cmd(var, *args) + res = cmd(*var, *args) # Put results to the queue qout.put((pos, res))