Mod: working database improvements

Fix: FoamRunner exceptions
Mod: configs for subprocess runners
This commit is contained in:
L-Nafaryus 2021-12-20 20:52:46 +05:00
parent 6601525d22
commit 8e769ec1ce
No known key found for this signature in database
GPG Key ID: C76D8DCD2727DBB7
7 changed files with 295 additions and 109 deletions

View File

@ -161,6 +161,9 @@ def compute(path, configFile, nprocs, stage, overwrite, params, verbose):
logger.info(f"Loading file from { filepath }")
config.load(configFile)
else:
logger.info("Using default configuration")
config.update(
nprocs = nprocs,
stage = stage,

View File

@ -11,7 +11,8 @@ from numpy import arange, array, round
class Config(object):
def __init__(self):
self.content = {}
self.options = {},
self.options = {}
self.params = {}
self.cases = None
def __getitem__(self, key):
@ -54,9 +55,16 @@ class Config(object):
with open(path, "w") as io:
toml.dump(self.content, io, encoder = toml.TomlNumpyEncoder())
def minimize(self):
self.content = None
self.cases = None
def purge(self):
self.content = {}
self.cases = {}
self.minimize()
self.params = None
def copy(self):
return deepcopy(self)
def expand(self):
self.cases = []
@ -82,6 +90,12 @@ class Config(object):
"filletsEnabled": structure["filletsEnabled"]
})
def chooseParams(self, idn: int):
if len(self.cases) > 0:
self.params = self.cases[idn]
else:
raise IndexError("list index out of range in cause of zero length of 'cases'")
class DefaultConfig(Config):
def __init__(self):

View File

@ -22,29 +22,31 @@ from anisotropy.solving import OnePhaseFlow
from multiprocessing import current_process, parent_process
class UltimateRunner(object):
def __init__(self, config = None, t_exec = None, t_shape = None):
def __init__(self, config = None, exec_id: int = None): #t_exec = None, t_shape = None):
# Configuration file
self.config = config or DefaultConfig()
# Process recognition
typo = True if not t_exec else False
typo = True if not exec_id else False
#if current_process().name == "MainProcess" and parent_process() == None:
# current_process().name = "master"
# Database preparation
if typo: #current_process().name == "master":
self.prepareDatabase()
self.database = Database(path = self.config["database"])
if typo: #current_process().name == "master":
with self.database:
self.t_exec = T.Execution(date = datetime.now())
self.t_exec.save()
self.t_shape = None
#self.t_shape = None
else:
self.t_exec = t_exec
self.t_shape = t_shape
#self.t_exec = self.database.getExecution(exec_id)
self.exec_id = exec_id
#self.t_exec = t_exec
#self.t_shape = t_shape
# Parameters
self.shape = None
@ -54,35 +56,37 @@ class UltimateRunner(object):
self.queue = []
def prepareDatabase(self):
# NOTE: separate function in cause of unpicklability of connections (use after process is started)
self.database = Database(path = self.config["database"])
def createRow(self):
# create a row in each table for the current case
with self.database:
self.t_mesh = T.Mesh(t_exec = self.t_exec, shape_id = self.t_shape)
self.t_mesh.save()
self.t_flow = T.FlowOnephase(t_exec = self.t_exec, mesh_id = self.t_mesh)
self.t_flow.save()
t_shape = T.Shape(exec_id = self.exec_id, **self.config.params)
t_shape.save()
t_mesh = T.Mesh(shape_id = t_shape.shape_id)
t_mesh.save()
t_flow = T.FlowOnephase(mesh_id = t_mesh.mesh_id)
t_flow.save()
def fill(self):
self.config.expand()
logger.info(f"Preparing queue: { len(self.config.cases) }")
for case in self.config.cases:
with self.database:
t_shape = T.Shape(
exec_id = self.t_exec,
**case
)
t_shape.save()
for idn, case in enumerate(self.config.cases):
config = self.config.copy()
config.chooseParams(idn)
config.minimize()
#with self.database:
# t_shape = T.Shape(
# exec_id = self.t_exec,
# **case
# )
# t_shape.save()
self.queue.append(UltimateRunner(
config = self.config,
t_exec = self.t_exec,
t_shape = t_shape
config = config,
exec_id = self.t_exec.exec_id
#t_exec = self.t_exec,
#t_shape = t_shape
))
@ -100,15 +104,24 @@ class UltimateRunner(object):
# TODO: if runner done - remove from queue; results from parallel function
def casepath(self):
with self.database:
params = T.Shape.get(
T.Shape.exec_id == self.t_exec,
T.Shape.shape_id == self.t_shape.shape_id
)
#with self.database:
# params = T.Shape.get(
# T.Shape.exec_id == self.t_exec,
# T.Shape.shape_id == self.t_shape.shape_id
# )
params = self.config.params
shapeParams = self.database.getShape(
params["label"],
params["direction"],
params["alpha"],
self.exec_id
)
direction = "direction-[{},{},{}]".format(*[ str(d) for d in params.direction ])
alpha = "alpha-{}".format(params.alpha)
dirpath = path.join(self.config["build"], params.label, direction, alpha)
execution = "execution-{}".format(self.exec_id)
case = "{}-[{},{},{}]-{}".format(params["label"], *[ str(d) for d in params["direction"] ], params["alpha"])
#alpha = "alpha-{}".format(shapeParams.alpha)
#dirpath = path.join(self.config["build"], shapeParams.label, direction, alpha)
dirpath = path.join(self.config["build"], execution, case)
return path.abspath(dirpath)
@ -116,14 +129,22 @@ class UltimateRunner(object):
#if current_process().name == "master":
# return
with self.database:
params = T.Shape.get(
T.Shape.exec_id == self.t_exec,
T.Shape.shape_id == self.t_shape.shape_id
)
#with self.database:
# params = T.Shape.get(
# T.Shape.exec_id == self.t_exec,
# T.Shape.shape_id == self.t_shape.shape_id
# )
params = self.config.params
shapeParams = self.database.getShape(
params["label"],
params["direction"],
params["alpha"],
self.exec_id
)
logger.info("Computing shape for {} with direction = {} and alpha = {}".format(
params.label, params.direction, params.alpha
params["label"], params["direction"], params["alpha"]
))
filename = "shape.step"
timer = Timer()
@ -132,46 +153,55 @@ class UltimateRunner(object):
"simple": Simple,
"bodyCentered": BodyCentered,
"faceCentered": FaceCentered
}[params.label]
}[shapeParams.label]
self.shape = shape(
direction = params.direction,
alpha = params.alpha,
r0 = params.r0,
filletsEnabled = params.filletsEnabled
direction = shapeParams.direction,
alpha = shapeParams.alpha,
r0 = shapeParams.r0,
filletsEnabled = shapeParams.filletsEnabled
)
#out, err, returncode = self.shape.build()
# TODO: wrap build function for exceptions
self.shape.build()
os.makedirs(self.casepath(), exist_ok = True)
out, err, returncode = self.shape.export(path.join(self.casepath(), filename))
if returncode == 0:
params.shapeStatus = "done"
shapeParams.shapeStatus = "done"
else:
logger.error(err)
params.shapeStatus = "failed"
shapeParams.shapeStatus = "failed"
with self.database:
params.shapeExecutionTime = timer.elapsed()
params.save()
shapeParams.shapeExecutionTime = timer.elapsed()
shapeParams.save()
def computeMesh(self):
#if not self.type == "worker":
# return
with self.database:
t_params = T.Shape.get(
T.Shape.exec_id == self.t_exec,
T.Shape.shape_id == self.t_shape.shape_id
)
params = T.Mesh.get(
T.Mesh.shape_id == self.t_shape.shape_id
)
#with self.database:
# t_params = T.Shape.get(
# T.Shape.exec_id == self.t_exec,
# T.Shape.shape_id == self.t_shape.shape_id
# )
# params = T.Mesh.get(
# T.Mesh.shape_id == self.t_shape.shape_id
# )
params = self.config.params
meshParams = self.database.getMesh(
params["label"],
params["direction"],
params["alpha"],
self.exec_id
)
logger.info("Computing mesh for {} with direction = {} and alpha = {}".format(
t_params.label, t_params.direction, t_params.alpha
params["label"], params["direction"], params["alpha"]
))
filename = "mesh.mesh"
timer = Timer()
@ -179,40 +209,49 @@ class UltimateRunner(object):
# TODO: load from object or file
self.mesh = Mesh(self.shape.shape)
#out, err, returncode = self.mesh.build()
# TODO: wrap build function for exceptions
self.mesh.build()
os.makedirs(self.casepath(), exist_ok = True)
out, err, returncode = self.mesh.export(path.join(self.casepath(), filename))
if returncode == 0:
params.meshStatus = "done"
meshParams.meshStatus = "done"
else:
logger.error(err)
params.meshStatus = "failed"
meshParams.meshStatus = "failed"
with self.database:
params.meshExecutionTime = timer.elapsed()
params.save()
meshParams.meshExecutionTime = timer.elapsed()
meshParams.save()
def computeFlow(self):
# if not self.type == "worker":
# return
with self.database:
t_params = T.Shape.get(
T.Shape.exec_id == self.t_exec,
T.Shape.shape_id == self.t_shape.shape_id
)
m_params = T.Mesh.get(
T.Mesh.shape_id == self.t_shape.shape_id
)
params = T.FlowOnephase.get(
T.FlowOnephase.mesh_id == self.t_mesh.mesh_id
)
#with self.database:
# t_params = T.Shape.get(
# T.Shape.exec_id == self.t_exec,
# T.Shape.shape_id == self.t_shape.shape_id
# )
# m_params = T.Mesh.get(
# T.Mesh.shape_id == self.t_shape.shape_id
# )
# params = T.FlowOnephase.get(
# T.FlowOnephase.mesh_id == self.t_mesh.mesh_id
# )
params = self.config.params
flowParams = self.database.getFlowOnephase(
params["label"],
params["direction"],
params["alpha"],
self.exec_id
)
logger.info("Computing flow for {} with direction = {} and alpha = {}".format(
t_params.label, t_params.direction, t_params.alpha
params["label"], params["direction"], params["alpha"]
))
timer = Timer()
@ -274,18 +313,18 @@ class UltimateRunner(object):
logger.error(e, exc_info = True)
if returncode == 0:
params.flowStatus = "done"
flowParams.flowStatus = "done"
else:
#logger.error(err)
params.flowStatus = "failed"
flowParams.flowStatus = "failed"
with self.database:
params.flowExecutionTime = timer.elapsed()
params.save()
flowParams.flowExecutionTime = timer.elapsed()
flowParams.save()
def pipeline(self, stage: str = None):
self.prepareDatabase()
self.database = Database(path = self.config["database"])
self.createRow()
stage = stage or self.config["stage"]

View File

@ -39,9 +39,8 @@ class Database(SqliteDatabase):
)
models.__database_proxy__.initialize(self)
self.connect()
self.create_tables(self.tables)
self.close()
with self:
self.create_tables(self.tables)
def getExecution(self, idn):
query = models.Execution.select().where(models.Execution.exec_id == idn)
@ -53,10 +52,9 @@ class Database(SqliteDatabase):
def getLatest(self):
query = models.Execution.select()
#self.connect()
with self:
table = query[-1] if query.exists() else None
#self.close()
return table
@ -67,15 +65,15 @@ class Database(SqliteDatabase):
.select()
.join(models.Execution, JOIN.LEFT_OUTER)
.where(
models.Execution.exec_id == execution.exec_id,
models.Execution.exec_id == execution,
models.Shape.label == label,
models.Shape.direction == direction,
models.Shape.alpha == alpha
)
)
self.connect()
table = query.get() if query.exists() else None
self.close()
with self:
table = query.get() if query.exists() else None
return table
@ -87,35 +85,35 @@ class Database(SqliteDatabase):
.join(models.Shape, JOIN.LEFT_OUTER)
.join(models.Execution, JOIN.LEFT_OUTER)
.where(
models.Execution.exec_id == execution.exec_id,
models.Execution.exec_id == execution,
models.Shape.label == label,
models.Shape.direction == direction,
models.Shape.alpha == alpha
)
)
self.connect()
table = query.get() if query.exists() else None
self.close()
with self:
table = query.get() if query.exists() else None
return table
def getFlowOnephase(self, label, direction, alpha, execution = None):
execution = execution or self.getLatest()
query = (
models.Mesh
models.FlowOnephase
.select()
.join(models.Mesh, JOIN.LEFT_OUTER)
.join(models.Shape, JOIN.LEFT_OUTER)
.join(models.Execution, JOIN.LEFT_OUTER)
.where(
models.Execution.exec_id == execution.exec_id,
models.Execution.exec_id == execution,
models.Shape.label == label,
models.Shape.direction == direction,
models.Shape.alpha == alpha
)
)
self.connect()
table = query.get() if query.exists() else None
self.close()
with self:
table = query.get() if query.exists() else None
return table

View File

@ -11,7 +11,7 @@ from peewee import (
TimeField, DateTimeField, Proxy
)
from .utils import JSONField
#from playhouse.sqlite_ext import JSONField
__database_proxy__ = Proxy()
class Execution(Model):

View File

@ -2,7 +2,16 @@
# This file is part of anisotropy.
# License: GNU GPL version 3, see the file "LICENSE" for details.
from peewee import TextField
from peewee import (
TextField,
ColumnBase,
Value,
fn,
Node,
Expression,
OP,
Field
)
import json
from numpy import ndarray
@ -24,7 +33,7 @@ class ListField(TextField):
return pval
"""
class JSONField(TextField):
# TODO: fix double quotes when use __eq__ in 'where' method
field_type = "TEXT"
@ -41,3 +50,126 @@ class JSONField(TextField):
def python_value(self, value):
if value is not None:
return json.loads(value)
"""
class JSONPath(ColumnBase):
def __init__(self, field, path=None):
super(JSONPath, self).__init__()
self._field = field
self._path = path or ()
@property
def path(self):
return Value('$%s' % ''.join(self._path))
def __getitem__(self, idx):
if isinstance(idx, int):
item = '[%s]' % idx
else:
item = '.%s' % idx
return JSONPath(self._field, self._path + (item,))
def set(self, value, as_json=None):
if as_json or isinstance(value, (list, dict)):
value = fn.json(self._field._json_dumps(value))
return fn.json_set(self._field, self.path, value)
def update(self, value):
return self.set(fn.json_patch(self, self._field._json_dumps(value)))
def remove(self):
return fn.json_remove(self._field, self.path)
def json_type(self):
return fn.json_type(self._field, self.path)
def length(self):
return fn.json_array_length(self._field, self.path)
def children(self):
return fn.json_each(self._field, self.path)
def tree(self):
return fn.json_tree(self._field, self.path)
def __sql__(self, ctx):
return ctx.sql(fn.json_extract(self._field, self.path)
if self._path else self._field)
class JSONField(TextField):
field_type = 'TEXT'
unpack = False
def __init__(self, json_dumps=None, json_loads=None, **kwargs):
self._json_dumps = json_dumps or json.dumps
self._json_loads = json_loads or json.loads
super(JSONField, self).__init__(**kwargs)
def python_value(self, value):
if value is not None:
try:
return json.loads(value)
except (TypeError, ValueError):
return value
def db_value(self, value):
if value is not None:
if isinstance(value, ndarray):
value = list(value)
if not isinstance(value, Node):
value = json.dumps(value)
return value
def _e(op):
def inner(self, rhs):
if isinstance(rhs, (list, dict)):
rhs = Value(rhs, converter=self.db_value, unpack=False)
return Expression(self, op, rhs)
return inner
__eq__ = _e(OP.EQ)
__ne__ = _e(OP.NE)
__gt__ = _e(OP.GT)
__ge__ = _e(OP.GTE)
__lt__ = _e(OP.LT)
__le__ = _e(OP.LTE)
__hash__ = Field.__hash__
def __getitem__(self, item):
return JSONPath(self)[item]
def set(self, value, as_json=None):
return JSONPath(self).set(value, as_json)
def update(self, data):
return JSONPath(self).update(data)
def remove(self):
return JSONPath(self).remove()
def json_type(self):
return fn.json_type(self)
def length(self):
return fn.json_array_length(self)
def children(self):
"""
Schema of `json_each` and `json_tree`:
key,
value,
type TEXT (object, array, string, etc),
atom (value for primitive/scalar types, NULL for array and object)
id INTEGER (unique identifier for element)
parent INTEGER (unique identifier of parent element or NULL)
fullkey TEXT (full path describing element)
path TEXT (path to the container of the current element)
json JSON hidden (1st input parameter to function)
root TEXT hidden (2nd input parameter, path at which to start)
"""
return fn.json_each(self)
def tree(self):
return fn.json_tree(self)

View File

@ -52,7 +52,7 @@ class FoamRunner(object):
self.returncode = proc.returncode
except FileNotFoundError as err:
self.error = err
self.error = err.args[1]
self.returncode = 2
logger.error(self.error, exc_info = True)
@ -68,6 +68,6 @@ class FoamRunner(object):
io.write(f"Exit code { self.returncode }")
if not self.returncode == 0 and self.exit:
raise Exception(f"Subprocess failed: { proc.args }")
raise Exception(f"Subprocess failed: { self.error }")
return self.output, self.error, self.returncode