repository and directory tests

This commit is contained in:
L-Nafaryus 2024-08-15 01:02:12 +05:00
parent 58e7175d45
commit 680b0172f0
Signed by: L-Nafaryus
GPG Key ID: 553C97999B363D38
7 changed files with 147 additions and 104 deletions

View File

@ -25,6 +25,11 @@ from materia.models.repository import (
RepositoryError,
)
from materia.models.directory import Directory, DirectoryLink, DirectoryInfo
from materia.models.directory import (
Directory,
DirectoryPath,
DirectoryLink,
DirectoryInfo,
)
from materia.models.file import File, FileLink, FileInfo

View File

@ -3,7 +3,7 @@ import os
from typing import AsyncIterator, Self, TypeAlias
from pathlib import Path
from pydantic import BaseModel, PostgresDsn
from pydantic import BaseModel, PostgresDsn, ValidationError
from sqlalchemy.ext.asyncio import (
AsyncConnection,
AsyncEngine,
@ -102,7 +102,7 @@ class Database:
try:
yield session
except HTTPException as e:
except (HTTPException, ValidationError) as e:
await session.rollback()
raise e from None
except Exception as e:

View File

@ -3,11 +3,12 @@ from typing import List, Optional, Self
from pathlib import Path
import shutil
import aiofiles
import re
from sqlalchemy import BigInteger, ForeignKey, inspect
from sqlalchemy.orm import mapped_column, Mapped, relationship
import sqlalchemy as sa
from pydantic import BaseModel, ConfigDict
from pydantic import BaseModel, ConfigDict, ValidationError
from materia.models.base import Base
from materia.models import database
@ -202,8 +203,17 @@ class Directory(Base):
await session.flush()
return self
async def info(self) -> "DirectoryInfo":
return DirectoryInfo.model_validate(self)
async def info(self, session: SessionContext) -> "DirectoryInfo":
info = DirectoryInfo.model_validate(self)
session.add(self)
await session.refresh(self, attribute_names=["files"])
info.used = sum([file.size for file in self.files])
return info
class DirectoryPath(BaseModel):
path: Path
class DirectoryLink(Base):
@ -228,7 +238,6 @@ class DirectoryInfo(BaseModel):
created: int
updated: int
name: str
path: Optional[str]
is_public: bool
used: Optional[int] = None

View File

@ -6,6 +6,8 @@ from aiofiles import ospath as async_path
import aioshutil
import re
valid_path = re.compile(r"^/(.*/)*([^/]*)$")
class FileSystemError(Exception):
pass
@ -184,3 +186,14 @@ class FileSystem:
f"Failed to write file to /{self.relative_path}:",
*e.args,
)
@staticmethod
def check_path(path: Path) -> bool:
return bool(valid_path.match(str(path)))
@staticmethod
def normalize(path: Path) -> Path:
if not path.is_absolute():
path = Path("/").joinpath(path)
return Path(*path.resolve().parts[1:])

View File

@ -4,130 +4,111 @@ import shutil
from fastapi import APIRouter, Depends, HTTPException, status
from materia.models import User, Directory, DirectoryInfo
from materia.models import User, Directory, DirectoryPath, DirectoryInfo, FileSystem
from materia.routers import middleware
from materia.config import Config
from pydantic import BaseModel
router = APIRouter(tags=["directory"])
@router.post("/directory")
async def create(
path: Path = Path(),
user: User = Depends(middleware.user),
path: DirectoryPath,
repository=Depends(middleware.repository),
ctx: middleware.Context = Depends(),
):
repository_path = Config.data_dir() / "repository" / user.lower_name
blacklist = [os.sep, ".", "..", "*"]
directory_path = Path(
os.sep.join(filter(lambda part: part not in blacklist, path.parts))
)
if not FileSystem.check_path(path.path):
raise HTTPException(status.HTTP_500_INTERNAL_SERVER_ERROR, "Invalid path")
path = FileSystem.normalize(path.path)
async with ctx.database.session() as session:
session.add(user)
await session.refresh(user, attribute_names=["repository"])
if not user.repository:
raise HTTPException(status.HTTP_404_NOT_FOUND, "Repository not found")
current_directory = None
current_path = Path()
directory = None
for part in directory_path.parts:
if not await Directory.by_path(
user.repository.id, current_path, part, ctx.database
for part in path.parts:
if not (
directory := await Directory.by_path(
repository, current_path.joinpath(part), session, ctx.config
)
):
directory = Directory(
repository_id=user.repository.id,
directory = await Directory(
repository_id=repository.id,
parent_id=current_directory.id if current_directory else None,
name=part,
)
try:
(repository_path / current_path / part).mkdir(exist_ok=True)
except OSError:
raise HTTPException(
status.HTTP_500_INTERNAL_SERVER_ERROR,
f"Failed to create a directory {current_path / part}",
)
async with ctx.database.session() as session:
session.add(directory)
await session.commit()
await session.refresh(directory)
).new(session, ctx.config)
current_directory = directory
current_path /= part
await session.commit()
@router.get("/directory")
async def info(
path: Path,
user: User = Depends(middleware.user),
repository=Depends(middleware.repository),
ctx: middleware.Context = Depends(),
):
if not FileSystem.check_path(path):
raise HTTPException(status.HTTP_500_INTERNAL_SERVER_ERROR, "Invalid path")
path = FileSystem.normalize(path)
ctx.logger.info(path)
async with ctx.database.session() as session:
session.add(user)
await session.refresh(user, attribute_names=["repository"])
if not user.repository:
raise HTTPException(status.HTTP_404_NOT_FOUND, "Repository not found")
if not (
directory := await Directory.by_path(
user.repository.id,
None if path.parent == Path() else path.parent,
path.name,
ctx.database,
repository,
path,
session,
ctx.config,
)
):
raise HTTPException(status.HTTP_404_NOT_FOUND, "Directory not found")
async with ctx.database.session() as session:
session.add(directory)
await session.refresh(directory, attribute_names=["files"])
info = DirectoryInfo.model_validate(directory)
info.used = sum([file.size for file in directory.files])
ctx.logger.info(directory)
info = await directory.info(session)
ctx.logger.info(info)
return info
@router.delete("/directory")
async def remove(
path: Path,
user: User = Depends(middleware.user),
repository=Depends(middleware.repository),
ctx: middleware.Context = Depends(),
):
repository_path = Config.data_dir() / "repository" / user.lower_name
if not FileSystem.check_path(path):
raise HTTPException(status.HTTP_500_INTERNAL_SERVER_ERROR, "Invalid path")
path = FileSystem.normalize(path)
async with ctx.database.session() as session:
session.add(user)
await session.refresh(user, attribute_names=["repository"])
if not user.repository:
raise HTTPException(status.HTTP_404_NOT_FOUND, "Repository not found")
if not (
directory := await Directory.by_path(
user.repository.id,
None if path.parent == Path() else path.parent,
path.name,
ctx.database,
repository,
path,
session,
ctx.config,
)
):
raise HTTPException(status.HTTP_404_NOT_FOUND, "Directory not found")
directory_path = repository_path / path
await directory.remove(session, ctx.config)
try:
if directory_path.is_dir():
shutil.rmtree(str(directory_path))
except OSError:
raise HTTPException(
status.HTTP_500_INTERNAL_SERVER_ERROR, "Failed to remove directory"
)
await directory.remove(ctx.database)
@router.patch("/directory/rename")
async def rename():
pass
@router.patch("/directory/move")
async def move():
pass
@router.post("/directory/copy")
async def copy():
pass

View File

@ -48,18 +48,14 @@ async def info(
@router.delete("/repository")
async def remove(
repository=Depends(middleware.repository),
repository_path=Depends(middleware.repository_path),
ctx: middleware.Context = Depends(),
):
try:
if repository_path.exists():
shutil.rmtree(str(repository_path))
except OSError:
raise HTTPException(
status.HTTP_500_INTERNAL_SERVER_ERROR, "Failed to remove repository"
)
await repository.remove(ctx.database)
async with ctx.database.session() as session:
await repository.remove(session, ctx.config)
await session.commit()
except Exception as e:
raise HTTPException(status.HTTP_500_INTERNAL_SERVER_ERROR, f"{e}")
@router.get("/repository/content", response_model=RepositoryContent)

View File

@ -82,5 +82,44 @@ async def test_repository(auth_client: AsyncClient, api_config: Config):
create = await auth_client.post("/api/repository")
assert create.status_code == 409, create.text
assert api_config.application.working_directory.joinpath(
"repository", "PyTest".lower(), "default"
).exists()
info = await auth_client.get("/api/repository")
assert info.status_code == 200, info.text
delete = await auth_client.delete("/api/repository")
assert delete.status_code == 200, delete.text
info = await auth_client.get("/api/repository")
assert info.status_code == 404, info.text
# TODO: content
@pytest.mark.asyncio
async def test_directory(auth_client: AsyncClient, api_config: Config):
create = await auth_client.post("/api/repository")
assert create.status_code == 200, create.text
create = await auth_client.post("/api/directory", json={"path": "first_dir"})
assert create.status_code == 500, create.text
create = await auth_client.post("/api/directory", json={"path": "/first_dir"})
assert create.status_code == 200, create.text
assert api_config.application.working_directory.joinpath(
"repository", "PyTest".lower(), "default", "first_dir"
).exists()
info = await auth_client.get("/api/directory", params=[("path", "/first_dir")])
assert info.status_code == 200, info.text
assert info.json()["used"] == 0
delete = await auth_client.delete("/api/directory", params=[("path", "/first_dir")])
assert delete.status_code == 200, delete.text
assert not api_config.application.working_directory.joinpath(
"repository", "PyTest".lower(), "default", "first_dir"
).exists()