Mod: improved mesh and shape classes
Mod: exception handling
This commit is contained in:
parent
d724b45e85
commit
7dce03e97a
@ -396,11 +396,5 @@ def show(params, path, printlist, export, fields, output):
|
||||
# CLI entry
|
||||
##
|
||||
if __name__ == "__main__":
|
||||
#try:
|
||||
anisotropy()
|
||||
anisotropy()
|
||||
|
||||
#except KeyboardInterrupt:
|
||||
# click.echo("Interrupted!")
|
||||
|
||||
#finally:
|
||||
# sys.exit(0)
|
||||
|
@ -4,14 +4,15 @@
|
||||
|
||||
from datetime import datetime
|
||||
import os
|
||||
from os import path
|
||||
from os import path, PathLike
|
||||
from pathlib import Path
|
||||
|
||||
from anisotropy.core.config import DefaultConfig
|
||||
|
||||
import logging
|
||||
|
||||
from anisotropy.core.postProcess import PostProcess
|
||||
from anisotropy.core.utils import Timer
|
||||
from anisotropy.core.utils import Timer, ErrorHandler
|
||||
from anisotropy.core.parallel import ParallelRunner
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@ -22,6 +23,7 @@ from anisotropy.shaping import Simple, BodyCentered, FaceCentered, Shape
|
||||
from anisotropy.meshing import Mesh
|
||||
from anisotropy.solving import OnePhaseFlow
|
||||
|
||||
|
||||
class UltimateRunner(object):
|
||||
def __init__(self, config = None, exec_id: int = None, typo: str = "master"):
|
||||
# Configuration file
|
||||
@ -46,18 +48,9 @@ class UltimateRunner(object):
|
||||
self.exec_id = T.Execution.create(date = datetime.now())
|
||||
|
||||
# Parameters
|
||||
self.shape = None
|
||||
self.mesh = None
|
||||
self.flow = None
|
||||
|
||||
self.queue = []
|
||||
|
||||
|
||||
def dispose(self):
|
||||
self.shape = None
|
||||
self.mesh = None
|
||||
self.flow = None
|
||||
|
||||
def createRow(self):
|
||||
# create a row in each table for the current case
|
||||
with self.database:
|
||||
@ -81,6 +74,7 @@ class UltimateRunner(object):
|
||||
flow = T.FlowOnephase(mesh_id = mesh.mesh_id, **self.config.params)
|
||||
self.database.csave(mesh)
|
||||
|
||||
|
||||
def fill(self):
|
||||
self.config.expand()
|
||||
logger.info(f"Preparing queue: { len(self.config.cases) }")
|
||||
@ -96,7 +90,7 @@ class UltimateRunner(object):
|
||||
}
|
||||
self.queue.append(kwargs)
|
||||
|
||||
|
||||
|
||||
def start(self, queue: list = None, nprocs: int = None):
|
||||
nprocs = nprocs or self.config["nprocs"]
|
||||
|
||||
@ -109,17 +103,22 @@ class UltimateRunner(object):
|
||||
|
||||
parallel.wait()
|
||||
|
||||
def casepath(self):
|
||||
|
||||
@property
|
||||
def casepath(self) -> PathLike:
|
||||
params = self.config.params
|
||||
path = Path(os.environ["ANISOTROPY_CWD"], os.environ["ANISOTROPY_BUILD_DIR"])
|
||||
path /= "execution-{}".format(self.exec_id)
|
||||
path /= "{}-[{},{},{}]-{}".format(
|
||||
params["label"],
|
||||
*[ str(d) for d in params["direction"] ],
|
||||
params["alpha"]
|
||||
)
|
||||
|
||||
execution = "execution-{}".format(self.exec_id)
|
||||
case = "{}-[{},{},{}]-{}".format(params["label"], *[ str(d) for d in params["direction"] ], params["alpha"])
|
||||
dirpath = path.join(os.environ["ANISOTROPY_CWD"], self.config["build"], execution, case)
|
||||
return path.resolve()
|
||||
|
||||
return path.abspath(dirpath)
|
||||
|
||||
def computeShape(self):
|
||||
out, err, returncode = "", "", 0
|
||||
params = self.config.params
|
||||
shapeParams = self.database.getShape(
|
||||
params["label"],
|
||||
@ -131,54 +130,48 @@ class UltimateRunner(object):
|
||||
logger.info("Computing shape for {} with direction = {} and alpha = {}".format(
|
||||
params["label"], params["direction"], params["alpha"]
|
||||
))
|
||||
filename = "shape.step"
|
||||
shapepath = path.join(self.casepath(), filename)
|
||||
shapeFile = self.casepath / "shape.step"
|
||||
timer = Timer()
|
||||
|
||||
if path.exists(shapepath) and shapeParams.shapeStatus == "done" and not self.config["overwrite"]:
|
||||
if shapeFile.exists() and shapeParams.shapeStatus == "done" and not self.config["overwrite"]:
|
||||
logger.info("Shape exists. Skipping ...")
|
||||
return
|
||||
|
||||
shape = {
|
||||
shapeSelected = {
|
||||
"simple": Simple,
|
||||
"bodyCentered": BodyCentered,
|
||||
"faceCentered": FaceCentered
|
||||
}[shapeParams.label]
|
||||
|
||||
self.shape = shape(
|
||||
shape = shapeSelected(
|
||||
direction = shapeParams.direction,
|
||||
alpha = shapeParams.alpha,
|
||||
r0 = shapeParams.r0,
|
||||
filletsEnabled = shapeParams.filletsEnabled
|
||||
)
|
||||
|
||||
try:
|
||||
self.shape.build()
|
||||
with ErrorHandler() as (eh, handler):
|
||||
handler(shape.build)()
|
||||
|
||||
except Exception as e:
|
||||
err = e
|
||||
returncode = 1
|
||||
if not eh.returncode:
|
||||
self.casepath.mkdir(exist_ok = True)
|
||||
|
||||
if not returncode:
|
||||
os.makedirs(self.casepath(), exist_ok = True)
|
||||
out, err, returncode = self.shape.export(path.join(self.casepath(), filename))
|
||||
with ErrorHandler() as (eh, handler):
|
||||
handler(shape.write)(shapeFile)
|
||||
|
||||
if not returncode:
|
||||
if not eh.returncode:
|
||||
shapeParams.shapeStatus = "done"
|
||||
|
||||
shapeParams.volume = self.shape.shape.volume
|
||||
shapeParams.volumeCell = self.shape.cell.volume
|
||||
shapeParams.volume = shape.shape.volume
|
||||
shapeParams.volumeCell = shape.cell.volume
|
||||
shapeParams.porosity = shapeParams.volume / shapeParams.volumeCell
|
||||
|
||||
else:
|
||||
logger.error(err)
|
||||
shapeParams.shapeStatus = "failed"
|
||||
logger.error(eh.error)
|
||||
|
||||
#with self.database:
|
||||
shapeParams.shapeExecutionTime = timer.elapsed()
|
||||
#shapeParams.save()
|
||||
self.database.csave(shapeParams)
|
||||
self.dispose()
|
||||
|
||||
|
||||
def computeMesh(self):
|
||||
out, err, returncode = "", "", 0
|
||||
@ -193,40 +186,45 @@ class UltimateRunner(object):
|
||||
logger.info("Computing mesh for {} with direction = {} and alpha = {}".format(
|
||||
params["label"], params["direction"], params["alpha"]
|
||||
))
|
||||
filename = "mesh.mesh"
|
||||
meshpath = path.join(self.casepath(), filename)
|
||||
meshFile = self.casepath / "mesh.mesh"
|
||||
timer = Timer()
|
||||
|
||||
if path.exists(meshpath) and meshParams.meshStatus == "done" and not self.config["overwrite"]:
|
||||
if meshFile.exists() and meshParams.meshStatus == "done" and not self.config["overwrite"]:
|
||||
logger.info("Mesh exists. Skipping ...")
|
||||
return
|
||||
|
||||
if not self.shape:
|
||||
shapefile = "shape.step"
|
||||
filepath = path.join(self.casepath(), shapefile)
|
||||
# Shape
|
||||
shape = None
|
||||
shapeFile = self.casepath / "shape.step"
|
||||
|
||||
if not path.exists(filepath) and not path.isfile(filepath):
|
||||
err = f"File not found: { filepath }"
|
||||
returncode = 2
|
||||
|
||||
if not returncode:
|
||||
self.shape = Shape()
|
||||
self.shape.load(filepath)
|
||||
if not shapeFile.exists() and not shapeFile.is_file():
|
||||
err = f"File not found: { shapeFile }"
|
||||
returncode = 2
|
||||
|
||||
if not returncode:
|
||||
self.mesh = Mesh(self.shape.shape)
|
||||
shape = Shape().read(shapeFile)
|
||||
|
||||
# Mesh
|
||||
if not returncode:
|
||||
mesh = Mesh(shape.shape)
|
||||
|
||||
try:
|
||||
self.mesh.build()
|
||||
mesh.generate()
|
||||
|
||||
except Exception as e:
|
||||
err = e
|
||||
returncode = 1
|
||||
|
||||
if not returncode:
|
||||
os.makedirs(self.casepath(), exist_ok = True)
|
||||
out, err, returncode = self.mesh.export(path.join(self.casepath(), filename))
|
||||
out, err, returncode = self.mesh.export(path.join(self.casepath(), "mesh.msh"))
|
||||
self.casepath.mkdir(exist_ok = True)
|
||||
|
||||
try:
|
||||
mesh.write(meshFile)
|
||||
mesh.write(self.casepath / "mesh.msh")
|
||||
|
||||
except Exception as e:
|
||||
err = e
|
||||
returncode = 1
|
||||
|
||||
if not returncode:
|
||||
meshParams.meshStatus = "done"
|
||||
@ -241,7 +239,7 @@ class UltimateRunner(object):
|
||||
with self.database:
|
||||
meshParams.meshExecutionTime = timer.elapsed()
|
||||
meshParams.save()
|
||||
self.dispose()
|
||||
|
||||
|
||||
def computeFlow(self):
|
||||
params = self.config.params
|
||||
@ -261,37 +259,33 @@ class UltimateRunner(object):
|
||||
flowParams.viscosityKinematic = flowParams.viscosity / flowParams.density
|
||||
|
||||
with self.database:
|
||||
flowParams.save()
|
||||
|
||||
flowParams.save()
|
||||
|
||||
with self.database:
|
||||
self.flow = OnePhaseFlow(
|
||||
flow = OnePhaseFlow(
|
||||
direction = params["direction"],
|
||||
**self.database.getFlowOnephase(*query, to_dict = True),
|
||||
path = self.casepath()
|
||||
path = self.casepath
|
||||
)
|
||||
|
||||
if not self.shape:
|
||||
filename = "shape.step"
|
||||
filepath = path.join(self.casepath(), filename)
|
||||
# Shape
|
||||
shapeFile = self.casepath / "shape.step"
|
||||
|
||||
if not path.exists(filepath) and not path.isfile(filepath):
|
||||
err = f"File not found: { filepath }"
|
||||
returncode = 2
|
||||
if not shapeFile.exists() and not shapeFile.is_file():
|
||||
err = f"File not found: { shapeFile }"
|
||||
returncode = 2
|
||||
|
||||
if not returncode:
|
||||
self.shape = Shape()
|
||||
self.shape.load(filepath)
|
||||
if not returncode:
|
||||
shape = Shape().read(shapeFile)
|
||||
|
||||
faces = [ (n, face.name) for n, face in enumerate(self.shape.shape.faces) ]
|
||||
createPatchDict = OnePhaseFlow.facesToPatches(faces)
|
||||
|
||||
self.flow.append(createPatchDict)
|
||||
self.flow.write()
|
||||
# Patches from occ to openfoam
|
||||
patches = shape.patches(group = True, shiftIndex = True, prefix = "patch")
|
||||
flow.createPatches(patches)
|
||||
flow.write()
|
||||
|
||||
# Build a flow
|
||||
try:
|
||||
out, err, returncode = self.flow.build()
|
||||
out, err, returncode = flow.build()
|
||||
|
||||
except Exception as e:
|
||||
out, err, returncode = "", e, 1
|
||||
@ -308,6 +302,7 @@ class UltimateRunner(object):
|
||||
flowParams.flowExecutionTime = timer.elapsed()
|
||||
flowParams.save()
|
||||
|
||||
|
||||
def computePostProcess(self):
|
||||
params = self.config.params
|
||||
flowParams = self.database.getFlowOnephase(
|
||||
@ -321,7 +316,7 @@ class UltimateRunner(object):
|
||||
params["label"], params["direction"], params["alpha"]
|
||||
))
|
||||
|
||||
postProcess = PostProcess(self.casepath())
|
||||
postProcess = PostProcess(self.casepath)
|
||||
|
||||
if flowParams.flowStatus == "done":
|
||||
flowParams.flowRate = postProcess.flowRate("outlet")
|
||||
|
@ -6,6 +6,7 @@ import logging
|
||||
import copy
|
||||
import time
|
||||
from types import FunctionType
|
||||
import contextlib
|
||||
|
||||
|
||||
class CustomFormatter(logging.Formatter):
|
||||
@ -206,3 +207,33 @@ class Timer(object):
|
||||
|
||||
def elapsed(self):
|
||||
return time.monotonic() - self.start
|
||||
|
||||
|
||||
class ErrorHandler(contextlib.AbstractContextManager):
|
||||
def __init__(self):
|
||||
self.error = ""
|
||||
self.returncode = 0
|
||||
self.traceback = None
|
||||
|
||||
def __enter__(self):
|
||||
return self, self.handler
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
if exc_type:
|
||||
self.error = exc_value.args
|
||||
self.returncode = 1
|
||||
self.traceback = traceback
|
||||
|
||||
def handle(self, obj):
|
||||
def inner(*args, **kwargs):
|
||||
try:
|
||||
output = obj(*args, **kwargs)
|
||||
|
||||
except Exception as e:
|
||||
self.error = e.args
|
||||
self.returncode = 1
|
||||
|
||||
else:
|
||||
return output
|
||||
|
||||
return inner
|
||||
|
@ -269,7 +269,7 @@ def plotDraw(clicks, execution, structure, direction, data):
|
||||
)
|
||||
|
||||
if not direction == "all":
|
||||
query = qeury.where(models.Shape.direction == json.loads(direction))
|
||||
query = query.where(models.Shape.direction == json.loads(direction))
|
||||
|
||||
with db:
|
||||
if query.exists():
|
||||
|
@ -4,9 +4,11 @@
|
||||
|
||||
from netgen.occ import OCCGeometry
|
||||
from netgen import meshing
|
||||
from numpy import array
|
||||
import numpy
|
||||
from numpy import array, ndarray, linalg
|
||||
import os
|
||||
from .utils import extractPoints, extractCells
|
||||
from .utils import extractNetgenPoints, extractNetgenCells
|
||||
from . import metrics
|
||||
import meshio
|
||||
|
||||
|
||||
@ -19,154 +21,130 @@ class NotSupportedMeshFormat(Exception):
|
||||
def __init__(self, msg):
|
||||
super().__init__(self, msg)
|
||||
|
||||
|
||||
class MeshingParameters(object):
|
||||
def __init__(self, **kwargs):
|
||||
self.preset(2, **kwargs)
|
||||
|
||||
def preset(self, key: int, **kwargs):
|
||||
"""Apply predefined parameters.
|
||||
|
||||
:param key:
|
||||
0: very_coarse
|
||||
1: coarse
|
||||
2: moderate
|
||||
3: fine
|
||||
4: very_fine
|
||||
"""
|
||||
self.maxh = kwargs.get("maxh", 0.2)
|
||||
self.curvaturesafety = kwargs.get("curvaturesafety", [1, 1.5, 2, 3, 5][key])
|
||||
self.segmentsperedge = kwargs.get("segmentsperedge", [0.3, 0.5, 1, 2, 3][key])
|
||||
self.grading = kwargs.get("grading", [0.7, 0.5, 0.3, 0.2, 0.1][key])
|
||||
self.chartdistfac = kwargs.get("chartdistfac", [0.8, 1, 1.5, 2, 5][key])
|
||||
self.linelengthfac = kwargs.get("linelengthfac", [0.2, 0.35, 0.5, 1.5, 3][key])
|
||||
self.closeedgefac = kwargs.get("closeedgefac", [0.5, 1, 2, 3.5, 5][key])
|
||||
self.minedgelen = kwargs.get("minedgelen", [0.002, 0.02, 0.2, 1.0, 2.0][key])
|
||||
self.surfmeshcurvfac = kwargs.get("surfmeshcurvfac", [1, 1.5, 2, 3, 5.0][key])
|
||||
self.optsteps2d = kwargs.get("optsteps2d", 5)
|
||||
self.optsteps3d = kwargs.get("optsetps3d", 5)
|
||||
|
||||
return self
|
||||
|
||||
def get(self) -> meshing.MeshingParameters:
|
||||
return meshing.MeshingParameters(**self.__dict__)
|
||||
|
||||
def __repr__(self):
|
||||
return str(self.__dict__)
|
||||
|
||||
|
||||
class Mesh(object):
|
||||
def __init__(self, shape: OCCGeometry = None):
|
||||
self.geometry = OCCGeometry(shape) if shape else None
|
||||
def __init__(self, shape = None):
|
||||
self.shape = shape
|
||||
self.mesh = None
|
||||
|
||||
# Parameters
|
||||
self.maxh = 0.2
|
||||
self.curvaturesafety = 5
|
||||
self.segmentsperedge = 3
|
||||
self.grading = 0.1
|
||||
self.chartdistfac = 5
|
||||
self.linelengthfac = 3
|
||||
self.closeedgefac = 5
|
||||
self.minedgelen = 2.0
|
||||
self.surfmeshcurvfac = 5.0
|
||||
self.optsteps2d = 5
|
||||
self.optsteps3d = 5
|
||||
|
||||
self.points = []
|
||||
self.cells = []
|
||||
self.boundary = []
|
||||
|
||||
@property
|
||||
def parameters(self):
|
||||
return meshing.MeshingParameters(
|
||||
maxh = self.maxh,
|
||||
curvaturesafety = self.curvaturesafety,
|
||||
segmentsperedge = self.segmentsperedge,
|
||||
grading = self.grading,
|
||||
chartdistfac = self.chartdistfac,
|
||||
linelengthfac = self.linelengthfac,
|
||||
closeedgefac = self.closeedgefac,
|
||||
minedgelen = self.minedgelen,
|
||||
surfmeshcurvfac = self.surfmeshcurvfac,
|
||||
optsteps2d = self.optsteps2d,
|
||||
optsteps3d = self.optsteps3d
|
||||
)
|
||||
def geometry(self):
|
||||
return OCCGeometry(self.shape)
|
||||
|
||||
def build(self):
|
||||
if self.geometry:
|
||||
self.mesh = self.geometry.GenerateMesh(self.parameters)
|
||||
def generate(self, parameters: MeshingParameters = None, refineSteps: int = 0, scale: float = 0):
|
||||
if not self.geometry:
|
||||
raise NoGeometrySpecified("Cannot build mesh without geometry")
|
||||
|
||||
parameters = parameters or MeshingParameters()
|
||||
mesh = self.geometry.GenerateMesh(parameters.get())
|
||||
|
||||
else:
|
||||
raise NoGeometrySpecified("Specify a geometry to build a mesh")
|
||||
if refineSteps > 0:
|
||||
for n in range(refineSteps):
|
||||
mesh.Refine()
|
||||
|
||||
mesh.OptimizeMesh2d(parameters)
|
||||
mesh.OptimizeVolumeMesh(parameters)
|
||||
|
||||
if scale > 0:
|
||||
mesh.Scale(scale)
|
||||
|
||||
self.points = extractNetgenPoints(mesh)
|
||||
self.cells = []
|
||||
|
||||
formats = {
|
||||
"vol": "Netgen Format",
|
||||
"mesh": "Neutral Format",
|
||||
"msh": "Gmsh2 Format"
|
||||
}
|
||||
for dim in range(4):
|
||||
self.cells.extend(extractNetgenCells(dim, mesh))
|
||||
|
||||
def load(self, filename: str):
|
||||
return self
|
||||
|
||||
|
||||
def read(self, filename: str):
|
||||
"""Import a mesh.
|
||||
|
||||
Use `Mesh.formats` to see supported formats.
|
||||
|
||||
:param filename:
|
||||
Name of the file to store the given mesh in.
|
||||
Name of the mesh file.
|
||||
"""
|
||||
ext = os.path.splitext(filename)[1][1: ]
|
||||
mesh = meshio.read(filename)
|
||||
|
||||
if ext in self.formats.keys():
|
||||
self.mesh = meshing.Mesh()
|
||||
self.mesh.Load(filename)
|
||||
|
||||
else:
|
||||
raise NotSupportedMeshFormat(f"Mesh format '{ ext }' is not supported")
|
||||
self.points = mesh.points
|
||||
self.cells = mesh.cells
|
||||
|
||||
return self
|
||||
|
||||
def export(self, filename: str):
|
||||
def write(self, filename: str):
|
||||
"""Export a mesh.
|
||||
|
||||
Use `Mesh.formats` to see supported formats.
|
||||
|
||||
:param filename:
|
||||
Name of the file to store the given mesh in.
|
||||
"""
|
||||
mesh = meshio.Mesh(self.points, self.cells)
|
||||
mesh.write(filename)
|
||||
|
||||
|
||||
@property
|
||||
def volumes(self) -> list[ndarray]:
|
||||
points = []
|
||||
|
||||
for cellBlock in self.cells:
|
||||
if cellBlock.dim == 3:
|
||||
points.extend([ *self.points[cellBlock.data] ])
|
||||
|
||||
return points
|
||||
|
||||
@property
|
||||
def volume(self) -> float:
|
||||
"""Volume of whole mesh.
|
||||
|
||||
:return:
|
||||
Output, error messages and returncode
|
||||
Volume.
|
||||
"""
|
||||
out, err, returncode = "", "", 0
|
||||
ext = os.path.splitext(filename)[1][1: ]
|
||||
|
||||
try:
|
||||
if ext == "vol":
|
||||
self.mesh.Save(filename)
|
||||
|
||||
elif ext in self.formats.keys():
|
||||
self.mesh.Export(filename, self.formats[ext])
|
||||
|
||||
else:
|
||||
raise NotSupportedMeshFormat(f"Mesh format '{ ext }' is not supported")
|
||||
|
||||
except (NotSupportedMeshFormat, Exception) as e:
|
||||
err = e
|
||||
returncode = 1
|
||||
|
||||
return out, err, returncode
|
||||
|
||||
def to_meshio(self):
|
||||
points = extractPoints(self.mesh.Points())
|
||||
cells = []
|
||||
|
||||
if len(self.mesh.Elements1D()) > 0:
|
||||
cells.extend([ cells_ for cells_ in extractCells(1, self.mesh.Elements1D()).items() ])
|
||||
|
||||
if len(self.mesh.Elements2D()) > 0:
|
||||
cells.extend([ cells_ for cells_ in extractCells(2, self.mesh.Elements2D()).items() ])
|
||||
|
||||
if len(self.mesh.Elements3D()) > 0:
|
||||
cells.extend([ cells_ for cells_ in extractCells(3, self.mesh.Elements3D()).items() ])
|
||||
|
||||
return meshio.Mesh(points, cells)
|
||||
|
||||
@staticmethod
|
||||
def volumeTetra(points: array) -> float:
|
||||
return 1 / 6 * linalg.det(numpy.append(points.transpose(), numpy.array([[1, 1, 1, 1]]), axis = 0))
|
||||
return sum([ metrics.volume(cell) for cell in self.volumes ])
|
||||
|
||||
@property
|
||||
def volumes(self) -> array:
|
||||
def faces(self) -> list[ndarray]:
|
||||
points = []
|
||||
|
||||
for cells in self.cells:
|
||||
if cells.dim == 3:
|
||||
points.extend([ self.mesh.points[cell] for cell in cells.data ])
|
||||
for cellBlock in self.cells:
|
||||
if cellBlock.dim == 2:
|
||||
points.extend([ *self.points[cellBlock.data] ])
|
||||
|
||||
return array(points)
|
||||
return points
|
||||
|
||||
@property
|
||||
def faces(self) -> array:
|
||||
points = []
|
||||
|
||||
for cells in self.cells:
|
||||
if cells.dim == 2:
|
||||
points.extend([ self.mesh.points[cell] for cell in cells.data ])
|
||||
|
||||
return array(points)
|
||||
|
||||
@property
|
||||
def edges(self) -> array:
|
||||
points = []
|
||||
|
||||
for cells in self.cells:
|
||||
if cells.dim == 1:
|
||||
points.extend([ self.mesh.points[cell] for cell in cells.data ])
|
||||
|
||||
return array(points)
|
||||
|
||||
|
||||
|
||||
# tetras = numpy.array([ [ [ vertex for vertex in mesh[index] ] for index in element.vertices ] for element in self.mesh.Elements3D() ])
|
||||
# volumes = numpy.array([ 1 / 6 * linalg.det(numpy.append(tetra.transpose(), numpy.array([[1, 1, 1, 1]]), axis = 0)) for tetra in tetras ])
|
||||
|
||||
|
@ -2,4 +2,18 @@
|
||||
# This file is part of anisotropy.
|
||||
# License: GNU GPL version 3, see the file "LICENSE" for details.
|
||||
|
||||
import numpy
|
||||
from numpy import ndarray
|
||||
from numpy import linalg
|
||||
from .utils import detectCellType
|
||||
|
||||
|
||||
def volume(points: ndarray) -> ndarray:
|
||||
cellType = detectCellType(3, len(points))
|
||||
|
||||
if cellType == "tetra":
|
||||
return 1 / 6 * linalg.det(numpy.append(points.transpose(), numpy.array([[1, 1, 1, 1]]), axis = 0))
|
||||
|
||||
else:
|
||||
raise Exception(f"Not supported cell type '{ cellType }'")
|
||||
|
||||
|
@ -1,31 +1,68 @@
|
||||
from __future__ import annotations
|
||||
from netgen import meshing
|
||||
from meshio._mesh import topological_dimension
|
||||
from meshio._common import num_nodes_per_cell
|
||||
from numpy import array
|
||||
from numpy import array, asarray, ndarray
|
||||
|
||||
|
||||
def detectTopology(dimension: dict, num_nodes: dict):
|
||||
def detectCellType(dimension: int, num_nodes: int):
|
||||
for dim in topological_dimension.keys():
|
||||
for num in num_nodes_per_cell.keys():
|
||||
if topological_dimension[dim] == dimension and num_nodes_per_cell[num] == num_nodes and dim == num:
|
||||
return dim
|
||||
|
||||
class CellBlock:
|
||||
def __init__(self, cellType: str, data: list | ndarray, tags: list[str] | None = None):
|
||||
self.type = cellType
|
||||
self.data = data
|
||||
|
||||
def extractPoints(points):
|
||||
return array([ point.p for point in points ], dtype = float)
|
||||
if cellType.startswith("polyhedron"):
|
||||
self.dim = 3
|
||||
|
||||
else:
|
||||
self.data = asarray(self.data)
|
||||
self.dim = topological_dimension[cellType]
|
||||
|
||||
self.tags = [] if tags is None else tags
|
||||
|
||||
def __repr__(self):
|
||||
items = [
|
||||
"CellBlock",
|
||||
f"type: { self.type }",
|
||||
f"num cells: { len(self.data) }",
|
||||
f"tags: { self.tags }",
|
||||
]
|
||||
return "<" + ", ".join(items) + ">"
|
||||
|
||||
def __len__(self):
|
||||
return len(self.data)
|
||||
|
||||
def extractNetgenPoints(mesh: meshing.Mesh) -> ndarray:
|
||||
return array([ point.p for point in mesh.Points() ], dtype = float)
|
||||
|
||||
|
||||
def extractCells(dimension: int, elements):
|
||||
cellsNew = {}
|
||||
def extractNetgenCells(dim: int, mesh: meshing.Mesh) -> list[CellBlock]:
|
||||
cellsDict = {}
|
||||
elements = {
|
||||
0: mesh.Elements0D(),
|
||||
1: mesh.Elements1D(),
|
||||
2: mesh.Elements2D(),
|
||||
3: mesh.Elements3D()
|
||||
}[dim]
|
||||
|
||||
if len(elements) == 0:
|
||||
return []
|
||||
|
||||
for cell in elements:
|
||||
cellTopo = detectTopology(dimension, len(cell.points))
|
||||
# shift indicies, they should starts from zero
|
||||
cellType = detectCellType(dim, len(cell.points))
|
||||
# shift indicies, they should start from zero
|
||||
cellNew = array([ pointId.nr for pointId in cell.points ], dtype = int) - 1
|
||||
|
||||
if cellsNew.get(cellTopo):
|
||||
cellsNew[cellTopo].append(cellNew)
|
||||
if cellsDict.get(cellType):
|
||||
cellsDict[cellType].append(cellNew)
|
||||
|
||||
else:
|
||||
cellsNew[cellTopo] = [ cellNew ]
|
||||
|
||||
return cellsNew
|
||||
cellsDict[cellType] = [ cellNew ]
|
||||
|
||||
cells = [ CellBlock(key, value) for key, value in cellsDict.items() ]
|
||||
|
||||
return cells
|
||||
|
@ -6,6 +6,8 @@ from netgen.occ import *
|
||||
import numpy
|
||||
from numpy import linalg
|
||||
import os
|
||||
from os import PathLike
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
class ShapeError(Exception):
|
||||
@ -17,7 +19,7 @@ class Shape(object):
|
||||
self.groups = {}
|
||||
self.shape = None
|
||||
|
||||
def export(self, filename: str):
|
||||
def write(self, filename: PathLike):
|
||||
"""Export a shape.
|
||||
|
||||
Supported formats: step.
|
||||
@ -29,11 +31,12 @@ class Shape(object):
|
||||
Output, error messages and returncode
|
||||
"""
|
||||
out, err, returncode = "", "", 0
|
||||
ext = os.path.splitext(filename)[1][1: ]
|
||||
path = Path(filename).resolve()
|
||||
ext = path.suffix[1: ]
|
||||
|
||||
try:
|
||||
if ext == "step":
|
||||
self.shape.WriteStep(filename)
|
||||
self.shape.WriteStep(path)
|
||||
|
||||
else:
|
||||
raise NotImplementedError(f"{ ext } is not supported")
|
||||
@ -48,11 +51,12 @@ class Shape(object):
|
||||
|
||||
return out, err, returncode
|
||||
|
||||
def load(self, filename: str):
|
||||
ext = os.path.splitext(filename)[1][1:]
|
||||
def read(self, filename: PathLike):
|
||||
path = Path(filename).resolve()
|
||||
ext = path.suffix[1: ]
|
||||
|
||||
if ext in ["step", "iges", "brep"]:
|
||||
self.shape = OCCGeometry(filename).shape
|
||||
self.shape = OCCGeometry(path).shape
|
||||
|
||||
else:
|
||||
raise NotImplementedError(f"Shape format '{ext}' is not supported")
|
||||
|
@ -135,25 +135,14 @@ class OnePhaseFlow(FoamCase):
|
||||
u
|
||||
])
|
||||
|
||||
@staticmethod
|
||||
def facesToPatches(faces: tuple[int, str]):
|
||||
|
||||
def createPatches(self, patches: dict):
|
||||
# initial 43 unnamed patches ->
|
||||
# 6 named patches (inlet, outlet, wall, symetry0 - 3/5) ->
|
||||
# 4 inGroups (inlet, outlet, wall, symetry)
|
||||
|
||||
createPatchDict = F.CreatePatchDict()
|
||||
createPatchDict["patches"] = []
|
||||
patches = {}
|
||||
|
||||
for n, name in faces:
|
||||
# shifted index
|
||||
n += 1
|
||||
|
||||
if patches.get(name):
|
||||
patches[name].append(f"patch{n}")
|
||||
|
||||
else:
|
||||
patches[name] = [f"patch{n}"]
|
||||
|
||||
for name in patches.keys():
|
||||
if name == "inlet":
|
||||
@ -182,7 +171,7 @@ class OnePhaseFlow(FoamCase):
|
||||
"patches": patches[name]
|
||||
})
|
||||
|
||||
return createPatchDict
|
||||
self.append(createPatchDict)
|
||||
|
||||
def build(self) -> tuple[str, str, int]:
|
||||
# TODO: configure working directory (FoamCase)
|
||||
|
Loading…
Reference in New Issue
Block a user