impove base mixin, filesystem model

This commit is contained in:
L-Nafaryus 2024-08-05 15:15:20 +05:00
parent 383d7c57ab
commit 9986429bdf
Signed by: L-Nafaryus
GPG Key ID: 553C97999B363D38
5 changed files with 240 additions and 11 deletions

View File

@ -1,3 +1,27 @@
from sqlalchemy.orm import declarative_base
from typing import Optional, Self
from sqlalchemy.orm import DeclarativeBase
Base = declarative_base()
class Base(DeclarativeBase):
def to_dict(self) -> dict:
return {key: getattr(self, key) for key in self.__table__.columns.keys()}
def clone(self) -> Optional[Self]:
"""Clone model.
Included: columns and values, foreign keys
Ignored: primary keys, relationships
"""
# if not inspect(self).persistent:
# return
cloned = self.__class__(
**{
key: getattr(self, key)
for key in self.__table__.columns.keys()
# ignore primary keys
if key not in self.__table__.primary_key.columns.keys()
}
)
return cloned

View File

@ -141,7 +141,28 @@ class File(Base):
async def copy(
self, directory: Optional["Directory"], session: SessionContext, config: Config
) -> Self:
pass
session.add(self)
await session.refresh(self, attribute_names=["repository"])
repository_path = await self.repository.path(session, config)
file_path = await self.path(session, config)
directory_path = (
await directory.path(session, config) if directory else repository_path
)
new_path = File.generate_name(file_path, directory_path, self.name)
try:
await aioshutil.copy(file_path, new_path)
except OSError as e:
raise FileError("Failed to move file:", *e.args)
cloned = self.clone()
cloned.name = new_path.name
cloned.parent_id = directory.id if directory else None
session.add(cloned)
await session.flush()
return self
async def move(
self, directory: Optional["Directory"], session: SessionContext, config: Config
@ -161,7 +182,9 @@ class File(Base):
except OSError as e:
raise FileError("Failed to move file:", *e.args)
self.name = new_path.name
self.parent_id = directory.id if directory else None
self.updated = time()
await session.flush()
return self
@ -197,6 +220,7 @@ class File(Base):
raise FileError(f"Failed to rename file at /{relative_path}", *e.args)
self.name = new_path.name
self.updated = time()
await session.flush()
return self

View File

@ -0,0 +1,181 @@
from typing import Optional, Self, Iterator, TypeVar
from pathlib import Path
import aiofiles
from aiofiles import os as async_os
from aiofiles import ospath as async_path
import aioshutil
import re
class FileSystemError(Exception):
pass
T = TypeVar("T")
def wrapped_next(i: Iterator[T]) -> Optional[T]:
try:
return next(i)
except StopIteration:
return None
class FileSystem:
def __init__(self, path: Path, working_directory: Path):
if path == Path():
raise FileSystemError("The given path is empty")
if working_directory == Path():
raise FileSystemError("The given working directory is empty")
self.path = path
self.working_directory = working_directory
self.relative_path = path.relative_to(working_directory)
async def exists(self) -> bool:
return await async_path.exists(self.path)
async def size(self) -> int:
return await async_path.getsize(self.path)
async def is_file(self) -> bool:
return await async_path.isfile(self.path)
async def is_directory(self) -> bool:
return await async_path.isdir(self.path)
async def remove(self):
try:
if await self.is_file():
await aiofiles.os.remove(self.path)
if await self.is_directory():
await aioshutil.rmtree(str(self.path))
except OSError as e:
raise FileSystemError(
f"Failed to remove file system content at /{self.relative_path}:",
*e.args,
)
async def generate_name(self, target_directory: Path, name: str) -> str:
"""Generate name based on target directory contents and self type."""
count = 1
new_path = target_directory.joinpath(name)
if new_path == self.path:
return name
while await async_path.exists(new_path):
if self.is_file():
if with_counter := re.match(r"^(.+)(\.\d+)(\.\w+)$", new_path.name):
new_name, _, extension = with_counter.groups()
elif with_extension := re.match(r"^(.+)(\.\w+)$", new_path.name):
new_name, extension = with_extension.groups()
new_path = target_directory.joinpath(
"{}.{}.{}".format(new_name, count, extension)
)
if self.is_directory():
if with_counter := re.match(r"^(.+)(\.\d+)$", new_path.name):
new_name, _ = with_counter.groups()
else:
new_name = new_path.name
new_path = target_directory.joinpath("{}.{}".format(new_name, count))
count += 1
return new_path.name
async def move(
self,
target_directory: Path,
new_name: Optional[str] = None,
force: bool = False,
):
new_name = new_name if new_name else self.path.name
try:
if (
await async_path.exists(target_directory.joinpath(new_name))
and not force
):
raise FileSystemError(
"Failed to move content to target destination: already exists"
)
await aioshutil.move(
self.path,
target_directory.joinpath(
await self.generate_name(target_directory, new_name)
),
)
except Exception as e:
raise FileSystemError(
f"Failed to move content from /{self.relative_path}:",
*e.args,
)
async def rename(self, new_name: str, force: bool = False):
await self.move(self.path.parent, new_name=new_name, force=force)
async def copy(
self,
target_directory: Path,
new_name: Optional[str] = None,
force: bool = False,
):
new_name = new_name if new_name else self.path.name
try:
if (
await async_path.exists(target_directory.joinpath(new_name))
and not force
):
raise FileSystemError(
"Failed to copy content to target destination: already exists"
)
new_path = target_directory.joinpath(
await self.generate_name(target_directory, new_name)
)
if await self.is_file():
await aioshutil.copy(self.path, new_path)
if await self.is_directory():
await aioshutil.copytree(self.path, new_path)
except Exception as e:
raise FileSystemError(
f"Failed to copy content from /{self.relative_path}:",
*e.args,
)
async def make_directory(self):
try:
if await self.exists():
raise FileSystemError("Failed to create directory: already exists")
await async_os.mkdir(self.path)
except Exception as e:
raise FileSystemError(
f"Failed to create directory at /{self.relative_path}:",
*e.args,
)
async def write_file(self, data: bytes):
try:
if await self.exists():
raise FileSystemError("Failed to write file: already exists")
async with aiofiles.open(self.path, mode="wb") as file:
await file.write(data)
except Exception as e:
raise FileSystemError(
f"Failed to write file to /{self.relative_path}:",
*e.args,
)

View File

@ -4,7 +4,7 @@ from uuid import UUID, uuid4
from pathlib import Path
import shutil
from sqlalchemy import BigInteger, ForeignKey
from sqlalchemy import BigInteger, ForeignKey, inspect
from sqlalchemy.orm import mapped_column, Mapped, relationship
from sqlalchemy.orm.attributes import InstrumentedAttribute
import sqlalchemy as sa
@ -86,13 +86,6 @@ class Repository(Base):
await session.delete(self)
await session.flush()
def to_dict(self) -> dict:
return {
k: getattr(self, k)
for k, v in Repository.__dict__.items()
if isinstance(v, InstrumentedAttribute)
}
async def update(self, session: SessionContext):
await session.execute(
sa.update(Repository).values(self.to_dict()).where(Repository.id == self.id)

View File

@ -181,6 +181,13 @@ async def test_repository(data, tmpdir, session: SessionContext, config: Config)
assert (await repository.path(session, config)).exists()
assert await Repository.from_user(data.user, session) == repository
await session.refresh(repository, attribute_names=["user"])
cloned_repository = repository.clone()
assert cloned_repository.id is None and cloned_repository.user is None
session.add(cloned_repository)
await session.flush()
assert cloned_repository.id is not None
await repository.remove(session, config)
make_transient(repository)
session.add(repository)