Mod: improved database

Mod: new cli
Mod: improved ultimate runner
Mod: improved multiprocessing
L-Nafaryus 2021-12-08 19:04:38 +05:00
parent 7d0fe5a4a9
commit 64a5cf1a6d
10 changed files with 489 additions and 185 deletions

View File

@@ -89,7 +89,7 @@ def anisotropy():
)
@click.option(
"-s", "--stage", "stage",
type = click.Choice(["all", "mesh", "flow", "postProcessing"]),
type = click.Choice(["all", "shape", "mesh", "flow", "postProcessing"]),
default = "all",
help = "Current computation stage"
)
@@ -106,12 +106,9 @@ def anisotropy():
cls = KeyValueOption,
help = "Overwrite existing parameter (except control variables)"
)
def compute(path, configFile, nprocs, stage, overwrite):
def compute(path, configFile, nprocs, stage, overwrite, params):
from anisotropy.core.runner import UltimateRunner
from anisotropy.core.config import DefaultConfig
from anisotropy.database.models import Execution
from copy import deepcopy
from datetime import datetime
config = DefaultConfig()
@@ -123,27 +120,13 @@ def compute(path, configFile, nprocs, stage, overwrite):
stage = stage,
overwrite = overwrite
)
config.expand()
exec_id = Execution(date = datetime.now())
exec_id.save()
baseRunner = UltimateRunner(config = config, exec_id = exec_id)
queue = []
for case in config.cases:
caseConfig = deepcopy(config)
caseConfig.purge()
m_shape = Shape(exec_id = exec_id, **case)
m_shape.save()
caseRunner = UltimateRunner(config = caseConfig, exec_id = exec_id)
queue.append(caseRunner)
baseRunner.parallel(queue)
runner = UltimateRunner(config = config)
runner.fill()
runner.start()
##############
"""
def version():
msg = "Missed package anisotropy"
@@ -261,10 +244,10 @@ def update(force, params, path):
@anisotropy.command(
help = """Compute cases by chain (mesh -> flow)
Control parameters: type, direction, theta (each parameter affects on a queue)
"""
# help = """#Compute cases by chain (mesh -> flow)
#
# Control parameters: type, direction, theta (each parameter affects on a queue)
"""
)
@click.option(
"-s", "--stage", "stage",
@@ -636,7 +619,7 @@ def show(params, path, printlist, export, fields, output):
elif output == "plot":
plt.show()
"""
###
# CLI entry
##
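
With the queue construction moved out of the CLI, compute now only assembles the config and delegates to the runner. A rough Python equivalent of the new flow (a sketch; the update() keyword names other than stage and overwrite are assumptions, and the values are illustrative):

from anisotropy.core.config import DefaultConfig
from anisotropy.core.runner import UltimateRunner

config = DefaultConfig()
config.update(nprocs = 4, stage = "shape")  # assumed keywords; the CLI passes its options here

runner = UltimateRunner(config = config)    # master runner: opens the database, creates the Execution row
runner.fill()                               # expand the config and queue one worker runner per case
runner.start()                              # run the worker pipelines through ParallelRunner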

View File

@@ -5,7 +5,7 @@
import os
import toml
from copy import deepcopy
from numpy import arange, array
from numpy import arange, array, round
class Config(object):
@@ -63,11 +63,12 @@ class Config(object):
# Expand structures for each direction and each theta
for structure in self.content["structures"]:
thetaA = arange(
# ISSUE: precision error 0.06999999999999999
thetaA = round(arange(
structure["theta"][0],
structure["theta"][1] + structure["thetaStep"],
structure["thetaStep"]
)
), 9)
directionA = array(structure["directions"], dtype = float)
for direction in directionA:
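
A quick illustration of the precision issue the round() call works around (the theta bounds below are made up; 0.06999999999999999 is the value cited in the comment above):

from numpy import arange, round

step = 0.01
raw = arange(0.01, 0.07 + step, step)
print(list(raw))            # [..., 0.06999999999999999] -- accumulated floating-point step error
print(list(round(raw, 9)))  # [0.01, 0.02, ..., 0.07] -- what Config now stores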

View File

@@ -4,54 +4,122 @@
from datetime import datetime
from os import path
import logging
from anisotropy.core.config import DefaultConfig
from anisotropy.core.utils import parallel, ParallelRunner, setupLogger
from anisotropy.database import *
from anisotropy.shaping import Simple, BodyCentered, FaceCentered
from anisotropy.meshing import Mesh
from anisotropy.openfoam.presets import CreatePatchDict
from anisotropy.solving.onephase import OnePhaseFlow
logger = logging.getLogger("anisotropy")
setupLogger(logger, logging.INFO)
class UltimateRunner(object):
def __init__(self, config = None, exec_id = None):
def __init__(self, config = None, exec_id = None, m_shape = None):
self.config = config or DefaultConfig()
self.database = Database(self.config["database"])
self.database.setup()
if not m_shape:
self.database = Database(self.config["database"])
self.database.setup()
if not exec_id:
self.exec_id = Execution(date = datetime.now())
self.exec_id.save()
with self.database.database:
self.exec_id = Execution(date = datetime.now())
self.exec_id.save()
self.type = "master"
self.m_shape = None
else:
self.exec_id = exec_id
self.type = "worker"
self.m_shape = m_shape
self.shape = None
self.mesh = None
self.flow = None
self.queue = []
def fill(self):
self.config.expand()
for case in self.config.cases:
with self.database.database:
m_shape = Shape(
exec_id = self.exec_id,
**case
)
m_shape.save()
self.queue.append(UltimateRunner(
config = self.config,
exec_id = self.exec_id,
m_shape = m_shape
))
def start(self, queue: list = None, nprocs: int = None):
nprocs = nprocs or self.config["nprocs"]
runners = [ runner.pipeline for runner in self.queue ]
args = [[self.config["stage"]]] * len(self.queue)
parallel = ParallelRunner(nprocs = nprocs)
parallel.start()
for runner in self.queue:
parallel.append(runner.pipeline, args = [self.config["stage"]])
parallel.wait()
#parallel(nprocs, args, runners)
# TODO: if runner done - remove from queue; results from parallel function
def casepath(self):
params = self.config.cases[0]
with self.database.database:
params = Shape.get(
Shape.exec_id == self.exec_id,
Shape.shape_id == self.m_shape.shape_id
)
return path.abspath(path.join(
self.config["build"],
params["label"],
"direction-[{},{},{}]".format(*[ str(d) for d in params["direction"] ]),
"theta-{}".format(params["theta"])
params.label,
"direction-[{},{},{}]".format(*[ str(d) for d in params.direction ]),
"theta-{}".format(params.theta)
))
def computeShape(self):
params = self.config.cases[0]
if not self.type == "worker":
return
self.database = Database(self.config["database"])
self.database.setup()
with self.database.database:
params = Shape.get(
Shape.exec_id == self.exec_id,
Shape.shape_id == self.m_shape.shape_id
)
filename = "shape.step"
logger.info([params.label, params.direction, params.theta])
self.shape = {
"simple": Simple,
"bodyCentered": BodyCentered,
"faceCentered": FaceCentered
}[params["label"]](params["direction"])
}[params.label](params.direction)
self.shape.build()
os.makedirs(self.casepath(), exist_ok = True)
self.shape.export(path.join(self.casepath(), filename))
with self.database.database:
params.shapeStatus = "Done"
params.save()
def computeMesh(self):
params = self.config.cases[0]
@@ -122,30 +190,17 @@ class UltimateRunner(object):
stage = stage or self.config["stage"]
if stage in ["shape", "all"]:
with self.database.atomic():
Shape.create(self._exec_id, **self.config.cases[0])
self.computeShape()
elif stage in ["mesh", "all"]:
with self.database.atomic():
Mesh.create(self._exec_id)
self.computeMesh()
elif stage in ["flow", "all"]:
with self.database.atomic():
Flow.create(self._exec_id)
self.computeFlow()
elif stage in ["postProcess", "all"]:
self.postProcess()
def parallel(queue: list, nprocs = None):
nprocs = nprocs or self.config["nprocs"]
parallel(nprocs, [()] * len(queue), [ runner.pipeline for runner in queue ])
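
For reference, a hand-unrolled sketch of what the master/worker split does for a single case. It assumes pipeline() takes the stage name as its first argument (as the ParallelRunner call in start() suggests) and that Shape is re-exported by anisotropy.database, as the wildcard import implies:

from anisotropy.core.config import DefaultConfig
from anisotropy.core.runner import UltimateRunner
from anisotropy.database import Shape

config = DefaultConfig()
config.expand()

master = UltimateRunner(config = config)   # opens the database, creates the Execution row
case = config.cases[0]

with master.database.database:
    m_shape = Shape(exec_id = master.exec_id, **case)
    m_shape.save()

worker = UltimateRunner(config = config, exec_id = master.exec_id, m_shape = m_shape)
worker.pipeline("shape")                   # run only the shape stage for this case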

View File

@@ -5,6 +5,7 @@
import logging
from multiprocessing import Queue, Process, cpu_count
import dill
import socket
import copy
import time
@@ -276,7 +277,65 @@ def parallel(np, var, cmd):
return results
class ParallelRunner(object):
def __init__(self, nprocs: int = 1, daemon: bool = True):
self.nprocs = nprocs
self.daemon = daemon
self.processes = []
self.queueInput = Queue(maxsize = 1)
self.queueOutput = Queue()
self.__pos = -1
self.output = []
def append(self, command, args = [], kwargs = {}):
self.__pos += 1
self.queueInput.put(dill.dumps((self.__pos, command, args, kwargs)))
def extend(self, commands: list, args: list = [], kwargs: list = []):
for command, cargs, ckwargs in zip(commands, args, kwargs):
self.append(command, cargs, ckwargs)
@staticmethod
def queueRelease(queueInput, queueOutput):
while True:
pos, command, args, kwargs = dill.loads(queueInput.get())
if pos is None or command is None:
break
output = command(*args, **kwargs)
queueOutput.put((pos, output))
def start(self):
for n in range(self.nprocs):
self.processes.append(Process(
target = self.queueRelease,
args = (self.queueInput, self.queueOutput)
))
for proc in self.processes:
proc.daemon = self.daemon
proc.start()
def wait(self):
for _ in range(self.nprocs):
self.append(None)
self.output = [ [] for _ in range(self.queueOutput.qsize()) ]
for _ in range(self.queueOutput.qsize()):
pos, output = self.queueOutput.get()
self.output[pos] = output
for proc in self.processes:
proc.join()
self.__pos = -1
def portIsFree(address, port):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
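
A minimal usage sketch of ParallelRunner outside the project (the task function and arguments are made up; tasks are passed through dill, so plain local functions work):

from anisotropy.core.utils import ParallelRunner

def square(x):
    return x * x

runner = ParallelRunner(nprocs = 4)
runner.start()                         # spawn the worker processes

for n in range(10):
    runner.append(square, args = [n])  # each task is dill-pickled onto the input queue

runner.wait()                          # push stop sentinels, then collect results by position
print(runner.output)                   # [0, 1, 4, ..., 81]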

View File

@@ -6,10 +6,9 @@ import os
from .models import (
sqliteDB,
Execution,
Physics,
Shape,
Mesh,
Flow
FlowOnephase
)
@@ -21,16 +20,21 @@ class Database(object):
def setup(self):
path = os.path.abspath(self.filename)
#os.makedirs(path, exist_ok = True)
self.database.init(path)
self.database.init(
path,
pragmas = { "foreign_keys": 1 },
field_types = { "list": "text" },
autoconnect = False
)
if not os.path.exists(path):
self.database.create_tables([Execution])
self.database.create_tables([
Physics,
Shape,
Mesh,
Flow
])
with self.database:
self.database.create_tables([Execution])
self.database.create_tables([
Shape,
Mesh,
FlowOnephase
])
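
Because autoconnect is now disabled, every query has to run inside an explicit connection context. A small sketch with the project's wrapper (the filename is illustrative):

from datetime import datetime
from anisotropy.database import Database, Execution

db = Database("anisotropy.db")
db.setup()                        # init() plus create_tables() inside `with self.database`

with db.database:                 # open and close the SQLite connection explicitly
    execution = Execution(date = datetime.now())
    execution.save()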

View File

@@ -10,12 +10,13 @@ from peewee import (
IntegerField, BooleanField,
TimeField, DateTimeField
)
from anisotropy.database.utils import ListField, JSONField
from anisotropy.database.utils import JSONField
sqliteDB = SqliteDatabase(
None,
pragmas = { "foreign_keys": 1 },
field_types = { "list": "text" }
field_types = { "list": "text" },
autoconnect = False
)
class Execution(Model):
@@ -29,30 +30,15 @@ class Execution(Model):
table_name = "executions"
class Physics(Model):
physics_id = AutoField()
exec_id = ForeignKeyField(Execution, backref = "physics")
volumeCell = FloatField(null = True)
volume = FloatField(null = True)
volumeRounded = FloatField(null = True)
porosity = FloatField(null = True)
porosityRounded = FloatField(null = True)
flowRate = FloatField(null = True)
permeability = FloatField(null = True)
class Meta:
database = sqliteDB
table_name = "physics"
depends_on = Execution
class Shape(Model):
structure_id = AutoField()
exec_id = ForeignKeyField(Execution, backref = "physics")
shape_id = AutoField()
exec_id = ForeignKeyField(Execution, backref = "executions")
shapeStatus = TextField(null = True, default = "Idle")
shapeCalculationTime = TimeField(null = True)
label = TextField(null = True)
direction = ListField(null = True)
direction = JSONField(null = True)
theta = FloatField(null = True)
r0 = FloatField(null = True)
@@ -62,6 +48,12 @@ class Shape(Model):
filletsEnabled = BooleanField(null = True)
fillets = FloatField(null = True)
volumeCell = FloatField(null = True)
volume = FloatField(null = True)
volumeRounded = FloatField(null = True)
porosity = FloatField(null = True)
porosityRounded = FloatField(null = True)
class Meta:
database = sqliteDB
table_name = "shapes"
@@ -70,7 +62,10 @@ class Shape(Model):
class Mesh(Model):
mesh_id = AutoField()
exec_id = ForeignKeyField(Execution, backref = "meshes")
shape_id = ForeignKeyField(Shape, backref = "shapes")
meshStatus = TextField(null = True, default = "Idle")
meshCalculationTime = TimeField(null = True)
elements = IntegerField(null = True)
edges = IntegerField(null = True)
@@ -80,8 +75,6 @@ class Mesh(Model):
prisms = IntegerField(null = True)
pyramids = IntegerField(null = True)
meshStatus = TextField(null = True, default = "Idle")
meshCalculationTime = TimeField(null = True)
class Meta:
database = sqliteDB
@@ -89,13 +82,16 @@ class Mesh(Model):
depends_on = Execution
class Flow(Model):
class FlowOnephase(Model):
flow_id = AutoField()
exec_id = ForeignKeyField(Execution, backref = "flows")
mesh_id = ForeignKeyField(Mesh, backref = "meshes")
flowStatus = TextField(null = True, default = "Idle")
flowCalculationTime = TimeField(null = True)
flowRate = FloatField(null = True)
permeability = FloatField(null = True)
class Meta:
database = sqliteDB
table_name = "flows"
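
The reorganized schema now chains Execution -> Shape -> Mesh -> FlowOnephase, with the former Physics fields split between Shape (volumes, porosity) and FlowOnephase (flowRate, permeability). A query sketch, assuming the database has been initialized and a connection is open (the filter values are illustrative):

from anisotropy.database.models import Shape, Mesh, FlowOnephase

shape = Shape.get(Shape.exec_id == 1, Shape.label == "simple")
mesh = Mesh.get(Mesh.shape_id == shape.shape_id)
flow = FlowOnephase.get(FlowOnephase.mesh_id == mesh.mesh_id)

print(shape.porosity, mesh.elements, flow.permeability)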

View File

@@ -4,7 +4,7 @@
from peewee import TextField
import json
from numpy import ndarray
class ListField(TextField):
field_type = "list"
@@ -27,7 +27,13 @@ class ListField(TextField):
class JSONField(TextField):
def db_value(self, value):
return json.dumps(value)
if isinstance(value, ndarray):
formatted = list(value)
else:
formatted = value
return json.dumps(formatted)
def python_value(self, value):
if value is not None:
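
The ndarray branch exists because json cannot serialize numpy arrays directly; a short illustration:

import json
from numpy import array

direction = array([1.0, 0.0, 0.0])
# json.dumps(direction) raises TypeError: Object of type ndarray is not JSON serializable
print(json.dumps(list(direction)))   # '[1.0, 0.0, 0.0]' -- what JSONField.db_value stores now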

View File

@@ -308,131 +308,331 @@
},
{
"cell_type": "code",
"execution_count": 30,
"id": "dc2e8b67-67ea-477a-af94-dd1ceb1a2a24",
"execution_count": null,
"id": "448c99bf-2b6f-43de-92d3-dc1267e766d3",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": 47,
"id": "6f9e3cfd-8945-4738-9bf6-704bca131a9a",
"metadata": {},
"outputs": [],
"source": [
"from anisotropy.core.main import Anisotropy\n",
"model = Anisotropy()\n",
"params_: list = model.loadFromScratch(\"test_anisotropy.toml\")"
"db = SqliteDatabase(\"newtest2.db\")"
]
},
{
"cell_type": "code",
"execution_count": 31,
"id": "e73c9863-e8d7-4ed5-a27e-0c733c3daeb6",
"execution_count": 48,
"id": "ff82a760-902a-416d-a5d6-775ddb418a9c",
"metadata": {},
"outputs": [],
"source": [
"class Test(Model):\n",
" test_id = AutoField()\n",
" text = TextField(null = True)\n",
" \n",
" class Meta:\n",
" database = db\n",
"\n",
"db.create_tables([Test])"
]
},
{
"cell_type": "code",
"execution_count": 49,
"id": "a7f89828-9a8d-43ab-92d4-681fd7cbc416",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"171"
"False"
]
},
"execution_count": 31,
"execution_count": 49,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"len(params_)"
"db.is_closed()"
]
},
{
"cell_type": "code",
"execution_count": 32,
"id": "f532c668-965a-4c7c-aaa9-23a9e308a455",
"metadata": {},
"outputs": [],
"source": [
"params = numpy.array(params_)"
]
},
{
"cell_type": "code",
"execution_count": 40,
"id": "6ad8131f-3e0f-43b9-ab41-b6e3e56a5245",
"metadata": {},
"outputs": [],
"source": [
"from pandas import Series"
]
},
{
"cell_type": "code",
"execution_count": 41,
"id": "6111802f-9e5f-47de-a6ea-f54dd062ecba",
"metadata": {},
"outputs": [],
"source": [
"pparams = Series(params_)"
]
},
{
"cell_type": "code",
"execution_count": 51,
"id": "11f04e87-e2f1-4d22-be85-8b6967935c92",
"metadata": {},
"outputs": [],
"source": [
"test = numpy.array([], dtype = object)"
]
},
{
"cell_type": "code",
"execution_count": 52,
"id": "e1a930c3-e591-41e3-8d76-8316f4b8bfac",
"metadata": {},
"outputs": [],
"source": [
"test += 1"
]
},
{
"cell_type": "code",
"execution_count": 55,
"id": "075e261f-4387-49c3-9c40-292885cffcc6",
"execution_count": 50,
"id": "3cf907ef-8988-4f08-b1c1-6c3637cefd29",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"\u001b[0;31mDocstring:\u001b[0m\n",
"a.fill(value)\n",
"\n",
"Fill the array with a scalar value.\n",
"\n",
"Parameters\n",
"----------\n",
"value : scalar\n",
" All elements of `a` will be assigned this value.\n",
"\n",
"Examples\n",
"--------\n",
">>> a = np.array([1, 2])\n",
">>> a.fill(0)\n",
">>> a\n",
"array([0, 0])\n",
">>> a = np.empty(2)\n",
">>> a.fill(1)\n",
">>> a\n",
"array([1., 1.])\n",
"\u001b[0;31mType:\u001b[0m builtin_function_or_method\n"
"1"
]
},
"execution_count": 50,
"metadata": {},
"output_type": "display_data"
"output_type": "execute_result"
}
],
"source": [
"?test.fill"
"test1 = Test(text = \"hola\")\n",
"test1.save()"
]
},
{
"cell_type": "code",
"execution_count": 51,
"id": "0c91e698-a07b-4b52-bc9d-51a37e179cee",
"metadata": {},
"outputs": [],
"source": [
"test2 = Test.create(text = \"asd\")"
]
},
{
"cell_type": "code",
"execution_count": 52,
"id": "1deee68c-07b7-4003-a29f-1e726aefc9e0",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"1 hola\n",
"2 asd\n"
]
}
],
"source": [
"for row in Test.select():\n",
" print(row.test_id, row.text)"
]
},
{
"cell_type": "code",
"execution_count": 53,
"id": "c59d0a44-b907-427a-91d6-d9c9c72c8bdd",
"metadata": {},
"outputs": [],
"source": [
"from multiprocessing import Process, Queue"
]
},
{
"cell_type": "code",
"execution_count": 54,
"id": "88212490-2dd1-4058-80fe-57ae30bf38cc",
"metadata": {},
"outputs": [],
"source": [
"def queue(cmd, qin, qout, *args):\n",
" while True:\n",
" pos, var = qin.get()\n",
" \n",
" if pos is None:\n",
" break\n",
"\n",
" res = cmd(*var, *args)\n",
"\n",
" qout.put((pos, res))\n",
"\n",
" return"
]
},
{
"cell_type": "code",
"execution_count": 55,
"id": "df6568c8-595d-4c35-9560-16bfe8e3a915",
"metadata": {},
"outputs": [],
"source": [
"def db_save(table):\n",
" with db.atomic():\n",
" return table.save()"
]
},
{
"cell_type": "code",
"execution_count": 56,
"id": "53fa9e79-e3e9-4758-a434-4f411dd15858",
"metadata": {},
"outputs": [],
"source": [
"qin = Queue(1)\n",
"qout = Queue()\n",
"procs = []\n",
"nprocs = 10\n",
"\n",
"for n in range(nprocs):\n",
" args = (db_save, qin, qout)\n",
"\n",
" procs.append(Process(target = queue, args = args))"
]
},
{
"cell_type": "code",
"execution_count": 57,
"id": "86a39b3c-bbb9-4c7e-938b-3330ad7406b9",
"metadata": {},
"outputs": [],
"source": [
"for p in procs:\n",
" p.daemon = True\n",
" p.start()\n",
"\n",
"var = []\n",
"for n in range(50):\n",
" var.append([Test(text = f\"test_{ n }\")])\n",
"\n",
"for n in range(len(var)):\n",
" qin.put((n, var[n]))\n",
"\n",
"for _ in range(nprocs):\n",
" qin.put((None, None))\n",
"\n",
"results = [[] for n in range(len(var))]\n",
"\n",
"for n in range(len(var)):\n",
" index, res = qout.get() \n",
" results[index] = res\n",
"\n",
"for p in procs:\n",
" p.join()"
]
},
{
"cell_type": "code",
"execution_count": 58,
"id": "38e46264-7a61-4215-afe6-827b835ca82b",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"0"
]
},
"execution_count": 58,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"procs[0].exitcode"
]
},
{
"cell_type": "code",
"execution_count": 59,
"id": "d8da8120-ba3e-44d3-9b9b-78f040c957c0",
"metadata": {},
"outputs": [],
"source": [
"test3 = Test.create(text = \"afterproc\")"
]
},
{
"cell_type": "code",
"execution_count": 60,
"id": "c716656c-7aaa-40da-8582-2311f1dd5314",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"1 hola\n",
"2 asd\n",
"3 test_1\n",
"4 test_0\n",
"5 test_11\n",
"6 test_12\n",
"7 test_13\n",
"8 test_3\n",
"9 test_9\n",
"10 test_16\n",
"11 test_2\n",
"12 test_7\n",
"13 test_19\n",
"14 test_20\n",
"15 test_4\n",
"16 test_21\n",
"17 test_23\n",
"18 test_24\n",
"19 test_25\n",
"20 test_26\n",
"21 test_27\n",
"22 test_28\n",
"23 test_18\n",
"24 test_30\n",
"25 test_31\n",
"26 test_32\n",
"27 test_14\n",
"28 test_34\n",
"29 test_35\n",
"30 test_6\n",
"31 test_37\n",
"32 test_38\n",
"33 test_39\n",
"34 test_40\n",
"35 test_41\n",
"36 test_36\n",
"37 test_43\n",
"38 test_44\n",
"39 test_42\n",
"40 test_46\n",
"41 test_47\n",
"42 test_48\n",
"43 test_49\n",
"44 test_45\n",
"45 test_29\n",
"46 test_5\n",
"47 test_10\n",
"48 test_15\n",
"49 test_17\n",
"50 test_33\n",
"51 test_22\n",
"52 test_8\n",
"53 afterproc\n"
]
}
],
"source": [
"for row in Test.select():\n",
" print(row.test_id, row.text)"
]
},
{
"cell_type": "code",
"execution_count": 64,
"id": "f46377ad-0af5-4acf-95bb-955e327f51e8",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"<peewee.SqliteDatabase at 0x7f4cdbc08f70>"
]
},
"execution_count": 64,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"db"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "448c99bf-2b6f-43de-92d3-dc1267e766d3",
"id": "79a02231-365d-4a20-8899-6b5e8dbc8489",
"metadata": {},
"outputs": [],
"source": []
@@ -454,7 +654,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.0"
"version": "3.9.7"
}
},
"nbformat": 4,

BIN
playground/newtest.db Normal file

Binary file not shown.

BIN
playground/newtest2.db Normal file

Binary file not shown.