stabilize directory workflow
This commit is contained in:
parent
680b0172f0
commit
b3be3d25ee
@ -12,6 +12,7 @@ from materia.models.database import (
|
||||
DatabaseMigrationError,
|
||||
Cache,
|
||||
CacheError,
|
||||
SessionContext,
|
||||
)
|
||||
|
||||
from materia.models.user import User, UserCredentials, UserInfo
|
||||
@ -27,9 +28,18 @@ from materia.models.repository import (
|
||||
|
||||
from materia.models.directory import (
|
||||
Directory,
|
||||
DirectoryPath,
|
||||
DirectoryLink,
|
||||
DirectoryInfo,
|
||||
DirectoryPath,
|
||||
DirectoryRename,
|
||||
DirectoryCopyMove,
|
||||
)
|
||||
|
||||
from materia.models.file import File, FileLink, FileInfo
|
||||
from materia.models.file import (
|
||||
File,
|
||||
FileLink,
|
||||
FileInfo,
|
||||
FilePath,
|
||||
FileRename,
|
||||
FileCopyMove,
|
||||
)
|
||||
|
@ -107,7 +107,7 @@ class Database:
|
||||
raise e from None
|
||||
except Exception as e:
|
||||
await session.rollback()
|
||||
raise DatabaseError(*e.args) from e
|
||||
raise e # DatabaseError(*e.args) from e
|
||||
finally:
|
||||
await session.close()
|
||||
|
||||
|
@ -48,8 +48,8 @@ class Directory(Base):
|
||||
await session.flush()
|
||||
await session.refresh(self, attribute_names=["repository"])
|
||||
|
||||
repository_path = await self.repository.path(session, config)
|
||||
directory_path = await self.path(session, config)
|
||||
repository_path = await self.repository.real_path(session, config)
|
||||
directory_path = await self.real_path(session, config)
|
||||
|
||||
new_directory = FileSystem(directory_path, repository_path)
|
||||
await new_directory.make_directory()
|
||||
@ -70,8 +70,8 @@ class Directory(Base):
|
||||
for file in self.files:
|
||||
file.remove(session, config)
|
||||
|
||||
repository_path = await self.repository.path(session, config)
|
||||
directory_path = await self.path(session, config)
|
||||
repository_path = await self.repository.real_path(session, config)
|
||||
directory_path = await self.real_path(session, config)
|
||||
|
||||
current_directory = FileSystem(directory_path, repository_path)
|
||||
await current_directory.remove()
|
||||
@ -80,32 +80,45 @@ class Directory(Base):
|
||||
await session.flush()
|
||||
|
||||
async def relative_path(self, session: SessionContext) -> Optional[Path]:
|
||||
"""Get relative path of the current directory"""
|
||||
"""Get path of the directory relative repository root."""
|
||||
if inspect(self).was_deleted:
|
||||
return None
|
||||
|
||||
parts = []
|
||||
current_directory = self
|
||||
|
||||
async with session.begin_nested():
|
||||
while True:
|
||||
parts.append(current_directory.name)
|
||||
while True:
|
||||
# ISSUE: accessing `parent` attribute raises greenlet_spawn has not been called; can't call await_only() here
|
||||
# parts.append(current_directory.name)
|
||||
# session.add(current_directory)
|
||||
# await session.refresh(current_directory, attribute_names=["parent"])
|
||||
# if current_directory.parent is None:
|
||||
# break
|
||||
# current_directory = current_directory.parent
|
||||
|
||||
session.add(current_directory)
|
||||
await session.refresh(current_directory, attribute_names=["parent"])
|
||||
parts.append(current_directory.name)
|
||||
|
||||
if current_directory.parent is None:
|
||||
break
|
||||
if current_directory.parent_id is None:
|
||||
break
|
||||
|
||||
current_directory = current_directory.parent
|
||||
current_directory = (
|
||||
await session.scalars(
|
||||
sa.select(Directory).where(
|
||||
Directory.id == current_directory.parent_id,
|
||||
)
|
||||
)
|
||||
).first()
|
||||
|
||||
return Path().joinpath(*reversed(parts))
|
||||
|
||||
async def path(self, session: SessionContext, config: Config) -> Optional[Path]:
|
||||
async def real_path(
|
||||
self, session: SessionContext, config: Config
|
||||
) -> Optional[Path]:
|
||||
"""Get absolute path of the directory"""
|
||||
if inspect(self).was_deleted:
|
||||
return None
|
||||
|
||||
repository_path = await self.repository.path(session, config)
|
||||
repository_path = await self.repository.real_path(session, config)
|
||||
relative_path = await self.relative_path(session)
|
||||
|
||||
return repository_path.joinpath(relative_path)
|
||||
@ -123,6 +136,7 @@ class Directory(Base):
|
||||
current_directory: Optional[Directory] = None
|
||||
|
||||
for part in path.parts:
|
||||
# from root directory to target directory
|
||||
current_directory = (
|
||||
await session.scalars(
|
||||
sa.select(Directory).where(
|
||||
@ -145,77 +159,108 @@ class Directory(Base):
|
||||
return current_directory
|
||||
|
||||
async def copy(
|
||||
self, directory: Optional["Directory"], session: SessionContext, config: Config
|
||||
self,
|
||||
target: Optional["Directory"],
|
||||
session: SessionContext,
|
||||
config: Config,
|
||||
force: bool = False,
|
||||
shallow: bool = False,
|
||||
) -> Self:
|
||||
session.add(self)
|
||||
await session.refresh(self, attribute_names=["repository"])
|
||||
|
||||
repository_path = await self.repository.path(session, config)
|
||||
directory_path = await self.path(session, config)
|
||||
directory_path = (
|
||||
await directory.path(session, config) if directory else repository_path
|
||||
repository_path = await self.repository.real_path(session, config)
|
||||
directory_path = await self.real_path(session, config)
|
||||
target_path = (
|
||||
await target.real_path(session, config) if target else repository_path
|
||||
)
|
||||
|
||||
current_directory = FileSystem(directory_path, repository_path)
|
||||
new_directory = await current_directory.copy(directory_path)
|
||||
new_directory = await current_directory.copy(
|
||||
target_path, force=force, shallow=shallow
|
||||
)
|
||||
|
||||
cloned = self.clone()
|
||||
cloned.name = new_directory.name()
|
||||
cloned.parent_id = directory.id if directory else None
|
||||
cloned.parent_id = target.id if target else None
|
||||
session.add(cloned)
|
||||
await session.flush()
|
||||
|
||||
await session.refresh(self, attribute_names=["files", "directories"])
|
||||
for directory in self.directories:
|
||||
await directory.copy(cloned, session, config, shallow=True)
|
||||
for file in self.files:
|
||||
await file.copy(cloned, session, config, shallow=True)
|
||||
|
||||
return self
|
||||
|
||||
async def move(
|
||||
self, directory: Optional["Directory"], session: SessionContext, config: Config
|
||||
self,
|
||||
target: Optional["Directory"],
|
||||
session: SessionContext,
|
||||
config: Config,
|
||||
force: bool = False,
|
||||
shallow: bool = False,
|
||||
) -> Self:
|
||||
session.add(self)
|
||||
await session.refresh(self, attribute_names=["repository"])
|
||||
|
||||
repository_path = await self.repository.path(session, config)
|
||||
directory_path = await self.path(session, config)
|
||||
directory_path = (
|
||||
await directory.path(session, config) if directory else repository_path
|
||||
repository_path = await self.repository.real_path(session, config)
|
||||
directory_path = await self.real_path(session, config)
|
||||
target_path = (
|
||||
await target.real_path(session, config) if target else repository_path
|
||||
)
|
||||
|
||||
current_directory = FileSystem(directory_path, repository_path)
|
||||
moved_directory = await current_directory.move(directory_path)
|
||||
moved_directory = await current_directory.move(
|
||||
target_path, force=force, shallow=shallow
|
||||
)
|
||||
|
||||
self.name = moved_directory.name()
|
||||
self.parent_id = directory.id if directory else None
|
||||
self.parent_id = target.id if target else None
|
||||
self.updated = time()
|
||||
|
||||
await session.flush()
|
||||
|
||||
return self
|
||||
|
||||
async def rename(self, name: str, session: SessionContext, config: Config) -> Self:
|
||||
async def rename(
|
||||
self,
|
||||
name: str,
|
||||
session: SessionContext,
|
||||
config: Config,
|
||||
force: bool = False,
|
||||
shallow: bool = False,
|
||||
) -> Self:
|
||||
session.add(self)
|
||||
await session.refresh(self, attribute_names=["repository"])
|
||||
|
||||
repository_path = await self.repository.path(session, config)
|
||||
directory_path = await self.path(session, config)
|
||||
repository_path = await self.repository.real_path(session, config)
|
||||
directory_path = await self.real_path(session, config)
|
||||
|
||||
current_directory = FileSystem(directory_path, repository_path)
|
||||
renamed_directory = await current_directory.rename(name, force=True)
|
||||
renamed_directory = await current_directory.rename(
|
||||
name, force=force, shallow=shallow
|
||||
)
|
||||
|
||||
self.name = renamed_directory.name()
|
||||
await session.flush()
|
||||
return self
|
||||
|
||||
async def info(self, session: SessionContext) -> "DirectoryInfo":
|
||||
info = DirectoryInfo.model_validate(self)
|
||||
session.add(self)
|
||||
await session.refresh(self, attribute_names=["files"])
|
||||
|
||||
info = DirectoryInfo.model_validate(self)
|
||||
|
||||
relative_path = await self.relative_path(session)
|
||||
|
||||
info.path = Path("/").joinpath(relative_path) if relative_path else None
|
||||
info.used = sum([file.size for file in self.files])
|
||||
|
||||
return info
|
||||
|
||||
|
||||
class DirectoryPath(BaseModel):
|
||||
path: Path
|
||||
|
||||
|
||||
class DirectoryLink(Base):
|
||||
__tablename__ = "directory_link"
|
||||
|
||||
@ -240,9 +285,26 @@ class DirectoryInfo(BaseModel):
|
||||
name: str
|
||||
is_public: bool
|
||||
|
||||
path: Optional[Path] = None
|
||||
used: Optional[int] = None
|
||||
|
||||
|
||||
class DirectoryPath(BaseModel):
|
||||
path: Path
|
||||
|
||||
|
||||
class DirectoryRename(BaseModel):
|
||||
path: Path
|
||||
name: str
|
||||
force: Optional[bool] = False
|
||||
|
||||
|
||||
class DirectoryCopyMove(BaseModel):
|
||||
path: Path
|
||||
target: Path
|
||||
force: Optional[bool] = False
|
||||
|
||||
|
||||
from materia.models.repository import Repository
|
||||
from materia.models.file import File
|
||||
from materia.models.filesystem import FileSystem
|
||||
|
@ -47,9 +47,11 @@ class File(Base):
|
||||
await session.flush()
|
||||
await session.refresh(self, attribute_names=["repository"])
|
||||
|
||||
file_path = await self.path(session, config)
|
||||
file_path = await self.real_path(session, config)
|
||||
|
||||
new_file = FileSystem(file_path, await self.repository.path(session, config))
|
||||
new_file = FileSystem(
|
||||
file_path, await self.repository.real_path(session, config)
|
||||
)
|
||||
await new_file.write_file(data)
|
||||
|
||||
self.size = await new_file.size()
|
||||
@ -60,9 +62,11 @@ class File(Base):
|
||||
async def remove(self, session: SessionContext, config: Config):
|
||||
session.add(self)
|
||||
|
||||
file_path = await self.path(session, config)
|
||||
file_path = await self.real_path(session, config)
|
||||
|
||||
new_file = FileSystem(file_path, await self.repository.path(session, config))
|
||||
new_file = FileSystem(
|
||||
file_path, await self.repository.real_path(session, config)
|
||||
)
|
||||
await new_file.remove()
|
||||
|
||||
await session.delete(self)
|
||||
@ -83,7 +87,9 @@ class File(Base):
|
||||
|
||||
return file_path.joinpath(self.name)
|
||||
|
||||
async def path(self, session: SessionContext, config: Config) -> Optional[Path]:
|
||||
async def real_path(
|
||||
self, session: SessionContext, config: Config
|
||||
) -> Optional[Path]:
|
||||
if inspect(self).was_deleted:
|
||||
return None
|
||||
|
||||
@ -94,9 +100,9 @@ class File(Base):
|
||||
await session.refresh(self, attribute_names=["repository", "parent"])
|
||||
|
||||
if self.parent:
|
||||
file_path = await self.parent.path(session, config)
|
||||
file_path = await self.parent.real_path(session, config)
|
||||
else:
|
||||
file_path = await self.repository.path(session, config)
|
||||
file_path = await self.repository.real_path(session, config)
|
||||
|
||||
return file_path.joinpath(self.name)
|
||||
|
||||
@ -130,19 +136,24 @@ class File(Base):
|
||||
return current_file
|
||||
|
||||
async def copy(
|
||||
self, directory: Optional["Directory"], session: SessionContext, config: Config
|
||||
self,
|
||||
directory: Optional["Directory"],
|
||||
session: SessionContext,
|
||||
config: Config,
|
||||
force: bool = False,
|
||||
shallow: bool = False,
|
||||
) -> Self:
|
||||
session.add(self)
|
||||
await session.refresh(self, attribute_names=["repository"])
|
||||
|
||||
repository_path = await self.repository.path(session, config)
|
||||
file_path = await self.path(session, config)
|
||||
repository_path = await self.repository.real_path(session, config)
|
||||
file_path = await self.real_path(session, config)
|
||||
directory_path = (
|
||||
await directory.path(session, config) if directory else repository_path
|
||||
await directory.real_path(session, config) if directory else repository_path
|
||||
)
|
||||
|
||||
current_file = FileSystem(file_path, repository_path)
|
||||
new_file = await current_file.copy(directory_path)
|
||||
new_file = await current_file.copy(directory_path, force=force, shallow=shallow)
|
||||
|
||||
cloned = self.clone()
|
||||
cloned.name = new_file.name()
|
||||
@ -153,19 +164,26 @@ class File(Base):
|
||||
return self
|
||||
|
||||
async def move(
|
||||
self, directory: Optional["Directory"], session: SessionContext, config: Config
|
||||
self,
|
||||
directory: Optional["Directory"],
|
||||
session: SessionContext,
|
||||
config: Config,
|
||||
force: bool = False,
|
||||
shallow: bool = False,
|
||||
) -> Self:
|
||||
session.add(self)
|
||||
await session.refresh(self, attribute_names=["repository"])
|
||||
|
||||
repository_path = await self.repository.path(session, config)
|
||||
file_path = await self.path(session, config)
|
||||
repository_path = await self.repository.real_path(session, config)
|
||||
file_path = await self.real_path(session, config)
|
||||
directory_path = (
|
||||
await directory.path(session, config) if directory else repository_path
|
||||
await directory.real_path(session, config) if directory else repository_path
|
||||
)
|
||||
|
||||
current_file = FileSystem(file_path, repository_path)
|
||||
moved_file = await current_file.move(directory_path)
|
||||
moved_file = await current_file.move(
|
||||
directory_path, force=force, shallow=shallow
|
||||
)
|
||||
|
||||
self.name = moved_file.name()
|
||||
self.parent_id = directory.id if directory else None
|
||||
@ -174,15 +192,22 @@ class File(Base):
|
||||
|
||||
return self
|
||||
|
||||
async def rename(self, name: str, session: SessionContext, config: Config) -> Self:
|
||||
async def rename(
|
||||
self,
|
||||
name: str,
|
||||
session: SessionContext,
|
||||
config: Config,
|
||||
force: bool = False,
|
||||
shallow: bool = False,
|
||||
) -> Self:
|
||||
session.add(self)
|
||||
await session.refresh(self, attribute_names=["repository"])
|
||||
|
||||
repository_path = await self.repository.path(session, config)
|
||||
file_path = await self.path(session, config)
|
||||
repository_path = await self.repository.real_path(session, config)
|
||||
file_path = await self.real_path(session, config)
|
||||
|
||||
current_file = FileSystem(file_path, repository_path)
|
||||
renamed_file = await current_file.rename(name, force=True)
|
||||
renamed_file = await current_file.rename(name, force=force, shallow=shallow)
|
||||
|
||||
self.name = renamed_file.name()
|
||||
self.updated = time()
|
||||
@ -226,6 +251,22 @@ class FileInfo(BaseModel):
|
||||
size: int
|
||||
|
||||
|
||||
class FilePath(BaseModel):
|
||||
path: Path
|
||||
|
||||
|
||||
class FileRename(BaseModel):
|
||||
path: Path
|
||||
name: str
|
||||
force: Optional[bool] = False
|
||||
|
||||
|
||||
class FileCopyMove(BaseModel):
|
||||
path: Path
|
||||
target: Path
|
||||
force: Optional[bool] = False
|
||||
|
||||
|
||||
from materia.models.repository import Repository
|
||||
from materia.models.directory import Directory
|
||||
from materia.models.filesystem import FileSystem
|
||||
|
@ -59,7 +59,7 @@ class FileSystem:
|
||||
|
||||
except OSError as e:
|
||||
raise FileSystemError(
|
||||
f"Failed to remove file system content at /{self.relative_path}:",
|
||||
f"Failed to remove content at /{self.relative_path}:",
|
||||
*e.args,
|
||||
)
|
||||
|
||||
@ -68,9 +68,6 @@ class FileSystem:
|
||||
count = 1
|
||||
new_path = target_directory.joinpath(name)
|
||||
|
||||
if new_path == self.path:
|
||||
return name
|
||||
|
||||
while await async_path.exists(new_path):
|
||||
if await self.is_file():
|
||||
if with_counter := re.match(r"^(.+)\.(\d+)\.(\w+)$", new_path.name):
|
||||
@ -94,27 +91,42 @@ class FileSystem:
|
||||
|
||||
return new_path.name
|
||||
|
||||
async def _generate_new_path(
|
||||
self,
|
||||
target_directory: Path,
|
||||
new_name: Optional[str] = None,
|
||||
force: bool = False,
|
||||
shallow: bool = False,
|
||||
) -> Path:
|
||||
if self.path == self.working_directory:
|
||||
raise FileSystemError("Cannot modify working directory")
|
||||
|
||||
new_name = new_name or self.path.name
|
||||
|
||||
if await async_path.exists(target_directory.joinpath(new_name)) and not shallow:
|
||||
if force:
|
||||
new_name = await self.generate_name(target_directory, new_name)
|
||||
else:
|
||||
raise FileSystemError(
|
||||
f"Target destination already exists /{target_directory.joinpath(new_name)}"
|
||||
)
|
||||
|
||||
return target_directory.joinpath(new_name)
|
||||
|
||||
async def move(
|
||||
self,
|
||||
target_directory: Path,
|
||||
new_name: Optional[str] = None,
|
||||
force: bool = False,
|
||||
shallow: bool = False,
|
||||
):
|
||||
new_name = new_name if new_name else self.path.name
|
||||
new_path = await self._generate_new_path(
|
||||
target_directory, new_name, force=force, shallow=shallow
|
||||
)
|
||||
|
||||
try:
|
||||
if (
|
||||
await async_path.exists(target_directory.joinpath(new_name))
|
||||
and not force
|
||||
):
|
||||
raise FileSystemError(
|
||||
"Failed to move content to target destination: already exists"
|
||||
)
|
||||
|
||||
new_path = target_directory.joinpath(
|
||||
await self.generate_name(target_directory, new_name)
|
||||
)
|
||||
await aioshutil.move(self.path, new_path)
|
||||
if not shallow:
|
||||
await aioshutil.move(self.path, new_path)
|
||||
|
||||
except Exception as e:
|
||||
raise FileSystemError(
|
||||
@ -124,39 +136,35 @@ class FileSystem:
|
||||
|
||||
return FileSystem(new_path, self.working_directory)
|
||||
|
||||
async def rename(self, new_name: str, force: bool = False) -> Path:
|
||||
return await self.move(self.path.parent, new_name=new_name, force=force)
|
||||
async def rename(
|
||||
self, new_name: str, force: bool = False, shallow: bool = False
|
||||
) -> Path:
|
||||
return await self.move(
|
||||
self.path.parent, new_name=new_name, force=force, shallow=shallow
|
||||
)
|
||||
|
||||
async def copy(
|
||||
self,
|
||||
target_directory: Path,
|
||||
new_name: Optional[str] = None,
|
||||
force: bool = False,
|
||||
shallow: bool = False,
|
||||
) -> Self:
|
||||
new_name = new_name if new_name else self.path.name
|
||||
new_path = await self._generate_new_path(
|
||||
target_directory, new_name, force=force, shallow=shallow
|
||||
)
|
||||
|
||||
try:
|
||||
if (
|
||||
await async_path.exists(target_directory.joinpath(new_name))
|
||||
and not force
|
||||
):
|
||||
raise FileSystemError(
|
||||
"Failed to copy content to target destination: already exists"
|
||||
)
|
||||
if not shallow:
|
||||
if await self.is_file():
|
||||
await aioshutil.copy(self.path, new_path)
|
||||
|
||||
new_path = target_directory.joinpath(
|
||||
await self.generate_name(target_directory, new_name)
|
||||
)
|
||||
|
||||
if await self.is_file():
|
||||
await aioshutil.copy(self.path, new_path)
|
||||
|
||||
if await self.is_directory():
|
||||
await aioshutil.copytree(self.path, new_path)
|
||||
if await self.is_directory():
|
||||
await aioshutil.copytree(self.path, new_path)
|
||||
|
||||
except Exception as e:
|
||||
raise FileSystemError(
|
||||
f"Failed to copy content from /{self.relative_path}:",
|
||||
f"Failed to copy content from /{new_path}:",
|
||||
*e.args,
|
||||
)
|
||||
|
||||
@ -193,6 +201,7 @@ class FileSystem:
|
||||
|
||||
@staticmethod
|
||||
def normalize(path: Path) -> Path:
|
||||
"""Resolve path and make it relative."""
|
||||
if not path.is_absolute():
|
||||
path = Path("/").joinpath(path)
|
||||
|
||||
|
@ -35,7 +35,7 @@ class Repository(Base):
|
||||
session.add(self)
|
||||
await session.flush()
|
||||
|
||||
repository_path = await self.path(session, config)
|
||||
repository_path = await self.real_path(session, config)
|
||||
relative_path = repository_path.relative_to(
|
||||
config.application.working_directory
|
||||
)
|
||||
@ -52,12 +52,13 @@ class Repository(Base):
|
||||
|
||||
return self
|
||||
|
||||
async def path(self, session: SessionContext, config: Config) -> Path:
|
||||
async def real_path(self, session: SessionContext, config: Config) -> Path:
|
||||
"""Get absolute path of the directory."""
|
||||
session.add(self)
|
||||
await session.refresh(self, attribute_names=["user"])
|
||||
|
||||
repository_path = config.application.working_directory.joinpath(
|
||||
"repository", self.user.lower_name, "default"
|
||||
"repository", self.user.lower_name
|
||||
)
|
||||
|
||||
return repository_path
|
||||
@ -73,7 +74,7 @@ class Repository(Base):
|
||||
for file in self.files:
|
||||
await file.remove(session)
|
||||
|
||||
repository_path = await self.path(session, config)
|
||||
repository_path = await self.real_path(session, config)
|
||||
|
||||
try:
|
||||
shutil.rmtree(str(repository_path))
|
||||
|
@ -4,7 +4,17 @@ import shutil
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException, status
|
||||
|
||||
from materia.models import User, Directory, DirectoryPath, DirectoryInfo, FileSystem
|
||||
from materia.models import (
|
||||
User,
|
||||
Directory,
|
||||
DirectoryInfo,
|
||||
DirectoryPath,
|
||||
DirectoryRename,
|
||||
DirectoryCopyMove,
|
||||
FileSystem,
|
||||
Repository,
|
||||
)
|
||||
from materia.models.database import SessionContext
|
||||
from materia.routers import middleware
|
||||
from materia.config import Config
|
||||
|
||||
@ -13,23 +23,65 @@ from pydantic import BaseModel
|
||||
router = APIRouter(tags=["directory"])
|
||||
|
||||
|
||||
async def validate_current_directory(
|
||||
path: Path, repository: Repository, session: SessionContext, config: Config
|
||||
) -> Directory:
|
||||
if not FileSystem.check_path(path):
|
||||
raise HTTPException(status.HTTP_500_INTERNAL_SERVER_ERROR, "Invalid path")
|
||||
|
||||
if not (
|
||||
directory := await Directory.by_path(
|
||||
repository,
|
||||
FileSystem.normalize(path),
|
||||
session,
|
||||
config,
|
||||
)
|
||||
):
|
||||
raise HTTPException(status.HTTP_404_NOT_FOUND, "Directory not found")
|
||||
|
||||
return directory
|
||||
|
||||
|
||||
async def validate_target_directory(
|
||||
path: Path, repository: Repository, session: SessionContext, config: Config
|
||||
) -> Directory:
|
||||
if not FileSystem.check_path(path):
|
||||
raise HTTPException(
|
||||
status.HTTP_500_INTERNAL_SERVER_ERROR, "Invalid target path"
|
||||
)
|
||||
|
||||
if FileSystem.normalize(path) == Path():
|
||||
# mean repository root
|
||||
target_directory = None
|
||||
else:
|
||||
if not (
|
||||
target_directory := await Directory.by_path(
|
||||
repository,
|
||||
FileSystem.normalize(path),
|
||||
session,
|
||||
config,
|
||||
)
|
||||
):
|
||||
raise HTTPException(status.HTTP_404_NOT_FOUND, "Target directory not found")
|
||||
|
||||
return target_directory
|
||||
|
||||
|
||||
@router.post("/directory")
|
||||
async def create(
|
||||
path: DirectoryPath,
|
||||
repository=Depends(middleware.repository),
|
||||
repository: Repository = Depends(middleware.repository),
|
||||
ctx: middleware.Context = Depends(),
|
||||
):
|
||||
if not FileSystem.check_path(path.path):
|
||||
raise HTTPException(status.HTTP_500_INTERNAL_SERVER_ERROR, "Invalid path")
|
||||
|
||||
path = FileSystem.normalize(path.path)
|
||||
|
||||
async with ctx.database.session() as session:
|
||||
current_directory = None
|
||||
current_path = Path()
|
||||
directory = None
|
||||
|
||||
for part in path.parts:
|
||||
for part in FileSystem.normalize(path.path).parts:
|
||||
if not (
|
||||
directory := await Directory.by_path(
|
||||
repository, current_path.joinpath(part), session, ctx.config
|
||||
@ -50,65 +102,80 @@ async def create(
|
||||
@router.get("/directory")
|
||||
async def info(
|
||||
path: Path,
|
||||
repository=Depends(middleware.repository),
|
||||
repository: Repository = Depends(middleware.repository),
|
||||
ctx: middleware.Context = Depends(),
|
||||
):
|
||||
if not FileSystem.check_path(path):
|
||||
raise HTTPException(status.HTTP_500_INTERNAL_SERVER_ERROR, "Invalid path")
|
||||
|
||||
path = FileSystem.normalize(path)
|
||||
ctx.logger.info(path)
|
||||
async with ctx.database.session() as session:
|
||||
if not (
|
||||
directory := await Directory.by_path(
|
||||
repository,
|
||||
path,
|
||||
session,
|
||||
ctx.config,
|
||||
)
|
||||
):
|
||||
raise HTTPException(status.HTTP_404_NOT_FOUND, "Directory not found")
|
||||
ctx.logger.info(directory)
|
||||
directory = await validate_current_directory(
|
||||
path, repository, session, ctx.config
|
||||
)
|
||||
|
||||
info = await directory.info(session)
|
||||
ctx.logger.info(info)
|
||||
|
||||
return info
|
||||
|
||||
|
||||
@router.delete("/directory")
|
||||
async def remove(
|
||||
path: Path,
|
||||
repository=Depends(middleware.repository),
|
||||
repository: Repository = Depends(middleware.repository),
|
||||
ctx: middleware.Context = Depends(),
|
||||
):
|
||||
if not FileSystem.check_path(path):
|
||||
raise HTTPException(status.HTTP_500_INTERNAL_SERVER_ERROR, "Invalid path")
|
||||
|
||||
path = FileSystem.normalize(path)
|
||||
|
||||
async with ctx.database.session() as session:
|
||||
if not (
|
||||
directory := await Directory.by_path(
|
||||
repository,
|
||||
path,
|
||||
session,
|
||||
ctx.config,
|
||||
)
|
||||
):
|
||||
raise HTTPException(status.HTTP_404_NOT_FOUND, "Directory not found")
|
||||
directory = await validate_current_directory(
|
||||
path, repository, session, ctx.config
|
||||
)
|
||||
|
||||
await directory.remove(session, ctx.config)
|
||||
await session.commit()
|
||||
|
||||
|
||||
@router.patch("/directory/rename")
|
||||
async def rename():
|
||||
pass
|
||||
async def rename(
|
||||
data: DirectoryRename,
|
||||
repository: Repository = Depends(middleware.repository),
|
||||
ctx: middleware.Context = Depends(),
|
||||
):
|
||||
async with ctx.database.session() as session:
|
||||
directory = await validate_current_directory(
|
||||
data.path, repository, session, ctx.config
|
||||
)
|
||||
|
||||
await directory.rename(data.name, session, ctx.config, force=data.force)
|
||||
await session.commit()
|
||||
|
||||
|
||||
@router.patch("/directory/move")
|
||||
async def move():
|
||||
pass
|
||||
async def move(
|
||||
data: DirectoryCopyMove,
|
||||
repository: Repository = Depends(middleware.repository),
|
||||
ctx: middleware.Context = Depends(),
|
||||
):
|
||||
async with ctx.database.session() as session:
|
||||
directory = await validate_current_directory(
|
||||
data.path, repository, session, ctx.config
|
||||
)
|
||||
target_directory = await validate_target_directory(
|
||||
data.target, repository, session, ctx.config
|
||||
)
|
||||
|
||||
await directory.move(target_directory, session, ctx.config, force=data.force)
|
||||
await session.commit()
|
||||
|
||||
|
||||
@router.post("/directory/copy")
|
||||
async def copy():
|
||||
pass
|
||||
async def copy(
|
||||
data: DirectoryCopyMove,
|
||||
repository: Repository = Depends(middleware.repository),
|
||||
ctx: middleware.Context = Depends(),
|
||||
):
|
||||
async with ctx.database.session() as session:
|
||||
directory = await validate_current_directory(
|
||||
data.path, repository, session, ctx.config
|
||||
)
|
||||
target_directory = await validate_target_directory(
|
||||
data.target, repository, session, ctx.config
|
||||
)
|
||||
|
||||
await directory.copy(target_directory, session, ctx.config, force=data.force)
|
||||
await session.commit()
|
||||
|
@ -3,140 +3,141 @@ from pathlib import Path
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException, status, UploadFile
|
||||
|
||||
from materia.models import User, File, FileInfo, Directory
|
||||
from materia.models import (
|
||||
User,
|
||||
File,
|
||||
FileInfo,
|
||||
Directory,
|
||||
DirectoryPath,
|
||||
Repository,
|
||||
FileSystem,
|
||||
FileRename,
|
||||
FilePath,
|
||||
FileCopyMove,
|
||||
)
|
||||
from materia.models.database import SessionContext
|
||||
from materia.routers import middleware
|
||||
from materia.config import Config
|
||||
|
||||
from materia.routers.api.directory import validate_target_directory
|
||||
|
||||
router = APIRouter(tags=["file"])
|
||||
|
||||
|
||||
async def validate_current_file(
|
||||
path: Path, repository: Repository, session: SessionContext, config: Config
|
||||
) -> Directory:
|
||||
if not FileSystem.check_path(path):
|
||||
raise HTTPException(status.HTTP_500_INTERNAL_SERVER_ERROR, "Invalid path")
|
||||
|
||||
if not (
|
||||
file := await File.by_path(
|
||||
repository,
|
||||
FileSystem.normalize(path),
|
||||
session,
|
||||
config,
|
||||
)
|
||||
):
|
||||
raise HTTPException(status.HTTP_404_NOT_FOUND, "File not found")
|
||||
|
||||
return file
|
||||
|
||||
|
||||
@router.post("/file")
|
||||
async def create(
|
||||
upload_file: UploadFile,
|
||||
path: Path = Path(),
|
||||
user: User = Depends(middleware.user),
|
||||
file: UploadFile,
|
||||
path: DirectoryPath,
|
||||
repository: Repository = Depends(middleware.repository),
|
||||
ctx: middleware.Context = Depends(),
|
||||
):
|
||||
if not upload_file.filename:
|
||||
if not file.filename:
|
||||
raise HTTPException(
|
||||
status.HTTP_417_EXPECTATION_FAILED, "Cannot upload file without name"
|
||||
)
|
||||
|
||||
repository_path = Config.data_dir() / "repository" / user.lower_name
|
||||
blacklist = [os.sep, ".", "..", "*"]
|
||||
directory_path = Path(
|
||||
os.sep.join(filter(lambda part: part not in blacklist, path.parts))
|
||||
)
|
||||
if not FileSystem.check_path(path.path):
|
||||
raise HTTPException(status.HTTP_500_INTERNAL_SERVER_ERROR, "Invalid path")
|
||||
|
||||
async with ctx.database.session() as session:
|
||||
session.add(user)
|
||||
await session.refresh(user, attribute_names=["repository"])
|
||||
|
||||
if not user.repository:
|
||||
raise HTTPException(status.HTTP_404_NOT_FOUND, "Repository not found")
|
||||
|
||||
if not directory_path == Path():
|
||||
directory = await Directory.by_path(
|
||||
user.repository.id,
|
||||
None if directory_path.parent == Path() else directory_path.parent,
|
||||
directory_path.name,
|
||||
ctx.database,
|
||||
target_directory = await validate_target_directory(
|
||||
path.path, repository, session, ctx.config
|
||||
)
|
||||
|
||||
if not directory:
|
||||
raise HTTPException(status.HTTP_404_NOT_FOUND, "Directory not found")
|
||||
else:
|
||||
directory = None
|
||||
await File(
|
||||
repository_id=repository.id,
|
||||
parent_id=target_directory.id if target_directory else None,
|
||||
name=file.filename,
|
||||
size=file.size,
|
||||
).new(await file.read(), session, ctx.config)
|
||||
|
||||
file = File(
|
||||
repository_id=user.repository.id,
|
||||
parent_id=directory.id if directory else None,
|
||||
name=upload_file.filename,
|
||||
path=None if directory_path == Path() else str(directory_path),
|
||||
size=upload_file.size,
|
||||
)
|
||||
|
||||
try:
|
||||
file_path = repository_path.joinpath(directory_path, upload_file.filename)
|
||||
|
||||
if file_path.exists():
|
||||
raise HTTPException(
|
||||
status.HTTP_409_CONFLICT, "File with given name already exists"
|
||||
)
|
||||
|
||||
file_path.write_bytes(await upload_file.read())
|
||||
except OSError:
|
||||
raise HTTPException(
|
||||
status.HTTP_500_INTERNAL_SERVER_ERROR, "Failed to write a file"
|
||||
)
|
||||
|
||||
async with ctx.database.session() as session:
|
||||
session.add(file)
|
||||
await session.commit()
|
||||
|
||||
|
||||
@router.get("/file")
|
||||
async def info(
|
||||
path: Path,
|
||||
user: User = Depends(middleware.user),
|
||||
repository: Repository = Depends(middleware.repository),
|
||||
ctx: middleware.Context = Depends(),
|
||||
):
|
||||
async with ctx.database.session() as session:
|
||||
session.add(user)
|
||||
await session.refresh(user, attribute_names=["repository"])
|
||||
file = await validate_current_file(path, repository, session, ctx.config)
|
||||
|
||||
if not user.repository:
|
||||
raise HTTPException(status.HTTP_404_NOT_FOUND, "Repository not found")
|
||||
info = await file.info(session)
|
||||
|
||||
if not (
|
||||
file := await File.by_path(
|
||||
user.repository.id,
|
||||
None if path.parent == Path() else path.parent,
|
||||
path.name,
|
||||
ctx.database,
|
||||
)
|
||||
):
|
||||
raise HTTPException(status.HTTP_404_NOT_FOUND, "File not found")
|
||||
|
||||
info = FileInfo.model_validate(file)
|
||||
|
||||
return info
|
||||
return info
|
||||
|
||||
|
||||
@router.delete("/file")
|
||||
async def remove(
|
||||
path: Path,
|
||||
user: User = Depends(middleware.user),
|
||||
repository: Repository = Depends(middleware.repository),
|
||||
ctx: middleware.Context = Depends(),
|
||||
):
|
||||
async with ctx.database.session() as session:
|
||||
session.add(user)
|
||||
await session.refresh(user, attribute_names=["repository"])
|
||||
file = await validate_current_file(path, repository, session, ctx.config)
|
||||
|
||||
if not user.repository:
|
||||
raise HTTPException(status.HTTP_404_NOT_FOUND, "Repository not found")
|
||||
await file.remove(session, ctx.config)
|
||||
await session.commit()
|
||||
|
||||
if not (
|
||||
file := await File.by_path(
|
||||
user.repository.id,
|
||||
None if path.parent == Path() else path.parent,
|
||||
path.name,
|
||||
ctx.database,
|
||||
)
|
||||
):
|
||||
raise HTTPException(status.HTTP_404_NOT_FOUND, "File not found")
|
||||
|
||||
repository_path = Config.data_dir() / "repository" / user.lower_name
|
||||
@router.patch("/file/rename")
|
||||
async def rename(
|
||||
data: FileRename,
|
||||
repository: Repository = Depends(middleware.repository),
|
||||
ctx: middleware.Context = Depends(),
|
||||
):
|
||||
async with ctx.database.session() as session:
|
||||
file = await validate_current_file(data.path, repository, session, ctx.config)
|
||||
|
||||
try:
|
||||
file_path = repository_path.joinpath(path)
|
||||
await file.rename(data.name, session, ctx.config, force=data.force)
|
||||
await session.commit()
|
||||
|
||||
if file_path.exists():
|
||||
file_path.unlink(missing_ok=True)
|
||||
except OSError:
|
||||
raise HTTPException(
|
||||
status.HTTP_500_INTERNAL_SERVER_ERROR, "Failed to remove a file"
|
||||
|
||||
@router.patch("/file/move")
|
||||
async def move(
|
||||
data: FileCopyMove,
|
||||
repository: Repository = Depends(middleware.repository),
|
||||
ctx: middleware.Context = Depends(),
|
||||
):
|
||||
async with ctx.database.session() as session:
|
||||
file = await validate_current_file(data.path, repository, session, ctx.config)
|
||||
target_directory = await validate_target_directory(
|
||||
data.target, repository, session, ctx.config
|
||||
)
|
||||
|
||||
await file.remove(ctx.database)
|
||||
await file.move(target_directory, session, ctx.config, force=data.force)
|
||||
await session.commit()
|
||||
|
||||
|
||||
@router.post("/file/copy")
|
||||
async def copy(
|
||||
data: FileCopyMove,
|
||||
repository: Repository = Depends(middleware.repository),
|
||||
ctx: middleware.Context = Depends(),
|
||||
):
|
||||
async with ctx.database.session() as session:
|
||||
file = await validate_current_file(data.path, repository, session, ctx.config)
|
||||
target_directory = await validate_target_directory(
|
||||
data.target, repository, session, ctx.config
|
||||
)
|
||||
|
||||
await file.copy(target_directory, session, ctx.config, force=data.force)
|
||||
await session.commit()
|
||||
|
@ -21,6 +21,7 @@ from asgi_lifespan import LifespanManager
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from materia import routers
|
||||
from pathlib import Path
|
||||
from copy import deepcopy
|
||||
|
||||
|
||||
@pytest_asyncio.fixture(scope="session")
|
||||
|
@ -5,6 +5,8 @@ from materia.models.base import Base
|
||||
import aiofiles
|
||||
from io import BytesIO
|
||||
|
||||
# TODO: replace downloadable images for tests
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_auth(api_client: AsyncClient, api_config: Config):
|
||||
@ -83,7 +85,7 @@ async def test_repository(auth_client: AsyncClient, api_config: Config):
|
||||
assert create.status_code == 409, create.text
|
||||
|
||||
assert api_config.application.working_directory.joinpath(
|
||||
"repository", "PyTest".lower(), "default"
|
||||
"repository", "PyTest".lower()
|
||||
).exists()
|
||||
|
||||
info = await auth_client.get("/api/repository")
|
||||
@ -100,6 +102,22 @@ async def test_repository(auth_client: AsyncClient, api_config: Config):
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_directory(auth_client: AsyncClient, api_config: Config):
|
||||
first_dir_path = api_config.application.working_directory.joinpath(
|
||||
"repository", "PyTest".lower(), "first_dir"
|
||||
)
|
||||
second_dir_path = api_config.application.working_directory.joinpath(
|
||||
"repository", "PyTest".lower(), "second_dir"
|
||||
)
|
||||
second_dir_path_two = api_config.application.working_directory.joinpath(
|
||||
"repository", "PyTest".lower(), "second_dir.1"
|
||||
)
|
||||
third_dir_path_one = api_config.application.working_directory.joinpath(
|
||||
"repository", "PyTest".lower(), "third_dir"
|
||||
)
|
||||
third_dir_path_two = api_config.application.working_directory.joinpath(
|
||||
"repository", "PyTest".lower(), "second_dir", "third_dir"
|
||||
)
|
||||
|
||||
create = await auth_client.post("/api/repository")
|
||||
assert create.status_code == 200, create.text
|
||||
|
||||
@ -109,17 +127,67 @@ async def test_directory(auth_client: AsyncClient, api_config: Config):
|
||||
create = await auth_client.post("/api/directory", json={"path": "/first_dir"})
|
||||
assert create.status_code == 200, create.text
|
||||
|
||||
assert api_config.application.working_directory.joinpath(
|
||||
"repository", "PyTest".lower(), "default", "first_dir"
|
||||
).exists()
|
||||
assert first_dir_path.exists()
|
||||
|
||||
info = await auth_client.get("/api/directory", params=[("path", "/first_dir")])
|
||||
assert info.status_code == 200, info.text
|
||||
assert info.json()["used"] == 0
|
||||
assert info.json()["path"] == "/first_dir"
|
||||
|
||||
delete = await auth_client.delete("/api/directory", params=[("path", "/first_dir")])
|
||||
create = await auth_client.patch(
|
||||
"/api/directory/rename",
|
||||
json={"path": "/first_dir", "name": "first_dir_renamed"},
|
||||
)
|
||||
assert create.status_code == 200, create.text
|
||||
|
||||
delete = await auth_client.delete(
|
||||
"/api/directory", params=[("path", "/first_dir_renamed")]
|
||||
)
|
||||
assert delete.status_code == 200, delete.text
|
||||
|
||||
assert not api_config.application.working_directory.joinpath(
|
||||
"repository", "PyTest".lower(), "default", "first_dir"
|
||||
).exists()
|
||||
assert not first_dir_path.exists()
|
||||
|
||||
create = await auth_client.post("/api/directory", json={"path": "/second_dir"})
|
||||
assert create.status_code == 200, create.text
|
||||
|
||||
create = await auth_client.post("/api/directory", json={"path": "/third_dir"})
|
||||
assert create.status_code == 200, create.text
|
||||
|
||||
move = await auth_client.patch(
|
||||
"/api/directory/move", json={"path": "/third_dir", "target": "/second_dir"}
|
||||
)
|
||||
assert move.status_code == 200, move.text
|
||||
assert not third_dir_path_one.exists()
|
||||
assert third_dir_path_two.exists()
|
||||
|
||||
info = await auth_client.get(
|
||||
"/api/directory", params=[("path", "/second_dir/third_dir")]
|
||||
)
|
||||
assert info.status_code == 200, info.text
|
||||
assert info.json()["path"] == "/second_dir/third_dir"
|
||||
|
||||
copy = await auth_client.post(
|
||||
"/api/directory/copy",
|
||||
json={"path": "/second_dir", "target": "/", "force": True},
|
||||
)
|
||||
assert copy.status_code == 200, copy.text
|
||||
assert second_dir_path.exists()
|
||||
assert second_dir_path_two.exists()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_file(auth_client: AsyncClient, api_config: Config):
|
||||
create = await auth_client.post("/api/repository")
|
||||
assert create.status_code == 200, create.text
|
||||
|
||||
async with AsyncClient() as client:
|
||||
pytest_logo_res = await client.get(
|
||||
"https://docs.pytest.org/en/stable/_static/pytest1.png"
|
||||
)
|
||||
assert isinstance(pytest_logo_res.content, bytes)
|
||||
pytest_logo = BytesIO(pytest_logo_res.content)
|
||||
|
||||
create = await auth_client.post(
|
||||
"/api/file", files={"file": ("pytest.png", pytest_logo)}, json={"path", "/"}
|
||||
)
|
||||
assert create.status_code == 200, create.text
|
||||
|
@ -60,7 +60,7 @@ async def test_repository(data, tmpdir, session: SessionContext, config: Config)
|
||||
|
||||
assert repository
|
||||
assert repository.id is not None
|
||||
assert (await repository.path(session, config)).exists()
|
||||
assert (await repository.real_path(session, config)).exists()
|
||||
assert await Repository.from_user(data.user, session) == repository
|
||||
|
||||
await session.refresh(repository, attribute_names=["user"])
|
||||
@ -76,7 +76,7 @@ async def test_repository(data, tmpdir, session: SessionContext, config: Config)
|
||||
await session.flush()
|
||||
with pytest.raises(RepositoryError):
|
||||
await repository.remove(session, config)
|
||||
assert not (await repository.path(session, config)).exists()
|
||||
assert not (await repository.real_path(session, config)).exists()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@ -107,7 +107,7 @@ async def test_directory(data, tmpdir, session: SessionContext, config: Config):
|
||||
)
|
||||
)
|
||||
).first() == directory
|
||||
assert (await directory.path(session, config)).exists()
|
||||
assert (await directory.real_path(session, config)).exists()
|
||||
|
||||
# nested simple
|
||||
nested_directory = await Directory(
|
||||
@ -128,7 +128,7 @@ async def test_directory(data, tmpdir, session: SessionContext, config: Config):
|
||||
)
|
||||
).first() == nested_directory
|
||||
assert nested_directory.parent_id == directory.id
|
||||
assert (await nested_directory.path(session, config)).exists()
|
||||
assert (await nested_directory.real_path(session, config)).exists()
|
||||
|
||||
# relationship
|
||||
await session.refresh(directory, attribute_names=["directories", "files"])
|
||||
@ -150,30 +150,34 @@ async def test_directory(data, tmpdir, session: SessionContext, config: Config):
|
||||
)
|
||||
|
||||
# remove nested
|
||||
nested_path = await nested_directory.path(session, config)
|
||||
nested_path = await nested_directory.real_path(session, config)
|
||||
assert nested_path.exists()
|
||||
await nested_directory.remove(session, config)
|
||||
assert inspect(nested_directory).was_deleted
|
||||
assert await nested_directory.path(session, config) is None
|
||||
assert await nested_directory.real_path(session, config) is None
|
||||
assert not nested_path.exists()
|
||||
|
||||
await session.refresh(directory) # update attributes that was deleted
|
||||
assert (await directory.path(session, config)).exists()
|
||||
assert (await directory.real_path(session, config)).exists()
|
||||
|
||||
# rename
|
||||
assert (await directory.rename("test1", session, config)).name == "test1"
|
||||
assert (
|
||||
await directory.rename("test1", session, config, force=True)
|
||||
).name == "test1.1"
|
||||
await Directory(repository_id=repository.id, parent_id=None, name="test2").new(
|
||||
session, config
|
||||
)
|
||||
assert (await directory.rename("test2", session, config)).name == "test2.1"
|
||||
assert (await repository.path(session, config)).joinpath("test2.1").exists()
|
||||
assert not (await repository.path(session, config)).joinpath("test1").exists()
|
||||
assert (
|
||||
await directory.rename("test2", session, config, force=True)
|
||||
).name == "test2.1"
|
||||
assert (await repository.real_path(session, config)).joinpath("test2.1").exists()
|
||||
assert not (await repository.real_path(session, config)).joinpath("test1").exists()
|
||||
|
||||
directory_path = await directory.path(session, config)
|
||||
directory_path = await directory.real_path(session, config)
|
||||
assert directory_path.exists()
|
||||
|
||||
await directory.remove(session, config)
|
||||
assert await directory.path(session, config) is None
|
||||
assert await directory.real_path(session, config) is None
|
||||
assert not directory_path.exists()
|
||||
|
||||
|
||||
@ -229,7 +233,7 @@ async def test_file(data, tmpdir, session: SessionContext, config: Config):
|
||||
)
|
||||
|
||||
#
|
||||
file_path = await file.path(session, config)
|
||||
file_path = await file.real_path(session, config)
|
||||
assert file_path.exists()
|
||||
assert (await aiofiles.os.stat(file_path)).st_size == file.size
|
||||
async with aiofiles.open(file_path, mode="rb") as io:
|
||||
@ -238,21 +242,21 @@ async def test_file(data, tmpdir, session: SessionContext, config: Config):
|
||||
|
||||
# rename
|
||||
assert (
|
||||
await file.rename("test_file_rename.txt", session, config)
|
||||
await file.rename("test_file_rename.txt", session, config, force=True)
|
||||
).name == "test_file_rename.txt"
|
||||
await File(
|
||||
repository_id=repository.id, parent_id=directory.id, name="test_file_2.txt"
|
||||
).new(b"", session, config)
|
||||
assert (
|
||||
await file.rename("test_file_2.txt", session, config)
|
||||
await file.rename("test_file_2.txt", session, config, force=True)
|
||||
).name == "test_file_2.1.txt"
|
||||
assert (
|
||||
(await repository.path(session, config))
|
||||
(await repository.real_path(session, config))
|
||||
.joinpath("test1", "test_file_2.1.txt")
|
||||
.exists()
|
||||
)
|
||||
assert (
|
||||
not (await repository.path(session, config))
|
||||
not (await repository.real_path(session, config))
|
||||
.joinpath("test1", "test_file_rename.txt")
|
||||
.exists()
|
||||
)
|
||||
@ -262,12 +266,12 @@ async def test_file(data, tmpdir, session: SessionContext, config: Config):
|
||||
await session.refresh(file, attribute_names=["parent"])
|
||||
assert file.parent == directory2
|
||||
assert (
|
||||
not (await repository.path(session, config))
|
||||
not (await repository.real_path(session, config))
|
||||
.joinpath("test1", "test_file_2.1.txt")
|
||||
.exists()
|
||||
)
|
||||
assert (
|
||||
(await repository.path(session, config))
|
||||
(await repository.real_path(session, config))
|
||||
.joinpath("test2", "test_file_2.1.txt")
|
||||
.exists()
|
||||
)
|
||||
|
Loading…
Reference in New Issue
Block a user