Mod: working mesh stage
This commit is contained in:
parent
7454714d70
commit
ace2cd0247
@ -61,22 +61,22 @@ class Config(object):
|
||||
def expand(self):
|
||||
self.cases = []
|
||||
|
||||
# Expand structures for each direction and each theta
|
||||
# Expand structures for each direction and each alpha
|
||||
for structure in self.content["structures"]:
|
||||
# ISSUE: precision error 0.06999999999999999
|
||||
thetaA = round(arange(
|
||||
structure["theta"][0],
|
||||
structure["theta"][1] + structure["thetaStep"],
|
||||
structure["thetaStep"]
|
||||
alphaA = round(arange(
|
||||
structure["alpha"][0],
|
||||
structure["alpha"][1] + structure["alphaStep"],
|
||||
structure["alphaStep"]
|
||||
), 9)
|
||||
directionA = array(structure["directions"], dtype = float)
|
||||
|
||||
for direction in directionA:
|
||||
for theta in thetaA:
|
||||
for alpha in alphaA:
|
||||
self.cases.append({
|
||||
"label": structure["label"],
|
||||
"theta": theta,
|
||||
"thetaStep": structure["thetaStep"],
|
||||
"alpha": alpha,
|
||||
"alphaStep": structure["alphaStep"],
|
||||
"direction": direction,
|
||||
"r0": structure["r0"],
|
||||
"filletsEnabled": structure["filletsEnabled"]
|
||||
@ -101,13 +101,13 @@ class DefaultConfig(Config):
|
||||
self.options = deepcopy(self.content["options"])
|
||||
|
||||
labels = ["simple", "bodyCentered", "faceCentered"]
|
||||
thetas = [[0.01, 0.28], [0.01, 0.17], [0.01, 0.13]]
|
||||
alphas = [[0.01, 0.28], [0.01, 0.17], [0.01, 0.13]]
|
||||
|
||||
for label, theta in zip(labels, thetas):
|
||||
for label, alpha in zip(labels, alphas):
|
||||
self.content["structures"].append({
|
||||
"label": label,
|
||||
"theta": theta,
|
||||
"thetaStep": 0.01,
|
||||
"alpha": alpha,
|
||||
"alphaStep": 0.01,
|
||||
"directions": [[1, 0, 0], [0, 0, 1], [1, 1, 1]],
|
||||
"r0": 1,
|
||||
"filletsEnabled": True
|
||||
|
@ -9,7 +9,7 @@ from os import path
|
||||
from anisotropy.core.config import DefaultConfig
|
||||
|
||||
import logging
|
||||
from anisotropy.core.utils import parallel, ParallelRunner, setupLogger
|
||||
from anisotropy.core.utils import ParallelRunner, Timer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -21,29 +21,34 @@ from anisotropy.shaping import Simple, BodyCentered, FaceCentered
|
||||
from anisotropy.meshing import Mesh
|
||||
from anisotropy.openfoam.presets import CreatePatchDict
|
||||
from anisotropy.solving.onephase import OnePhaseFlow
|
||||
|
||||
from multiprocessing import current_process, parent_process
|
||||
|
||||
class UltimateRunner(object):
|
||||
def __init__(self, config = None, exec_id = None, t_shape = None):
|
||||
|
||||
def __init__(self, config = None, t_exec = None, t_shape = None):
|
||||
# Configuration file
|
||||
self.config = config or DefaultConfig()
|
||||
|
||||
self.type = "master" if not exec_id else "worker"
|
||||
# Process recognition
|
||||
typo = True if not t_exec else False
|
||||
#if current_process().name == "MainProcess" and parent_process() == None:
|
||||
# current_process().name = "master"
|
||||
|
||||
if self.type == "master":
|
||||
# Database preparation
|
||||
if typo: #current_process().name == "master":
|
||||
self.prepareDatabase()
|
||||
|
||||
if self.type == "master":
|
||||
if typo: #current_process().name == "master":
|
||||
with self.database:
|
||||
self.exec_id = T.Execution(date = datetime.now())
|
||||
self.exec_id.save()
|
||||
self.t_exec = T.Execution(date = datetime.now())
|
||||
self.t_exec.save()
|
||||
|
||||
self.t_shape = None
|
||||
|
||||
else:
|
||||
self.exec_id = exec_id
|
||||
self.t_exec = t_exec
|
||||
self.t_shape = t_shape
|
||||
|
||||
# Parameters
|
||||
self.shape = None
|
||||
self.mesh = None
|
||||
self.flow = None
|
||||
@ -52,10 +57,19 @@ class UltimateRunner(object):
|
||||
|
||||
|
||||
def prepareDatabase(self):
|
||||
# NOTE: separate function in cause of unpicklability of connections
|
||||
# NOTE: separate function in cause of unpicklability of connections (use after process is started)
|
||||
self.database = database
|
||||
self.database.setup(self.config["database"])
|
||||
|
||||
def createRow(self):
|
||||
# create a row in each table for the current case
|
||||
with self.database:
|
||||
|
||||
self.t_mesh = T.Mesh(t_exec = self.t_exec, shape_id = self.t_shape)
|
||||
self.t_mesh.save()
|
||||
self.t_flow = T.FlowOnephase(t_exec = self.t_exec, mesh_id = self.t_mesh)
|
||||
self.t_flow.save()
|
||||
|
||||
def fill(self):
|
||||
self.config.expand()
|
||||
logger.info(f"Preparing queue: { len(self.config.cases) }")
|
||||
@ -63,17 +77,18 @@ class UltimateRunner(object):
|
||||
for case in self.config.cases:
|
||||
with self.database:
|
||||
t_shape = T.Shape(
|
||||
exec_id = self.exec_id,
|
||||
exec_id = self.t_exec,
|
||||
**case
|
||||
)
|
||||
t_shape.save()
|
||||
|
||||
self.queue.append(UltimateRunner(
|
||||
config = self.config,
|
||||
exec_id = self.exec_id,
|
||||
t_exec = self.t_exec,
|
||||
t_shape = t_shape
|
||||
))
|
||||
|
||||
|
||||
def start(self, queue: list = None, nprocs: int = None):
|
||||
nprocs = nprocs or self.config["nprocs"]
|
||||
|
||||
@ -90,62 +105,77 @@ class UltimateRunner(object):
|
||||
def casepath(self):
|
||||
with self.database:
|
||||
params = T.Shape.get(
|
||||
T.Shape.exec_id == self.exec_id,
|
||||
T.Shape.exec_id == self.t_exec,
|
||||
T.Shape.shape_id == self.t_shape.shape_id
|
||||
)
|
||||
|
||||
direction = "direction-[{},{},{}]".format(*[ str(d) for d in params.direction ])
|
||||
theta = "theta-{}".format(params.theta)
|
||||
dirpath = path.join(self.config["build"], params.label, direction, theta)
|
||||
alpha = "alpha-{}".format(params.alpha)
|
||||
dirpath = path.join(self.config["build"], params.label, direction, alpha)
|
||||
|
||||
return path.abspath(dirpath)
|
||||
|
||||
def computeShape(self):
|
||||
if not self.type == "worker":
|
||||
return
|
||||
#if current_process().name == "master":
|
||||
# return
|
||||
|
||||
with self.database:
|
||||
params = T.Shape.get(
|
||||
T.Shape.exec_id == self.exec_id,
|
||||
T.Shape.exec_id == self.t_exec,
|
||||
T.Shape.shape_id == self.t_shape.shape_id
|
||||
)
|
||||
|
||||
logger.info("Computing shape for {} with direction = {} and theta = {}".format(params.label, params.direction, params.theta))
|
||||
logger.info("Computing shape for {} with direction = {} and alpha = {}".format(params.label, params.direction, params.alpha))
|
||||
out, err, returncode = "", "", 0
|
||||
filename = "shape.step"
|
||||
timer = Timer()
|
||||
|
||||
self.shape = {
|
||||
shape = {
|
||||
"simple": Simple,
|
||||
"bodyCentered": BodyCentered,
|
||||
"faceCentered": FaceCentered
|
||||
}[params.label]
|
||||
|
||||
self.shape(params.direction)
|
||||
self.shape = shape(
|
||||
direction = params.direction,
|
||||
alpha = params.alpha,
|
||||
r0 = params.r0,
|
||||
filletsEnabled = params.filletsEnabled
|
||||
)
|
||||
self.shape.build()
|
||||
|
||||
os.makedirs(self.casepath(), exist_ok = True)
|
||||
self.shape.export(path.join(self.casepath(), filename))
|
||||
out, err, returncode = self.shape.export(path.join(self.casepath(), filename))
|
||||
|
||||
if returncode == 0:
|
||||
params.shapeStatus = "done"
|
||||
params.shapeExecutionTime = timer.elapsed()
|
||||
|
||||
else:
|
||||
logger.error(err)
|
||||
|
||||
params.shapeStatus = "failed"
|
||||
params.shapeExecutionTime = timer.elapsed()
|
||||
|
||||
with self.database:
|
||||
params.shapeStatus = "Done"
|
||||
params.save()
|
||||
|
||||
def computeMesh(self):
|
||||
if not self.type == "worker":
|
||||
return
|
||||
#if not self.type == "worker":
|
||||
# return
|
||||
|
||||
with self.database:
|
||||
params = (T.Mesh.select(T.Shape, T.Mesh)
|
||||
.join(
|
||||
T.Mesh,
|
||||
JOIN.INNER,
|
||||
on = (T.Mesh.shape_id == T.Shape.shape_id)
|
||||
).where(
|
||||
T.Shape.exec_id == self.exec_id,
|
||||
t_params = T.Shape.get(
|
||||
T.Shape.exec_id == self.t_exec,
|
||||
T.Shape.shape_id == self.t_shape.shape_id
|
||||
))
|
||||
)
|
||||
params = T.Mesh.get(
|
||||
T.Mesh.shape_id == self.t_shape.shape_id
|
||||
)
|
||||
|
||||
logger.info("Computing mesh for {} with direction = {} and theta = {}".format(params.label, params.direction, params.theta))
|
||||
logger.info("Computing mesh for {} with direction = {} and alpha = {}".format(t_params.label, t_params.direction, t_params.alpha))
|
||||
filename = "mesh.mesh"
|
||||
timer = Timer()
|
||||
|
||||
self.mesh = Mesh(self.shape.shape)
|
||||
self.mesh.build()
|
||||
@ -153,6 +183,11 @@ class UltimateRunner(object):
|
||||
os.makedirs(self.casepath(), exist_ok = True)
|
||||
self.mesh.export(path.join(self.casepath(), filename))
|
||||
|
||||
with self.database:
|
||||
params.meshStatus = "done"
|
||||
params.meshExecutionTime = timer.elapsed()
|
||||
params.save()
|
||||
|
||||
def computeFlow(self):
|
||||
params = self.config.cases[0]
|
||||
flow = OnePhaseFlow(path = self.casepath())
|
||||
@ -210,20 +245,21 @@ class UltimateRunner(object):
|
||||
|
||||
def pipeline(self, stage: str = None):
|
||||
self.prepareDatabase()
|
||||
self.createRow()
|
||||
|
||||
stage = stage or self.config["stage"]
|
||||
|
||||
if stage in ["shape", "all"]:
|
||||
self.computeShape()
|
||||
|
||||
elif stage in ["mesh", "all"]:
|
||||
if stage in ["mesh", "all"]:
|
||||
self.computeMesh()
|
||||
|
||||
elif stage in ["flow", "all"]:
|
||||
self.computeFlow()
|
||||
#elif stage in ["flow", "all"]:
|
||||
# self.computeFlow()
|
||||
|
||||
elif stage in ["postProcess", "all"]:
|
||||
self.postProcess()
|
||||
#elif stage in ["postProcess", "all"]:
|
||||
# self.postProcess()
|
||||
|
||||
|
||||
|
||||
|
@ -220,75 +220,15 @@ def timer(func: FunctionType) -> (tuple, float):
|
||||
|
||||
class Timer(object):
|
||||
def __init__(self):
|
||||
self.update()
|
||||
|
||||
def update(self):
|
||||
self.start = time.monotonic()
|
||||
|
||||
def elapsed(self):
|
||||
return time.monotonic() - self.start
|
||||
|
||||
|
||||
def queue(cmd, qin, qout, *args):
|
||||
|
||||
while True:
|
||||
# Get item from the queue
|
||||
pos, var = qin.get()
|
||||
|
||||
# Exit point
|
||||
if pos is None:
|
||||
break
|
||||
|
||||
# Execute command
|
||||
res = cmd(*var, *args)
|
||||
|
||||
# Put results to the queue
|
||||
qout.put((pos, res))
|
||||
|
||||
return
|
||||
|
||||
|
||||
def parallel(np, var, cmd):
|
||||
|
||||
varcount = len(var)
|
||||
|
||||
processes = []
|
||||
nprocs = np if np <= cpu_count() else cpu_count()
|
||||
|
||||
qin = Queue(1)
|
||||
qout = Queue()
|
||||
|
||||
# Create processes
|
||||
for n in range(nprocs):
|
||||
pargs = [cmd, qin, qout]
|
||||
|
||||
p = Process(target = queue, args = tuple(pargs))
|
||||
|
||||
processes.append(p)
|
||||
|
||||
# Start processes
|
||||
for p in processes:
|
||||
p.daemon = True
|
||||
p.start()
|
||||
|
||||
# Fill queue
|
||||
for n in range(varcount):
|
||||
qin.put((n, var[n]))
|
||||
|
||||
for _ in range(nprocs):
|
||||
qin.put((None, None))
|
||||
|
||||
# Get results
|
||||
results = [[] for n in range(varcount)]
|
||||
|
||||
for n in range(varcount):
|
||||
index, res = qout.get()
|
||||
|
||||
results[index] = res
|
||||
|
||||
# Wait until each processor has finished
|
||||
for p in processes:
|
||||
p.join()
|
||||
|
||||
return results
|
||||
|
||||
class ParallelRunner(object):
|
||||
def __init__(self, nprocs: int = 1, daemon: bool = True):
|
||||
self.nprocs = nprocs
|
||||
@ -326,7 +266,7 @@ class ParallelRunner(object):
|
||||
self.processes.append(Process(
|
||||
target = self.queueRelease,
|
||||
args = (self.queueInput, self.queueOutput),
|
||||
name = f"PP-{ n + 1 }"
|
||||
name = f"worker-{ n + 1 }"
|
||||
))
|
||||
|
||||
for proc in self.processes:
|
||||
@ -349,10 +289,6 @@ class ParallelRunner(object):
|
||||
self.__pos = -1
|
||||
|
||||
|
||||
def portIsFree(address, port):
|
||||
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||||
return s.connect_ex((address, port)) == 0
|
||||
|
||||
|
||||
|
||||
|
@ -31,12 +31,12 @@ class Shape(Model):
|
||||
shape_id = AutoField()
|
||||
exec_id = ForeignKeyField(Execution, backref = "executions")
|
||||
|
||||
shapeStatus = TextField(null = True, default = "Idle")
|
||||
shapeCalculationTime = TimeField(null = True)
|
||||
shapeStatus = TextField(null = True, default = "idle")
|
||||
shapeExecutionTime = TimeField(null = True)
|
||||
|
||||
label = TextField(null = True)
|
||||
direction = JSONField(null = True)
|
||||
theta = FloatField(null = True)
|
||||
alpha = FloatField(null = True)
|
||||
|
||||
r0 = FloatField(null = True)
|
||||
L = FloatField(null = True)
|
||||
@ -61,8 +61,8 @@ class Mesh(Model):
|
||||
mesh_id = AutoField()
|
||||
shape_id = ForeignKeyField(Shape, backref = "shapes")
|
||||
|
||||
meshStatus = TextField(null = True, default = "Idle")
|
||||
meshCalculationTime = TimeField(null = True)
|
||||
meshStatus = TextField(null = True, default = "idle")
|
||||
meshExecutionTime = TimeField(null = True)
|
||||
|
||||
elements = IntegerField(null = True)
|
||||
edges = IntegerField(null = True)
|
||||
@ -83,8 +83,8 @@ class FlowOnephase(Model):
|
||||
flow_id = AutoField()
|
||||
mesh_id = ForeignKeyField(Mesh, backref = "meshes")
|
||||
|
||||
flowStatus = TextField(null = True, default = "Idle")
|
||||
flowCalculationTime = TimeField(null = True)
|
||||
flowStatus = TextField(null = True, default = "idle")
|
||||
flowExecutionTime = TimeField(null = True)
|
||||
|
||||
flowRate = FloatField(null = True)
|
||||
permeability = FloatField(null = True)
|
||||
|
@ -43,10 +43,8 @@ class TestCore(unittest.TestCase):
|
||||
|
||||
pathOld = os.path.abspath(".")
|
||||
config = self.core.DefaultConfig()
|
||||
config.expand()
|
||||
config.cases = [ config.cases[0] ]
|
||||
|
||||
runner = self.core.UltimateRunner(config = config, exec_id = True)
|
||||
runner = self.core.UltimateRunner(config = config)
|
||||
|
||||
runner.computeShape()
|
||||
self.assertTrue(path.isfile(path.join(runner.casepath(), "shape.step")))
|
||||
|
Loading…
Reference in New Issue
Block a user