Skip to content

Core

materia.core

cache

CacheError

Bases: Exception

Cache

Cache(url, pool)
PARAMETER DESCRIPTION
url

TYPE: RedisDsn

pool

TYPE: ConnectionPool

Source code in src/materia/core/cache.py
14
15
16
def __init__(self, url: RedisDsn, pool: aioredis.ConnectionPool):
    self.url: RedisDsn = url
    self.pool: aioredis.ConnectionPool = pool
url instance-attribute
url = url
pool instance-attribute
pool = pool
new async staticmethod
new(
    url,
    encoding="utf-8",
    decode_responses=True,
    test_connection=True,
)
PARAMETER DESCRIPTION
url

TYPE: RedisDsn

encoding

TYPE: str DEFAULT: 'utf-8'

decode_responses

TYPE: bool DEFAULT: True

test_connection

TYPE: bool DEFAULT: True

Source code in src/materia/core/cache.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@staticmethod
async def new(
    url: RedisDsn,
    encoding: str = "utf-8",
    decode_responses: bool = True,
    test_connection: bool = True,
) -> Self:
    pool = aioredis.ConnectionPool.from_url(
        str(url), encoding=encoding, decode_responses=decode_responses
    )

    if test_connection:
        try:
            if logger := Logger.instance():
                logger.debug("Testing cache connection")
            connection = pool.make_connection()
            await connection.connect()
        except ConnectionError as e:
            raise CacheError(f"{e}")
        else:
            await connection.disconnect()

    return Cache(url=url, pool=pool)
client async
client()
Source code in src/materia/core/cache.py
42
43
44
45
46
47
@asynccontextmanager
async def client(self) -> AsyncGenerator[aioredis.Redis, Any]:
    try:
        yield aioredis.Redis(connection_pool=self.pool)
    except Exception as e:
        raise CacheError(f"{e}")
pipeline async
pipeline(transaction=True)
PARAMETER DESCRIPTION
transaction

TYPE: bool DEFAULT: True

Source code in src/materia/core/cache.py
49
50
51
52
53
54
55
56
@asynccontextmanager
async def pipeline(self, transaction: bool = True) -> AsyncGenerator[Pipeline, Any]:
    client = await aioredis.Redis(connection_pool=self.pool)

    try:
        yield client.pipeline(transaction=transaction)
    except Exception as e:
        raise CacheError(f"{e}")

config

Application

Bases: BaseModel

user class-attribute instance-attribute
user = 'materia'
group class-attribute instance-attribute
group = 'materia'
mode class-attribute instance-attribute
mode = 'production'
working_directory class-attribute instance-attribute
working_directory = cwd()

Log

Bases: BaseModel

mode class-attribute instance-attribute
mode = 'console'
level class-attribute instance-attribute
level = 'info'
console_format class-attribute instance-attribute
console_format = "<level>{level: <8}</level> <green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> - {message}"
file_format class-attribute instance-attribute
file_format = "<level>{level: <8}</level>: <green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> - {message}"
file class-attribute instance-attribute
file = None
file_rotation class-attribute instance-attribute
file_rotation = '3 days'
file_retention class-attribute instance-attribute
file_retention = '1 week'

Server

Bases: BaseModel

scheme class-attribute instance-attribute
scheme = 'http'
address class-attribute instance-attribute
address = Field(default='127.0.0.1')
port class-attribute instance-attribute
port = 54601
domain class-attribute instance-attribute
domain = 'localhost'
url
url()
Source code in src/materia/core/config.py
41
42
def url(self) -> str:
    return "{}://{}:{}".format(self.scheme, self.address, self.port)

Database

Bases: BaseModel

backend class-attribute instance-attribute
backend = 'postgresql'
scheme class-attribute instance-attribute
scheme = 'postgresql+asyncpg'
address class-attribute instance-attribute
address = Field(default='127.0.0.1')
port class-attribute instance-attribute
port = 5432
name class-attribute instance-attribute
name = 'materia'
user class-attribute instance-attribute
user = 'materia'
password class-attribute instance-attribute
password = None
url
url()
Source code in src/materia/core/config.py
55
56
57
58
59
60
61
62
63
64
65
66
def url(self) -> str:
    if self.backend in ["postgresql"]:
        return (
            "{}://{}:{}@{}:{}".format(
                self.scheme, self.user, self.password, self.address, self.port
            )
            + f"/{self.name}"
            if self.name
            else ""
        )
    else:
        raise NotImplementedError()

Cache

Bases: BaseModel

backend class-attribute instance-attribute
backend = 'redis'
scheme class-attribute instance-attribute
scheme = 'redis'
address class-attribute instance-attribute
address = Field(default='127.0.0.1')
port class-attribute instance-attribute
port = 6379
user class-attribute instance-attribute
user = None
password class-attribute instance-attribute
password = None
database class-attribute instance-attribute
database = 0
url
url()
Source code in src/materia/core/config.py
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def url(self) -> str:
    if self.backend in ["redis"]:
        if self.user and self.password:
            return "{}://{}:{}@{}:{}/{}".format(
                self.scheme,
                self.user,
                self.password,
                self.address,
                self.port,
                self.database,
            )
        else:
            return "{}://{}:{}/{}".format(
                self.scheme, self.address, self.port, self.database
            )
    else:
        raise NotImplementedError()

Security

Bases: BaseModel

secret_key class-attribute instance-attribute
secret_key = None
password_min_length class-attribute instance-attribute
password_min_length = 8
password_hash_algo class-attribute instance-attribute
password_hash_algo = 'bcrypt'
cookie_http_only class-attribute instance-attribute
cookie_http_only = True
cookie_access_token_name class-attribute instance-attribute
cookie_access_token_name = 'materia_at'
cookie_refresh_token_name class-attribute instance-attribute
cookie_refresh_token_name = 'materia_rt'

OAuth2

Bases: BaseModel

enabled class-attribute instance-attribute
enabled = True
jwt_signing_algo class-attribute instance-attribute
jwt_signing_algo = 'HS256'
jwt_signing_key class-attribute instance-attribute
jwt_signing_key = None
jwt_secret class-attribute instance-attribute
jwt_secret = 'changeme'
access_token_lifetime class-attribute instance-attribute
access_token_lifetime = 3600
refresh_token_lifetime class-attribute instance-attribute
refresh_token_lifetime = 730 * 60
refresh_token_validation class-attribute instance-attribute
refresh_token_validation = False

Mailer

Bases: BaseModel

enabled class-attribute instance-attribute
enabled = False
scheme class-attribute instance-attribute
scheme = None
address class-attribute instance-attribute
address = None
port class-attribute instance-attribute
port = None
helo class-attribute instance-attribute
helo = True
cert_file class-attribute instance-attribute
cert_file = None
key_file class-attribute instance-attribute
key_file = None
from_ class-attribute instance-attribute
from_ = None
user class-attribute instance-attribute
user = None
password class-attribute instance-attribute
password = None
plain_text class-attribute instance-attribute
plain_text = False

Cron

Bases: BaseModel

workers_count class-attribute instance-attribute
workers_count = 1

Repository

Bases: BaseModel

capacity class-attribute instance-attribute
capacity = 5 << 30

Config

Bases: BaseSettings

application class-attribute instance-attribute
application = Application()
log class-attribute instance-attribute
log = Log()
server class-attribute instance-attribute
server = Server()
database class-attribute instance-attribute
database = Database()
cache class-attribute instance-attribute
cache = Cache()
security class-attribute instance-attribute
security = Security()
oauth2 class-attribute instance-attribute
oauth2 = OAuth2()
mailer class-attribute instance-attribute
mailer = Mailer()
cron class-attribute instance-attribute
cron = Cron()
repository class-attribute instance-attribute
repository = Repository()
open staticmethod
open(path)
PARAMETER DESCRIPTION
path

TYPE: Path

Source code in src/materia/core/config.py
165
166
167
168
169
170
171
172
173
@staticmethod
def open(path: Path) -> Self | None:
    try:
        data: dict = toml.load(path)
    except Exception as e:
        raise e
        # return None
    else:
        return Config(**data)
write
write(path)
PARAMETER DESCRIPTION
path

TYPE: Path

Source code in src/materia/core/config.py
175
176
177
178
179
180
181
182
183
184
185
def write(self, path: Path):
    dump = self.model_dump()

    # TODO: make normal filter or check model_dump abilities
    for key_first in dump.keys():
        for key_second in dump[key_first].keys():
            if isinstance(dump[key_first][key_second], Path):
                dump[key_first][key_second] = str(dump[key_first][key_second])

    with open(path, "w") as file:
        toml.dump(dump, file)
data_dir staticmethod
data_dir()
Source code in src/materia/core/config.py
187
188
189
190
191
192
193
@staticmethod
def data_dir() -> Path:
    cwd = Path.cwd()
    if environ.get("MATERIA_DEBUG"):
        return cwd / "temp"
    else:
        return cwd

cron

CronError

Bases: Exception

Cron

Cron(workers_count, backend)
PARAMETER DESCRIPTION
workers_count

TYPE: int

backend

TYPE: Celery

Source code in src/materia/core/cron.py
15
16
17
18
19
20
21
22
23
24
25
def __init__(
    self,
    workers_count: int,
    backend: Celery,
):
    self.workers_count = workers_count
    self.backend = backend
    self.workers = []
    self.worker_threads = []

    Cron.__instance__ = self
workers_count instance-attribute
workers_count = workers_count
backend instance-attribute
backend = backend
workers instance-attribute
workers = []
worker_threads instance-attribute
worker_threads = []
new staticmethod
new(
    workers_count=1,
    backend_url=None,
    broker_url=None,
    test_connection=True,
    **kwargs
)
PARAMETER DESCRIPTION
workers_count

TYPE: int DEFAULT: 1

backend_url

TYPE: Optional[RedisDsn] DEFAULT: None

broker_url

TYPE: Optional[RedisDsn] DEFAULT: None

test_connection

TYPE: bool DEFAULT: True

**kwargs

DEFAULT: {}

Source code in src/materia/core/cron.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
@staticmethod
def new(
    workers_count: int = 1,
    backend_url: Optional[RedisDsn] = None,
    broker_url: Optional[RedisDsn] = None,
    test_connection: bool = True,
    **kwargs,
):
    cron = Cron(
        workers_count,
        # TODO: change log level
        # TODO: exclude pickle
        # TODO: disable startup banner
        Celery(
            "cron",
            backend=backend_url,
            broker=broker_url,
            broker_connection_retry_on_startup=True,
            task_serializer="pickle",
            accept_content=["pickle", "json"],
            **kwargs,
        ),
    )

    for _ in range(workers_count):
        cron.workers.append(cron.backend.Worker())

    if test_connection:
        try:
            if logger := Logger.instance():
                logger.debug("Testing cron broker connection")
            cron.backend.broker_connection().ensure_connection(max_retries=3)
        except Exception as e:
            raise CronError(f"Failed to connect cron broker: {broker_url}") from e

    return cron
instance staticmethod
instance()
Source code in src/materia/core/cron.py
64
65
66
@staticmethod
def instance() -> Optional[Self]:
    return Cron.__instance__
run_workers
run_workers()
Source code in src/materia/core/cron.py
68
69
70
71
72
def run_workers(self):
    for worker in self.workers:
        thread = Thread(target=worker.start, daemon=True)
        self.worker_threads.append(thread)
        thread.start()

database

SessionContext module-attribute

SessionContext = AsyncIterator[AsyncSession]

SessionMaker module-attribute

SessionMaker = async_sessionmaker[AsyncSession]

ConnectionContext module-attribute

ConnectionContext = AsyncIterator[AsyncConnection]

DatabaseError

Bases: Exception

DatabaseMigrationError

Bases: Exception

Database

Database(url, engine, sessionmaker)
PARAMETER DESCRIPTION
url

TYPE: PostgresDsn

engine

TYPE: AsyncEngine

sessionmaker

TYPE: SessionMaker

Source code in src/materia/core/database.py
38
39
40
41
42
43
44
45
46
def __init__(
    self,
    url: PostgresDsn,
    engine: AsyncEngine,
    sessionmaker: SessionMaker,
):
    self.url: PostgresDsn = url
    self.engine: AsyncEngine = engine
    self.sessionmaker: SessionMaker = sessionmaker
url instance-attribute
url = url
engine instance-attribute
engine = engine
sessionmaker instance-attribute
sessionmaker = sessionmaker
new async staticmethod
new(
    url,
    pool_size=100,
    poolclass=None,
    autocommit=False,
    autoflush=False,
    expire_on_commit=False,
    test_connection=True,
)
PARAMETER DESCRIPTION
url

TYPE: PostgresDsn

pool_size

TYPE: int DEFAULT: 100

poolclass

DEFAULT: None

autocommit

TYPE: bool DEFAULT: False

autoflush

TYPE: bool DEFAULT: False

expire_on_commit

TYPE: bool DEFAULT: False

test_connection

TYPE: bool DEFAULT: True

Source code in src/materia/core/database.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
@staticmethod
async def new(
    url: PostgresDsn,
    pool_size: int = 100,
    poolclass=None,
    autocommit: bool = False,
    autoflush: bool = False,
    expire_on_commit: bool = False,
    test_connection: bool = True,
) -> Self:
    engine_options = {"pool_size": pool_size}
    if poolclass == NullPool:
        engine_options = {"poolclass": NullPool}

    engine = create_async_engine(str(url), **engine_options)

    sessionmaker = async_sessionmaker(
        bind=engine,
        autocommit=autocommit,
        autoflush=autoflush,
        expire_on_commit=expire_on_commit,
    )

    database = Database(url=url, engine=engine, sessionmaker=sessionmaker)

    if test_connection:
        try:
            if logger := Logger.instance():
                logger.debug("Testing database connection")
            async with database.connection() as connection:
                await connection.rollback()
        except Exception as e:
            raise DatabaseError(
                f"Failed to connect to database '{url}': {e}"
            ) from e

    return database
dispose async
dispose()
Source code in src/materia/core/database.py
86
87
async def dispose(self):
    await self.engine.dispose()
connection async
connection()
Source code in src/materia/core/database.py
89
90
91
92
93
94
95
96
@asynccontextmanager
async def connection(self) -> ConnectionContext:
    async with self.engine.connect() as connection:
        try:
            yield connection
        except Exception as e:
            await connection.rollback()
            raise DatabaseError(*e.args) from e
session async
session()
Source code in src/materia/core/database.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
@asynccontextmanager
async def session(self) -> SessionContext:
    session = self.sessionmaker()

    try:
        yield session
    except (HTTPException, ValidationError) as e:
        await session.rollback()
        raise e from None
    except Exception as e:
        await session.rollback()
        raise e  # DatabaseError(*e.args) from e
    finally:
        await session.close()
run_sync_migrations
run_sync_migrations(connection)
PARAMETER DESCRIPTION
connection

TYPE: Connection

Source code in src/materia/core/database.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
def run_sync_migrations(self, connection: Connection):
    from materia.models.base import Base

    aconfig = AlembicConfig()
    aconfig.set_main_option("sqlalchemy.url", str(self.url))
    aconfig.set_main_option(
        "script_location",
        str(Path(__file__).parent.parent.joinpath("models", "migrations")),
    )

    context = MigrationContext.configure(
        connection=connection,  # type: ignore
        opts={
            "target_metadata": Base.metadata,
            "fn": lambda rev, _: ScriptDirectory.from_config(aconfig)._upgrade_revs(
                "head", rev
            ),
        },
    )

    try:
        with context.begin_transaction():
            with Operations.context(context):
                context.run_migrations()
    except Exception as e:
        raise DatabaseMigrationError(f"{e}")
run_migrations async
run_migrations()
Source code in src/materia/core/database.py
140
141
142
async def run_migrations(self):
    async with self.connection() as connection:
        await connection.run_sync(self.run_sync_migrations)  # type: ignore
rollback_sync_migrations
rollback_sync_migrations(connection)
PARAMETER DESCRIPTION
connection

TYPE: Connection

Source code in src/materia/core/database.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
def rollback_sync_migrations(self, connection: Connection):
    from materia.models.base import Base

    aconfig = AlembicConfig()
    aconfig.set_main_option("sqlalchemy.url", str(self.url))
    aconfig.set_main_option(
        "script_location",
        str(Path(__file__).parent.parent.joinpath("models", "migrations")),
    )

    context = MigrationContext.configure(
        connection=connection,  # type: ignore
        opts={
            "target_metadata": Base.metadata,
            "fn": lambda rev, _: ScriptDirectory.from_config(
                aconfig
            )._downgrade_revs("base", rev),
        },
    )

    try:
        with context.begin_transaction():
            with Operations.context(context):
                context.run_migrations()
    except Exception as e:
        raise DatabaseMigrationError(f"{e}")
rollback_migrations async
rollback_migrations()
Source code in src/materia/core/database.py
171
172
173
async def rollback_migrations(self):
    async with self.connection() as connection:
        await connection.run_sync(self.rollback_sync_migrations)  # type: ignore

filesystem

valid_path module-attribute

valid_path = compile('^/(.*/)*([^/]*)$')

FileSystemError

Bases: Exception

FileSystem

FileSystem(path, isolated_directory=None)
PARAMETER DESCRIPTION
path

TYPE: Path

isolated_directory

TYPE: Optional[Path] DEFAULT: None

Source code in src/materia/core/filesystem.py
22
23
24
25
26
27
28
29
30
31
def __init__(self, path: Path, isolated_directory: Optional[Path] = None):
    if path == Path() or path is None:
        raise FileSystemError("The given path is empty")

    self.path = path

    if isolated_directory and not isolated_directory.is_absolute():
        raise FileSystemError("The isolated directory must be absolute")

    self.isolated_directory = isolated_directory
path instance-attribute
path = path
isolated_directory instance-attribute
isolated_directory = isolated_directory
exists async
exists()
Source code in src/materia/core/filesystem.py
35
36
async def exists(self) -> bool:
    return await async_path.exists(self.path)
size async
size()
Source code in src/materia/core/filesystem.py
38
39
async def size(self) -> int:
    return await async_path.getsize(self.path)
is_file async
is_file()
Source code in src/materia/core/filesystem.py
41
42
async def is_file(self) -> bool:
    return await async_path.isfile(self.path)
is_directory async
is_directory()
Source code in src/materia/core/filesystem.py
44
45
async def is_directory(self) -> bool:
    return await async_path.isdir(self.path)
name
name()
Source code in src/materia/core/filesystem.py
47
48
def name(self) -> str:
    return self.path.name
check_isolation async
check_isolation(path)
PARAMETER DESCRIPTION
path

TYPE: Path

Source code in src/materia/core/filesystem.py
50
51
52
53
54
55
56
57
58
59
60
async def check_isolation(self, path: Path):
    if not self.isolated_directory:
        return
    if not (await async_path.exists(self.isolated_directory)):
        raise FileSystemError("Missed isolated directory")
    if not optional(path.relative_to, self.isolated_directory):
        raise FileSystemError(
            "Attempting to work with a path that is outside the isolated directory"
        )
    if self.path == self.isolated_directory:
        raise FileSystemError("Attempting to modify the isolated directory")
remove async
remove(shallow=False)
PARAMETER DESCRIPTION
shallow

TYPE: bool DEFAULT: False

Source code in src/materia/core/filesystem.py
62
63
64
65
66
67
68
69
70
71
async def remove(self, shallow: bool = False):
    await self.check_isolation(self.path)
    try:
        if await self.exists() and await self.is_file() and not shallow:
            await aiofiles.os.remove(self.path)

        if await self.exists() and await self.is_directory() and not shallow:
            await aioshutil.rmtree(str(self.path))
    except OSError as e:
        raise FileSystemError(*e.args) from e
generate_name async
generate_name(target_directory, name)

Generate name based on target directory contents and self type.

PARAMETER DESCRIPTION
target_directory

TYPE: Path

name

TYPE: str

Source code in src/materia/core/filesystem.py
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
async def generate_name(self, target_directory: Path, name: str) -> str:
    """Generate name based on target directory contents and self type."""
    count = 1
    new_path = target_directory.joinpath(name)

    while await async_path.exists(new_path):
        if await self.is_file():
            if with_counter := re.match(r"^(.+)\.(\d+)\.(\w+)$", new_path.name):
                new_name, _, extension = with_counter.groups()
            elif with_extension := re.match(r"^(.+)\.(\w+)$", new_path.name):
                new_name, extension = with_extension.groups()

            new_path = target_directory.joinpath(
                "{}.{}.{}".format(new_name, count, extension)
            )

        if await self.is_directory():
            if with_counter := re.match(r"^(.+)\.(\d+)$", new_path.name):
                new_name, _ = with_counter.groups()
            else:
                new_name = new_path.name

            new_path = target_directory.joinpath("{}.{}".format(new_name, count))

        count += 1

    return new_path.name
move async
move(
    target_directory,
    new_name=None,
    force=False,
    shallow=False,
)
PARAMETER DESCRIPTION
target_directory

TYPE: Path

new_name

TYPE: Optional[str] DEFAULT: None

force

TYPE: bool DEFAULT: False

shallow

TYPE: bool DEFAULT: False

Source code in src/materia/core/filesystem.py
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
async def move(
    self,
    target_directory: Path,
    new_name: Optional[str] = None,
    force: bool = False,
    shallow: bool = False,
) -> Self:
    await self.check_isolation(self.path)
    new_path = await self._generate_new_path(
        target_directory, new_name, force=force, shallow=shallow
    )
    target = FileSystem(new_path, self.isolated_directory)

    try:
        if await self.exists() and not shallow:
            await aioshutil.move(self.path, new_path)
    except Exception as e:
        raise FileSystemError(*e.args) from e

    return target
rename async
rename(new_name, force=False, shallow=False)
PARAMETER DESCRIPTION
new_name

TYPE: str

force

TYPE: bool DEFAULT: False

shallow

TYPE: bool DEFAULT: False

Source code in src/materia/core/filesystem.py
139
140
141
142
143
144
async def rename(
    self, new_name: str, force: bool = False, shallow: bool = False
) -> Self:
    return await self.move(
        self.path.parent, new_name=new_name, force=force, shallow=shallow
    )
copy async
copy(
    target_directory,
    new_name=None,
    force=False,
    shallow=False,
)
PARAMETER DESCRIPTION
target_directory

TYPE: Path

new_name

TYPE: Optional[str] DEFAULT: None

force

TYPE: bool DEFAULT: False

shallow

TYPE: bool DEFAULT: False

Source code in src/materia/core/filesystem.py
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
async def copy(
    self,
    target_directory: Path,
    new_name: Optional[str] = None,
    force: bool = False,
    shallow: bool = False,
) -> Self:
    await self.check_isolation(self.path)
    new_path = await self._generate_new_path(
        target_directory, new_name, force=force, shallow=shallow
    )
    target = FileSystem(new_path, self.isolated_directory)

    try:
        if await self.is_file() and not shallow:
            await aioshutil.copy(self.path, new_path)

        if await self.is_directory() and not shallow:
            await aioshutil.copytree(self.path, new_path)
    except Exception as e:
        raise FileSystemError(*e.args) from e

    return target
make_directory async
make_directory(force=False)
PARAMETER DESCRIPTION
force

TYPE: bool DEFAULT: False

Source code in src/materia/core/filesystem.py
170
171
172
173
174
175
176
177
async def make_directory(self, force: bool = False):
    try:
        if await self.exists() and not force:
            raise FileSystemError("Already exists")

        await async_os.makedirs(self.path, exist_ok=force)
    except Exception as e:
        raise FileSystemError(*e.args)
write_file async
write_file(data, force=False)
PARAMETER DESCRIPTION
data

TYPE: bytes

force

TYPE: bool DEFAULT: False

Source code in src/materia/core/filesystem.py
179
180
181
182
183
184
185
186
187
async def write_file(self, data: bytes, force: bool = False):
    try:
        if await self.exists() and not force:
            raise FileSystemError("Already exists")

        async with aiofiles.open(self.path, mode="wb") as file:
            await file.write(data)
    except Exception as e:
        raise FileSystemError(*e.args)
check_path staticmethod
check_path(path)
PARAMETER DESCRIPTION
path

TYPE: Path

Source code in src/materia/core/filesystem.py
189
190
191
@staticmethod
def check_path(path: Path) -> bool:
    return bool(valid_path.match(str(path)))
normalize staticmethod
normalize(path)

Resolve path and make it relative.

PARAMETER DESCRIPTION
path

TYPE: Path

Source code in src/materia/core/filesystem.py
193
194
195
196
197
198
199
@staticmethod
def normalize(path: Path) -> Path:
    """Resolve path and make it relative."""
    if not path.is_absolute():
        path = Path("/").joinpath(path)

    return Path(*path.resolve().parts[1:])

TemporaryFileTarget

TemporaryFileTarget(
    working_directory, allow_overwrite=True, *args, **kwargs
)

Bases: BaseTarget

PARAMETER DESCRIPTION
working_directory

TYPE: Path

allow_overwrite

TYPE: bool DEFAULT: True

*args

DEFAULT: ()

**kwargs

DEFAULT: {}

Source code in src/materia/core/filesystem.py
203
204
205
206
207
208
209
210
211
212
213
def __init__(
    self, working_directory: Path, allow_overwrite: bool = True, *args, **kwargs
):
    if working_directory == Path():
        raise FileSystemError("The given working directory is empty")

    super().__init__(*args, **kwargs)

    self._mode = "wb" if allow_overwrite else "xb"
    self._fd = None
    self._path = working_directory.joinpath("cache", str(uuid4()))
on_start
on_start()
Source code in src/materia/core/filesystem.py
215
216
217
218
219
def on_start(self):
    if not self._path.parent.exists():
        self._path.parent.mkdir(exist_ok=True)

    self._fd = open(str(self._path), mode="wb")
on_data_received
on_data_received(chunk)
PARAMETER DESCRIPTION
chunk

TYPE: bytes

Source code in src/materia/core/filesystem.py
221
222
223
def on_data_received(self, chunk: bytes):
    if self._fd:
        self._fd.write(chunk)
on_finish
on_finish()
Source code in src/materia/core/filesystem.py
225
226
227
def on_finish(self):
    if self._fd:
        self._fd.close()
path
path()
Source code in src/materia/core/filesystem.py
229
230
def path(self) -> Optional[Path]:
    return self._path
remove
remove()
Source code in src/materia/core/filesystem.py
232
233
234
235
def remove(self):
    if self._fd:
        if (path := Path(self._fd.name)).exists():
            path.unlink()

logging

LogLevel module-attribute

LogLevel = Literal[
    "info", "warning", "error", "critical", "debug", "trace"
]

LogMode module-attribute

LogMode = Literal['console', 'file', 'all']

InterceptHandler

Bases: Handler

emit
emit(record)
PARAMETER DESCRIPTION
record

TYPE: LogRecord

Source code in src/materia/core/logging.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def emit(self, record: logging.LogRecord) -> None:
    level: str | int
    try:
        level = logger.level(record.levelname).name
    except ValueError:
        level = record.levelno

    frame, depth = inspect.currentframe(), 2
    while frame and (depth == 0 or frame.f_code.co_filename == logging.__file__):
        frame = frame.f_back
        depth += 1

    logger.opt(depth=depth, exception=record.exc_info).log(
        level, record.getMessage()
    )

Logger

Logger()
Source code in src/materia/core/logging.py
35
36
def __init__(self):
    raise NotImplementedError()
new staticmethod
new(
    mode="console",
    level="info",
    console_format="<level>{level: <8}</level> <green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> - {message}",
    file_format="<level>{level: <8}</level>: <green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> - {message}",
    file=None,
    file_rotation="3 days",
    file_retention="1 week",
    interceptions=[
        "uvicorn",
        "uvicorn.access",
        "uvicorn.error",
        "uvicorn.asgi",
        "fastapi",
    ],
)
PARAMETER DESCRIPTION
mode

TYPE: LogMode DEFAULT: 'console'

level

TYPE: LogLevel DEFAULT: 'info'

console_format

TYPE: str DEFAULT: '<level>{level: <8}</level> <green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> - {message}'

file_format

TYPE: str DEFAULT: '<level>{level: <8}</level>: <green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> - {message}'

file

TYPE: Optional[Path] DEFAULT: None

file_rotation

TYPE: str DEFAULT: '3 days'

file_retention

TYPE: str DEFAULT: '1 week'

interceptions

TYPE: Sequence[str] DEFAULT: ['uvicorn', 'uvicorn.access', 'uvicorn.error', 'uvicorn.asgi', 'fastapi']

Source code in src/materia/core/logging.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
@staticmethod
def new(
    mode: LogMode = "console",
    level: LogLevel = "info",
    console_format: str = (
        "<level>{level: <8}</level> <green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> - {message}"
    ),
    file_format: str = (
        "<level>{level: <8}</level>: <green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> - {message}"
    ),
    file: Optional[Path] = None,
    file_rotation: str = "3 days",
    file_retention: str = "1 week",
    interceptions: Sequence[str] = [
        "uvicorn",
        "uvicorn.access",
        "uvicorn.error",
        "uvicorn.asgi",
        "fastapi",
    ],
) -> LoggerInstance:
    logger.remove()

    if mode in ["console", "all"]:
        logger.add(
            sys.stdout,
            enqueue=True,
            backtrace=True,
            level=level.upper(),
            format=console_format,
            filter=lambda record: record["level"].name
            in ["INFO", "WARNING", "DEBUG", "TRACE"],
        )
        logger.add(
            sys.stderr,
            enqueue=True,
            backtrace=True,
            level=level.upper(),
            format=console_format,
            filter=lambda record: record["level"].name in ["ERROR", "CRITICAL"],
        )

    if mode in ["file", "all"]:
        logger.add(
            str(file),
            rotation=file_rotation,
            retention=file_retention,
            enqueue=True,
            backtrace=True,
            level=level.upper(),
            format=file_format,
        )

    logging.basicConfig(
        handlers=[InterceptHandler()], level=logging.NOTSET, force=True
    )

    for external_logger in interceptions:
        logging.getLogger(external_logger).handlers = [InterceptHandler()]

    Logger.__instance__ = logger

    return logger  # type: ignore
instance staticmethod
instance()
Source code in src/materia/core/logging.py
102
103
104
@staticmethod
def instance() -> Optional[LoggerInstance]:
    return Logger.__instance__
uvicorn_config staticmethod
uvicorn_config(level)
PARAMETER DESCRIPTION
level

TYPE: LogLevel

Source code in src/materia/core/logging.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
@staticmethod
def uvicorn_config(level: LogLevel) -> dict:
    return {
        "version": 1,
        "disable_existing_loggers": False,
        "handlers": {
            "default": {"class": "materia.core.logging.InterceptHandler"},
            "access": {"class": "materia.core.logging.InterceptHandler"},
        },
        "loggers": {
            "uvicorn": {
                "handlers": ["default"],
                "level": level.upper(),
                "propagate": False,
            },
            "uvicorn.error": {"level": level.upper()},
            "uvicorn.access": {
                "handlers": ["access"],
                "level": level.upper(),
                "propagate": False,
            },
        },
    }

misc

T module-attribute

T = TypeVar('T')

P module-attribute

P = ParamSpec('P')

optional

optional(func, *args, **kwargs)
PARAMETER DESCRIPTION
func

TYPE: Callable[P, T]

*args

TYPE: args DEFAULT: ()

**kwargs

TYPE: kwargs DEFAULT: {}

Source code in src/materia/core/misc.py
 8
 9
10
11
12
13
14
15
def optional(func: Callable[P, T], *args: P.args, **kwargs: P.kwargs) -> Optional[T]:
    try:
        res = func(*args, **kwargs)
    except TypeError as e:
        raise e
    except Exception:
        return None
    return res

optional_next

optional_next(it)
PARAMETER DESCRIPTION
it

TYPE: Iterator[T]

Source code in src/materia/core/misc.py
18
19
def optional_next(it: Iterator[T]) -> Optional[T]:
    return optional(next, it)

optional_string

optional_string(value, format_string=None)
PARAMETER DESCRIPTION
value

TYPE: Any

format_string

TYPE: Optional[str] DEFAULT: None

Source code in src/materia/core/misc.py
22
23
24
25
26
27
28
def optional_string(value: Any, format_string: Optional[str] = None) -> str:
    if value is None:
        return ""
    res = optional(str, value)
    if res is None:
        return ""
    return format_string.format(res)