materia/tests/test_database.py

399 lines
12 KiB
Python

import pytest_asyncio
import pytest
import os
import sys
from pathlib import Path
from materia.config import Config
from materia.models import (
Database,
User,
LoginType,
Repository,
Directory,
RepositoryError,
File,
)
from materia.models.base import Base
from materia.models.database import SessionContext
from materia import security
import sqlalchemy as sa
from sqlalchemy.pool import NullPool
from sqlalchemy.orm.session import make_transient
from sqlalchemy import inspect
import aiofiles
import aiofiles.os
@pytest_asyncio.fixture(scope="session")
async def config() -> Config:
conf = Config()
conf.database.port = 54320
# conf.application.working_directory = conf.application.working_directory / "temp"
# if (cwd := conf.application.working_directory.resolve()).exists():
# os.chdir(cwd)
# if local_conf := Config.open(cwd / "config.toml"):
# conf = local_conf
return conf
@pytest_asyncio.fixture(scope="session")
async def db(config: Config, request) -> Database:
config_postgres = config
config_postgres.database.user = "postgres"
config_postgres.database.name = "postgres"
database_postgres = await Database.new(
config_postgres.database.url(), poolclass=NullPool
)
async with database_postgres.connection() as connection:
await connection.execution_options(isolation_level="AUTOCOMMIT")
await connection.execute(sa.text("create role pytest login"))
await connection.execute(sa.text("create database pytest owner pytest"))
await connection.commit()
await database_postgres.dispose()
config.database.user = "pytest"
config.database.name = "pytest"
database = await Database.new(config.database.url(), poolclass=NullPool)
yield database
await database.dispose()
async with database_postgres.connection() as connection:
await connection.execution_options(isolation_level="AUTOCOMMIT")
await connection.execute(sa.text("drop database pytest")),
await connection.execute(sa.text("drop role pytest"))
await connection.commit()
await database_postgres.dispose()
"""
@pytest.mark.asyncio
async def test_migrations(db):
await db.run_migrations()
await db.rollback_migrations()
"""
@pytest_asyncio.fixture(scope="session", autouse=True)
async def setup_db(db: Database, request):
async with db.connection() as connection:
await connection.run_sync(Base.metadata.create_all)
await connection.commit()
yield
async with db.connection() as connection:
await connection.run_sync(Base.metadata.drop_all)
await connection.commit()
@pytest_asyncio.fixture(autouse=True)
async def session(db: Database, request):
session = db.sessionmaker()
yield session
await session.rollback()
await session.close()
"""
@pytest_asyncio.fixture(scope="session")
async def user(config: Config, session) -> User:
test_user = User(
name="pytest",
lower_name="pytest",
email="pytest@example.com",
hashed_password=security.hash_password(
"iampytest", algo=config.security.password_hash_algo
),
login_type=LoginType.Plain,
is_admin=True,
)
async with db.session() as session:
session.add(test_user)
await session.flush()
await session.refresh(test_user)
yield test_user
async with db.session() as session:
await session.delete(test_user)
await session.flush()
"""
@pytest_asyncio.fixture(scope="function")
async def data(config: Config):
class TestData:
user = User(
name="PyTest",
lower_name="pytest",
email="pytest@example.com",
hashed_password=security.hash_password(
"iampytest", algo=config.security.password_hash_algo
),
login_type=LoginType.Plain,
is_admin=True,
)
return TestData()
@pytest.mark.asyncio
async def test_user(data, session: SessionContext, config: Config):
# simple
session.add(data.user)
await session.flush()
assert data.user.id is not None
assert security.validate_password("iampytest", data.user.hashed_password)
await session.rollback()
# methods
await data.user.new(session, config)
assert data.user.id is not None
assert await data.user.count(session) == 1
assert await User.by_name("PyTest", session) == data.user
assert await User.by_email("pytest@example.com", session) == data.user
await data.user.edit_name("AsyncPyTest", session)
assert await User.by_name("asyncpytest", session, with_lower=True) == data.user
await data.user.remove(session)
@pytest.mark.asyncio
async def test_repository(data, tmpdir, session: SessionContext, config: Config):
config.application.working_directory = Path(tmpdir)
session.add(data.user)
await session.flush()
repository = await Repository(
user_id=data.user.id, capacity=config.repository.capacity
).new(session, config)
assert repository
assert repository.id is not None
assert (await repository.path(session, config)).exists()
assert await Repository.from_user(data.user, session) == repository
await session.refresh(repository, attribute_names=["user"])
cloned_repository = repository.clone()
assert cloned_repository.id is None and cloned_repository.user is None
session.add(cloned_repository)
await session.flush()
assert cloned_repository.id is not None
await repository.remove(session, config)
make_transient(repository)
session.add(repository)
await session.flush()
with pytest.raises(RepositoryError):
await repository.remove(session, config)
assert not (await repository.path(session, config)).exists()
@pytest.mark.asyncio
async def test_directory(data, tmpdir, session: SessionContext, config: Config):
config.application.working_directory = Path(tmpdir)
# setup
session.add(data.user)
await session.flush()
repository = await Repository(
user_id=data.user.id, capacity=config.repository.capacity
).new(session, config)
directory = await Directory(
repository_id=repository.id, parent_id=None, name="test1"
).new(session, config)
# simple
assert directory.id is not None
assert (
await session.scalars(
sa.select(Directory).where(
sa.and_(
Directory.repository_id == repository.id,
Directory.name == "test1",
)
)
)
).first() == directory
assert (await directory.path(session, config)).exists()
# nested simple
nested_directory = await Directory(
repository_id=repository.id,
parent_id=directory.id,
name="test_nested",
).new(session, config)
assert nested_directory.id is not None
assert (
await session.scalars(
sa.select(Directory).where(
sa.and_(
Directory.repository_id == repository.id,
Directory.name == "test_nested",
)
)
)
).first() == nested_directory
assert nested_directory.parent_id == directory.id
assert (await nested_directory.path(session, config)).exists()
# relationship
await session.refresh(directory, attribute_names=["directories", "files"])
assert isinstance(directory.files, list) and len(directory.files) == 0
assert isinstance(directory.directories, list) and len(directory.directories) == 1
await session.refresh(nested_directory, attribute_names=["directories", "files"])
assert (nested_directory.files, list) and len(nested_directory.files) == 0
assert (nested_directory.directories, list) and len(
nested_directory.directories
) == 0
#
assert (
await Directory.by_path(
repository, Path("test1", "test_nested"), session, config
)
== nested_directory
)
# remove nested
nested_path = await nested_directory.path(session, config)
assert nested_path.exists()
await nested_directory.remove(session, config)
assert inspect(nested_directory).was_deleted
assert await nested_directory.path(session, config) is None
assert not nested_path.exists()
await session.refresh(directory) # update attributes that was deleted
assert (await directory.path(session, config)).exists()
# rename
assert (await directory.rename("test1", session, config)).name == "test1"
directory2 = await Directory(
repository_id=repository.id, parent_id=None, name="test2"
).new(session, config)
assert (await directory.rename("test2", session, config)).name == "test2.1"
assert (await repository.path(session, config)).joinpath("test2.1").exists()
assert not (await repository.path(session, config)).joinpath("test1").exists()
directory_path = await directory.path(session, config)
assert directory_path.exists()
await directory.remove(session, config)
assert await directory.path(session, config) is None
assert not directory_path.exists()
@pytest.mark.asyncio
async def test_file(data, tmpdir, session: SessionContext, config: Config):
config.application.working_directory = Path(tmpdir)
# setup
session.add(data.user)
await session.flush()
repository = await Repository(
user_id=data.user.id, capacity=config.repository.capacity
).new(session, config)
directory = await Directory(
repository_id=repository.id, parent_id=None, name="test1"
).new(session, config)
directory2 = await Directory(
repository_id=repository.id, parent_id=None, name="test2"
).new(session, config)
data = b"Hello there, it's a test"
file = await File(
repository_id=repository.id,
parent_id=directory.id,
name="test_file.txt",
).new(data, session, config)
# simple
assert file.id is not None
assert (
await session.scalars(
sa.select(File).where(
sa.and_(
File.repository_id == repository.id,
File.parent_id == directory.id,
File.name == "test_file.txt",
)
)
)
).first() == file
# relationship
await session.refresh(file, attribute_names=["parent", "repository"])
assert file.parent == directory
assert file.repository == repository
#
assert (
await File.by_path(repository, Path("test1", "test_file.txt"), session, config)
== file
)
#
file_path = await file.path(session, config)
assert file_path.exists()
assert (await aiofiles.os.stat(file_path)).st_size == file.size
async with aiofiles.open(file_path, mode="rb") as io:
content = await io.read()
assert data == content
# rename
assert (
await file.rename("test_file_rename.txt", session, config)
).name == "test_file_rename.txt"
file2 = await File(
repository_id=repository.id, parent_id=directory.id, name="test_file_2.txt"
).new(b"", session, config)
assert (
await file.rename("test_file_2.txt", session, config)
).name == "test_file_2.1.txt"
assert (
(await repository.path(session, config))
.joinpath("test1", "test_file_2.1.txt")
.exists()
)
assert (
not (await repository.path(session, config))
.joinpath("test1", "test_file_rename.txt")
.exists()
)
# move
await file.move(directory2, session, config)
await session.refresh(file, attribute_names=["parent"])
assert file.parent == directory2
assert (
not (await repository.path(session, config))
.joinpath("test1", "test_file_2.1.txt")
.exists()
)
assert (
(await repository.path(session, config))
.joinpath("test2", "test_file_2.1.txt")
.exists()
)
# remove
await file.remove(session, config)
assert not await File.by_path(
repository, Path("test1", "test_file.txt"), session, config
)
assert not file_path.exists()