import os
from dataclasses import dataclass
from typing import AsyncIterator

import pytest
import pytest_asyncio
import sqlalchemy as sa
from sqlalchemy.pool import NullPool

from materia import security
from materia.config import Config
from materia.models import Database, User, LoginType, Repository, Directory


@pytest_asyncio.fixture(scope="session")
async def config() -> Config:
    """Provide the session-wide configuration object used by the tests.

    Starts from a default ``Config()`` and overrides only the database port.
    """
    conf = Config()
    conf.database.port = 54320
    # Disabled: switching to a "temp" working directory and loading a local
    # ``config.toml`` from it. Kept for reference.
    # conf.application.working_directory = conf.application.working_directory / "temp"
    # if (cwd := conf.application.working_directory.resolve()).exists():
    #     os.chdir(cwd)
    # if local_conf := Config.open(cwd / "config.toml"):
    #     conf = local_conf
    return conf


@pytest_asyncio.fixture(scope="session")
async def db(config: Config, request) -> AsyncIterator[Database]:
    """Create a throwaway ``pytest`` role and database, and yield a handle to it.

    Setup connects as user ``postgres`` to database ``postgres`` and creates
    the ``pytest`` role and a ``pytest`` database owned by it. Teardown
    disposes the test handle and drops both again.

    Note: the shared session-scoped ``config`` is mutated in place. Once this
    fixture has run, ``config.database.user`` and ``config.database.name``
    are both ``"pytest"``.
    """
    # Point the shared config at the administrative account first; the same
    # object is re-pointed at the test database further down.
    config.database.user = "postgres"
    config.database.name = "postgres"
    database_postgres = await Database.new(config.database.url(), poolclass=NullPool)

    async with database_postgres.connection() as connection:
        # Database creation must not run inside a transaction block
        # (PostgreSQL restriction), hence AUTOCOMMIT.
        await connection.execution_options(isolation_level="AUTOCOMMIT")
        await connection.execute(sa.text("create role pytest login"))
        await connection.execute(sa.text("create database pytest owner pytest"))
        await connection.commit()

    await database_postgres.dispose()

    config.database.user = "pytest"
    config.database.name = "pytest"
    database = await Database.new(config.database.url(), poolclass=NullPool)

    yield database

    await database.dispose()

    # The administrative handle built during setup is reused here. It cannot
    # be rebuilt from ``config``, because ``config`` now describes the
    # ``pytest`` database that is about to be dropped.
    async with database_postgres.connection() as connection:
        await connection.execution_options(isolation_level="AUTOCOMMIT")
        await connection.execute(sa.text("drop database pytest"))
        await connection.execute(sa.text("drop role pytest"))
        await connection.commit()

    await database_postgres.dispose()


@pytest_asyncio.fixture(scope="session", autouse=True)
async def setup_db(db: Database, request) -> AsyncIterator[None]:
    """Run the schema migrations once, before any test executes."""
    await db.run_migrations()
    yield
    # Rolling back is skipped: the ``db`` fixture drops the whole database
    # during its own teardown.
    # await db.rollback_migrations()


@pytest_asyncio.fixture(autouse=True)
async def session(db: Database, request):
    """Yield a fresh session for each test and discard its changes afterwards.

    The session is rolled back and closed once the test finishes, so rows
    that a test only flushed are not kept.
    """
    session = db.sessionmaker()
    yield session
    await session.rollback()
    await session.close()


@pytest_asyncio.fixture(scope="session")
async def user(config: Config, db: Database) -> User:
    """Add an administrator account named ``pytest`` and yield it.

    The fixture requests ``db`` rather than the per-test ``session`` fixture:
    a session-scoped fixture may not depend on a function-scoped one, and the
    body opens its own sessions through ``db`` anyway.
    """
    test_user = User(
        name="pytest",
        lower_name="pytest",
        email="pytest@example.com",
        hashed_password=security.hash_password(
            "iampytest", algo=config.security.password_hash_algo
        ),
        login_type=LoginType.Plain,
        is_admin=True,
    )

    # NOTE(review): only ``flush()`` is called, never ``commit()``. Whether
    # the row outlives this block depends on how ``Database.session()``
    # finalizes the session - confirm against its implementation.
    async with db.session() as session:
        session.add(test_user)
        await session.flush()
        await session.refresh(test_user)

    yield test_user

    async with db.session() as session:
        await session.delete(test_user)
        await session.flush()


@pytest_asyncio.fixture
async def data(config: Config):
    """Build unsaved model instances for the tests to persist themselves.

    Returns an object whose ``user`` attribute is a new, not yet added
    ``User`` with the password ``iampytest``.
    """

    class TestData:
        user = User(
            name="pytest",
            lower_name="pytest",
            email="pytest@example.com",
            hashed_password=security.hash_password(
                "iampytest", algo=config.security.password_hash_algo
            ),
            login_type=LoginType.Plain,
            is_admin=True,
        )

    return TestData()


@pytest.mark.asyncio
async def test_user(data, session):
    """A flushed user receives an id and its stored hash validates."""
    session.add(data.user)
    await session.flush()

    assert data.user.id is not None
    assert security.validate_password("iampytest", data.user.hashed_password)


@pytest.mark.asyncio
async def test_repository(data, session, config):
    """A repository attached to a flushed user receives an id."""
    session.add(data.user)
    await session.flush()

    repository = Repository(user_id=data.user.id, capacity=config.repository.capacity)
    session.add(repository)
    await session.flush()

    assert repository.id is not None


@pytest.mark.asyncio
async def test_directory(data, session, config):
    """Directories can be stored at the root and nested, then found again."""
    session.add(data.user)
    await session.flush()

    repository = Repository(user_id=data.user.id, capacity=config.repository.capacity)
    session.add(repository)
    await session.flush()

    # Root-level directory: no parent and no path.
    directory = Directory(
        repository_id=repository.id, parent_id=None, name="test1", path=None
    )
    session.add(directory)
    await session.flush()

    assert directory.id is not None
    assert (
        await session.scalars(
            sa.select(Directory).where(
                sa.and_(
                    Directory.repository_id == repository.id,
                    Directory.name == "test1",
                    Directory.path.is_(None),
                )
            )
        )
    ).first() == directory

    # Child directory: its path is the name of the parent created above.
    nested_directory = Directory(
        repository_id=repository.id,
        parent_id=directory.id,
        name="test_nested",
        path="test1",
    )
    session.add(nested_directory)
    await session.flush()

    assert nested_directory.id is not None
    assert (
        await session.scalars(
            sa.select(Directory).where(
                sa.and_(
                    Directory.repository_id == repository.id,
                    Directory.name == "test_nested",
                    Directory.path == "test1",
                )
            )
        )
    ).first() == nested_directory
    assert nested_directory.parent_id == directory.id