Compare commits

..

2 Commits

Author SHA1 Message Date
aefedfe187
complete base file and directory models 2024-08-05 18:29:03 +05:00
9986429bdf
impove base mixin, filesystem model 2024-08-05 15:15:20 +05:00
7 changed files with 309 additions and 92 deletions

View File

@ -16,6 +16,8 @@ from materia.models.database import (
from materia.models.user import User, UserCredentials, UserInfo from materia.models.user import User, UserCredentials, UserInfo
from materia.models.filesystem import FileSystem
from materia.models.repository import ( from materia.models.repository import (
Repository, Repository,
RepositoryInfo, RepositoryInfo,

View File

@ -1,3 +1,27 @@
from sqlalchemy.orm import declarative_base from typing import Optional, Self
from sqlalchemy.orm import DeclarativeBase
Base = declarative_base()
class Base(DeclarativeBase):
def to_dict(self) -> dict:
return {key: getattr(self, key) for key in self.__table__.columns.keys()}
def clone(self) -> Optional[Self]:
"""Clone model.
Included: columns and values, foreign keys
Ignored: primary keys, relationships
"""
# if not inspect(self).persistent:
# return
cloned = self.__class__(
**{
key: getattr(self, key)
for key in self.__table__.columns.keys()
# ignore primary keys
if key not in self.__table__.primary_key.columns.keys()
}
)
return cloned

View File

@ -47,22 +47,19 @@ class Directory(Base):
await session.flush() await session.flush()
await session.refresh(self, attribute_names=["repository"]) await session.refresh(self, attribute_names=["repository"])
relative_path = await self.relative_path(session) repository_path = await self.repository.path(session, config)
directory_path = await self.path(session, config) directory_path = await self.path(session, config)
try: new_directory = FileSystem(directory_path, repository_path)
directory_path.mkdir() await new_directory.make_directory()
except OSError as e:
raise DirectoryError(
f"Failed to create directory at /{relative_path}:",
*e.args,
)
return self return self
async def remove(self, session: SessionContext, config: Config): async def remove(self, session: SessionContext, config: Config):
session.add(self) session.add(self)
await session.refresh(self, attribute_names=["directories", "files"]) await session.refresh(
self, attribute_names=["repository", "directories", "files"]
)
if self.directories: if self.directories:
for directory in self.directories: for directory in self.directories:
@ -72,15 +69,11 @@ class Directory(Base):
for file in self.files: for file in self.files:
file.remove(session, config) file.remove(session, config)
relative_path = await self.relative_path(session) repository_path = await self.repository.path(session, config)
directory_path = await self.path(session, config) directory_path = await self.path(session, config)
try: current_directory = FileSystem(directory_path, repository_path)
shutil.rmtree(str(directory_path)) await current_directory.remove()
except OSError as e:
raise DirectoryError(
f"Failed to remove directory at /{relative_path}:", *e.args
)
await session.delete(self) await session.delete(self)
await session.flush() await session.flush()
@ -153,38 +146,59 @@ class Directory(Base):
async def copy( async def copy(
self, directory: Optional["Directory"], session: SessionContext, config: Config self, directory: Optional["Directory"], session: SessionContext, config: Config
) -> Self: ) -> Self:
pass session.add(self)
await session.refresh(self, attribute_names=["repository"])
repository_path = await self.repository.path(session, config)
directory_path = await self.path(session, config)
directory_path = (
await directory.path(session, config) if directory else repository_path
)
current_directory = FileSystem(directory_path, repository_path)
new_directory = await current_directory.copy(directory_path)
cloned = self.clone()
cloned.name = new_directory.name()
cloned.parent_id = directory.id if directory else None
session.add(cloned)
await session.flush()
return self
async def move( async def move(
self, directory: Optional["Directory"], session: SessionContext, config: Config self, directory: Optional["Directory"], session: SessionContext, config: Config
) -> Self: ) -> Self:
pass session.add(self)
await session.refresh(self, attribute_names=["repository"])
repository_path = await self.repository.path(session, config)
directory_path = await self.path(session, config)
directory_path = (
await directory.path(session, config) if directory else repository_path
)
current_directory = FileSystem(directory_path, repository_path)
moved_directory = await current_directory.move(directory_path)
self.name = moved_directory.name()
self.parent_id = directory.id if directory else None
self.updated = time()
await session.flush()
return self
async def rename(self, name: str, session: SessionContext, config: Config) -> Self: async def rename(self, name: str, session: SessionContext, config: Config) -> Self:
session.add(self) session.add(self)
await session.refresh(self, attribute_names=["repository"])
repository_path = await self.repository.path(session, config)
directory_path = await self.path(session, config) directory_path = await self.path(session, config)
relative_path = await self.relative_path(session)
new_path = directory_path.with_name(name)
identity = 1
while True: current_directory = FileSystem(directory_path, repository_path)
if new_path == directory_path: renamed_directory = await current_directory.rename(name, force=True)
break
if not new_path.exists():
break
new_path = directory_path.with_name(f"{name}.{str(identity)}") self.name = renamed_directory.name()
identity += 1
try:
await aiofiles.os.rename(directory_path, new_path)
except OSError as e:
raise DirectoryError(
f"Failed to rename directory at /{relative_path}", *e.args
)
self.name = new_path.name
await session.flush() await session.flush()
return self return self
@ -222,3 +236,4 @@ class DirectoryInfo(BaseModel):
from materia.models.repository import Repository from materia.models.repository import Repository
from materia.models.file import File from materia.models.file import File
from materia.models.filesystem import FileSystem

View File

@ -47,18 +47,12 @@ class File(Base):
await session.flush() await session.flush()
await session.refresh(self, attribute_names=["repository"]) await session.refresh(self, attribute_names=["repository"])
relative_path = await self.relative_path(session)
file_path = await self.path(session, config) file_path = await self.path(session, config)
size = None
try: new_file = FileSystem(file_path, await self.repository.path(session, config))
async with aiofiles.open(file_path, mode="wb") as file: await new_file.write_file(data)
await file.write(data)
size = (await aiofiles.os.stat(file_path)).st_size
except OSError as e:
raise FileError(f"Failed to write file at /{relative_path}", *e.args)
self.size = size self.size = await new_file.size()
await session.flush() await session.flush()
return self return self
@ -66,13 +60,10 @@ class File(Base):
async def remove(self, session: SessionContext, config: Config): async def remove(self, session: SessionContext, config: Config):
session.add(self) session.add(self)
relative_path = await self.relative_path(session)
file_path = await self.path(session, config) file_path = await self.path(session, config)
try: new_file = FileSystem(file_path, await self.repository.path(session, config))
await aiofiles.os.remove(file_path) await new_file.remove()
except OSError as e:
raise FileError(f"Failed to remove file at /{relative_path}:", *e.args)
await session.delete(self) await session.delete(self)
await session.flush() await session.flush()
@ -141,7 +132,25 @@ class File(Base):
async def copy( async def copy(
self, directory: Optional["Directory"], session: SessionContext, config: Config self, directory: Optional["Directory"], session: SessionContext, config: Config
) -> Self: ) -> Self:
pass session.add(self)
await session.refresh(self, attribute_names=["repository"])
repository_path = await self.repository.path(session, config)
file_path = await self.path(session, config)
directory_path = (
await directory.path(session, config) if directory else repository_path
)
current_file = FileSystem(file_path, repository_path)
new_file = await current_file.copy(directory_path)
cloned = self.clone()
cloned.name = new_file.name()
cloned.parent_id = directory.id if directory else None
session.add(cloned)
await session.flush()
return self
async def move( async def move(
self, directory: Optional["Directory"], session: SessionContext, config: Config self, directory: Optional["Directory"], session: SessionContext, config: Config
@ -154,49 +163,29 @@ class File(Base):
directory_path = ( directory_path = (
await directory.path(session, config) if directory else repository_path await directory.path(session, config) if directory else repository_path
) )
new_path = File.generate_name(file_path, directory_path, self.name)
try: current_file = FileSystem(file_path, repository_path)
await aioshutil.move(file_path, new_path) moved_file = await current_file.move(directory_path)
except OSError as e:
raise FileError("Failed to move file:", *e.args)
self.name = moved_file.name()
self.parent_id = directory.id if directory else None self.parent_id = directory.id if directory else None
self.updated = time()
await session.flush() await session.flush()
return self return self
@staticmethod
def generate_name(old_file: Path, target_directory: Path, name: str) -> Path:
new_path = target_directory.joinpath(name)
identity = 1
while True:
if new_path == old_file:
break
if not new_path.exists():
break
new_path = target_directory.joinpath(
f"{name.removesuffix(new_path.suffix)}.{str(identity)}{new_path.suffix}"
)
identity += 1
return new_path
async def rename(self, name: str, session: SessionContext, config: Config) -> Self: async def rename(self, name: str, session: SessionContext, config: Config) -> Self:
session.add(self) session.add(self)
await session.refresh(self, attribute_names=["repository"])
repository_path = await self.repository.path(session, config)
file_path = await self.path(session, config) file_path = await self.path(session, config)
relative_path = await self.relative_path(session)
new_path = File.generate_name(file_path, file_path.parent, name)
try: current_file = FileSystem(file_path, repository_path)
await aiofiles.os.rename(file_path, new_path) renamed_file = await current_file.rename(name, force=True)
except OSError as e:
raise FileError(f"Failed to rename file at /{relative_path}", *e.args)
self.name = new_path.name self.name = renamed_file.name()
self.updated = time()
await session.flush() await session.flush()
return self return self
@ -239,3 +228,4 @@ class FileInfo(BaseModel):
from materia.models.repository import Repository from materia.models.repository import Repository
from materia.models.directory import Directory from materia.models.directory import Directory
from materia.models.filesystem import FileSystem

View File

@ -0,0 +1,186 @@
from typing import Optional, Self, Iterator, TypeVar
from pathlib import Path
import aiofiles
from aiofiles import os as async_os
from aiofiles import ospath as async_path
import aioshutil
import re
class FileSystemError(Exception):
pass
T = TypeVar("T")
def wrapped_next(i: Iterator[T]) -> Optional[T]:
try:
return next(i)
except StopIteration:
return None
class FileSystem:
def __init__(self, path: Path, working_directory: Path):
if path == Path():
raise FileSystemError("The given path is empty")
if working_directory == Path():
raise FileSystemError("The given working directory is empty")
self.path = path
self.working_directory = working_directory
self.relative_path = path.relative_to(working_directory)
async def exists(self) -> bool:
return await async_path.exists(self.path)
async def size(self) -> int:
return await async_path.getsize(self.path)
async def is_file(self) -> bool:
return await async_path.isfile(self.path)
async def is_directory(self) -> bool:
return await async_path.isdir(self.path)
def name(self) -> str:
return self.path.name
async def remove(self):
try:
if await self.is_file():
await aiofiles.os.remove(self.path)
if await self.is_directory():
await aioshutil.rmtree(str(self.path))
except OSError as e:
raise FileSystemError(
f"Failed to remove file system content at /{self.relative_path}:",
*e.args,
)
async def generate_name(self, target_directory: Path, name: str) -> str:
"""Generate name based on target directory contents and self type."""
count = 1
new_path = target_directory.joinpath(name)
if new_path == self.path:
return name
while await async_path.exists(new_path):
if await self.is_file():
if with_counter := re.match(r"^(.+)\.(\d+)\.(\w+)$", new_path.name):
new_name, _, extension = with_counter.groups()
elif with_extension := re.match(r"^(.+)\.(\w+)$", new_path.name):
new_name, extension = with_extension.groups()
new_path = target_directory.joinpath(
"{}.{}.{}".format(new_name, count, extension)
)
if await self.is_directory():
if with_counter := re.match(r"^(.+)\.(\d+)$", new_path.name):
new_name, _ = with_counter.groups()
else:
new_name = new_path.name
new_path = target_directory.joinpath("{}.{}".format(new_name, count))
count += 1
return new_path.name
async def move(
self,
target_directory: Path,
new_name: Optional[str] = None,
force: bool = False,
):
new_name = new_name if new_name else self.path.name
try:
if (
await async_path.exists(target_directory.joinpath(new_name))
and not force
):
raise FileSystemError(
"Failed to move content to target destination: already exists"
)
new_path = target_directory.joinpath(
await self.generate_name(target_directory, new_name)
)
await aioshutil.move(self.path, new_path)
except Exception as e:
raise FileSystemError(
f"Failed to move content from /{self.relative_path}:",
*e.args,
)
return FileSystem(new_path, self.working_directory)
async def rename(self, new_name: str, force: bool = False) -> Path:
return await self.move(self.path.parent, new_name=new_name, force=force)
async def copy(
self,
target_directory: Path,
new_name: Optional[str] = None,
force: bool = False,
) -> Self:
new_name = new_name if new_name else self.path.name
try:
if (
await async_path.exists(target_directory.joinpath(new_name))
and not force
):
raise FileSystemError(
"Failed to copy content to target destination: already exists"
)
new_path = target_directory.joinpath(
await self.generate_name(target_directory, new_name)
)
if await self.is_file():
await aioshutil.copy(self.path, new_path)
if await self.is_directory():
await aioshutil.copytree(self.path, new_path)
except Exception as e:
raise FileSystemError(
f"Failed to copy content from /{self.relative_path}:",
*e.args,
)
return FileSystem(new_path, self.working_directory)
async def make_directory(self):
try:
if await self.exists():
raise FileSystemError("Failed to create directory: already exists")
await async_os.mkdir(self.path)
except Exception as e:
raise FileSystemError(
f"Failed to create directory at /{self.relative_path}:",
*e.args,
)
async def write_file(self, data: bytes):
try:
if await self.exists():
raise FileSystemError("Failed to write file: already exists")
async with aiofiles.open(self.path, mode="wb") as file:
await file.write(data)
except Exception as e:
raise FileSystemError(
f"Failed to write file to /{self.relative_path}:",
*e.args,
)

View File

@ -4,7 +4,7 @@ from uuid import UUID, uuid4
from pathlib import Path from pathlib import Path
import shutil import shutil
from sqlalchemy import BigInteger, ForeignKey from sqlalchemy import BigInteger, ForeignKey, inspect
from sqlalchemy.orm import mapped_column, Mapped, relationship from sqlalchemy.orm import mapped_column, Mapped, relationship
from sqlalchemy.orm.attributes import InstrumentedAttribute from sqlalchemy.orm.attributes import InstrumentedAttribute
import sqlalchemy as sa import sqlalchemy as sa
@ -86,13 +86,6 @@ class Repository(Base):
await session.delete(self) await session.delete(self)
await session.flush() await session.flush()
def to_dict(self) -> dict:
return {
k: getattr(self, k)
for k, v in Repository.__dict__.items()
if isinstance(v, InstrumentedAttribute)
}
async def update(self, session: SessionContext): async def update(self, session: SessionContext):
await session.execute( await session.execute(
sa.update(Repository).values(self.to_dict()).where(Repository.id == self.id) sa.update(Repository).values(self.to_dict()).where(Repository.id == self.id)

View File

@ -181,6 +181,13 @@ async def test_repository(data, tmpdir, session: SessionContext, config: Config)
assert (await repository.path(session, config)).exists() assert (await repository.path(session, config)).exists()
assert await Repository.from_user(data.user, session) == repository assert await Repository.from_user(data.user, session) == repository
await session.refresh(repository, attribute_names=["user"])
cloned_repository = repository.clone()
assert cloned_repository.id is None and cloned_repository.user is None
session.add(cloned_repository)
await session.flush()
assert cloned_repository.id is not None
await repository.remove(session, config) await repository.remove(session, config)
make_transient(repository) make_transient(repository)
session.add(repository) session.add(repository)