materia/tests/test_database.py

186 lines
5.3 KiB
Python

import pytest_asyncio
import pytest
import os
from materia.config import Config
from materia.models import Database, User, LoginType, Repository, Directory
from materia import security
import sqlalchemy as sa
from sqlalchemy.pool import NullPool
from dataclasses import dataclass
@pytest_asyncio.fixture(scope="session")
async def config() -> Config:
    """Provide the session-wide configuration shared by the database tests.

    Starts from a default ``Config`` and overrides only the database port.
    """
    settings = Config()
    # Presumably the port the test database server listens on -- confirm
    # against the test environment setup.
    settings.database.port = 54320

    # Disabled alternative: switch to a "temp" working directory and, when a
    # local config.toml exists there, use it instead of the defaults.
    # settings.application.working_directory = settings.application.working_directory / "temp"
    # if (cwd := settings.application.working_directory.resolve()).exists():
    #     os.chdir(cwd)
    # if local_conf := Config.open(cwd / "config.toml"):
    #     settings = local_conf
    return settings
@pytest_asyncio.fixture(scope="session")
async def db(config: Config, request) -> Database:
    """Create a throwaway ``pytest`` role and database and yield a handle to it.

    Setup connects as user ``postgres`` to the ``postgres`` database, creates
    the ``pytest`` role and a ``pytest`` database owned by it, then opens a
    ``Database`` (with ``NullPool``) against the new database and yields it.
    Teardown disposes that handle and drops the database and the role again.

    Note: ``config_postgres`` is an alias of ``config``, not a copy, so the
    shared ``config`` object is mutated in place.  Once this fixture has run,
    ``config.database`` names the ``pytest`` user and database.
    """
    config_postgres = config
    config_postgres.database.user = "postgres"
    config_postgres.database.name = "postgres"
    database_postgres = await Database.new(
        config_postgres.database.url(), poolclass=NullPool
    )

    async with database_postgres.connection() as connection:
        # Presumably required because the server refuses CREATE DATABASE
        # inside a transaction block.
        await connection.execution_options(isolation_level="AUTOCOMMIT")
        await connection.execute(sa.text("create role pytest login"))
        await connection.execute(sa.text("create database pytest owner pytest"))
        await connection.commit()

    await database_postgres.dispose()

    # Re-point the (shared) configuration at the freshly created database.
    config.database.user = "pytest"
    config.database.name = "pytest"
    database = await Database.new(config.database.url(), poolclass=NullPool)

    yield database

    await database.dispose()

    # NOTE(review): the admin handle was disposed during setup and is reused
    # here; this relies on ``Database`` staying usable after ``dispose()``.
    # Rebuilding it from ``config_postgres`` (line below) would not work as
    # written, because that alias now points at the ``pytest`` database.
    # database_postgres = await Database.new(config_postgres.database.url())
    async with database_postgres.connection() as connection:
        await connection.execution_options(isolation_level="AUTOCOMMIT")
        # The original statement ended in a stray trailing comma, which turned
        # it into a discarded one-element tuple expression.
        await connection.execute(sa.text("drop database pytest"))
        await connection.execute(sa.text("drop role pytest"))
        await connection.commit()
    await database_postgres.dispose()
@pytest_asyncio.fixture(scope="session", autouse=True)
async def setup_db(db: Database, request):
    """Run the database migrations once per test session.

    Declared ``autouse`` so every test gets a migrated database without
    requesting this fixture explicitly.  Nothing is undone on teardown.
    """
    await db.run_migrations()

    yield

    # Rolling the migrations back is currently disabled.
    # await db.rollback_migrations()
@pytest_asyncio.fixture(autouse=True)
async def session(db: Database, request):
    """Yield a fresh database session per test and discard its changes afterwards.

    The session comes from ``db.sessionmaker()``.  After the test it is rolled
    back, so whatever the test flushed is not kept, and then closed.
    """
    db_session = db.sessionmaker()
    try:
        yield db_session
        await db_session.rollback()
    finally:
        # Always close, even when the rollback itself fails: a session left
        # open would keep holding its connection for the rest of the run.
        await db_session.close()
@pytest_asyncio.fixture(scope="session")
async def user(config: Config, db: Database) -> User:
    """Provide an administrator ``User`` named ``pytest`` for the test session.

    The user is added through a session opened with ``db.session()`` and is
    deleted through another such session on teardown.

    The fixture depends on ``db`` rather than on the ``session`` fixture: the
    body opens its own sessions via ``db.session()`` (``db`` was previously an
    undefined name here), and a session-scoped fixture cannot request the
    function-scoped ``session`` fixture.
    """
    test_user = User(
        name="pytest",
        lower_name="pytest",
        email="pytest@example.com",
        hashed_password=security.hash_password(
            "iampytest", algo=config.security.password_hash_algo
        ),
        login_type=LoginType.Plain,
        is_admin=True,
    )

    # NOTE(review): only flush() is called here; whether the row outlives this
    # block depends on whether ``db.session()`` commits on exit -- confirm.
    async with db.session() as db_session:
        db_session.add(test_user)
        await db_session.flush()
        await db_session.refresh(test_user)

    yield test_user

    async with db.session() as db_session:
        await db_session.delete(test_user)
        await db_session.flush()
@pytest_asyncio.fixture
async def data(config: Config):
    """Build per-test sample data: an unsaved administrator ``User``.

    The returned object exposes the user as ``data.user``.  Nothing is written
    to the database here; tests add the user to their own session.
    """
    password_hash = security.hash_password(
        "iampytest", algo=config.security.password_hash_algo
    )
    pytest_user = User(
        name="pytest",
        lower_name="pytest",
        email="pytest@example.com",
        hashed_password=password_hash,
        login_type=LoginType.Plain,
        is_admin=True,
    )

    class TestData:
        # Simple attribute holder; rebuilt on every fixture invocation.
        user = pytest_user

    return TestData()
@pytest.mark.asyncio
async def test_user(data, session):
    """A flushed user receives an id and keeps a verifiable password hash."""
    account = data.user
    session.add(account)
    await session.flush()

    assert account.id is not None
    assert security.validate_password("iampytest", account.hashed_password)
@pytest.mark.asyncio
async def test_repository(data, session, config):
    """A repository created for a flushed user receives an id."""
    owner = data.user
    session.add(owner)
    # Flush first so the owner's id is available for the repository.
    await session.flush()

    repository = Repository(
        user_id=owner.id,
        capacity=config.repository.capacity,
    )
    session.add(repository)
    await session.flush()

    assert repository.id is not None
@pytest.mark.asyncio
async def test_directory(data, session, config):
    """Store a root directory and a nested one, then look both up again."""
    owner = data.user
    session.add(owner)
    await session.flush()

    repository = Repository(
        user_id=owner.id,
        capacity=config.repository.capacity,
    )
    session.add(repository)
    await session.flush()

    # Root directory: no parent and no path.
    directory = Directory(
        repository_id=repository.id, parent_id=None, name="test1", path=None
    )
    session.add(directory)
    await session.flush()
    assert directory.id is not None

    root_query = sa.select(Directory).where(
        sa.and_(
            Directory.repository_id == repository.id,
            Directory.name == "test1",
            Directory.path.is_(None),
        )
    )
    root_rows = await session.scalars(root_query)
    assert root_rows.first() == directory

    # Nested directory: child of the root, with the root's name as its path.
    nested_directory = Directory(
        repository_id=repository.id,
        parent_id=directory.id,
        name="test_nested",
        path="test1",
    )
    session.add(nested_directory)
    await session.flush()
    assert nested_directory.id is not None

    nested_query = sa.select(Directory).where(
        sa.and_(
            Directory.repository_id == repository.id,
            Directory.name == "test_nested",
            Directory.path == "test1",
        )
    )
    nested_rows = await session.scalars(nested_query)
    assert nested_rows.first() == nested_directory
    assert nested_directory.parent_id == directory.id