materia/tests/test_database.py

186 lines
5.3 KiB
Python
Raw Normal View History

2024-07-25 13:33:05 +05:00
import pytest_asyncio
import pytest
import os
from materia_server.config import Config
from materia_server.models import Database, User, LoginType, Repository, Directory
from materia_server import security
import sqlalchemy as sa
from sqlalchemy.pool import NullPool
from dataclasses import dataclass
@pytest_asyncio.fixture(scope="session")
async def config() -> Config:
conf = Config()
conf.database.port = 54320
# conf.application.working_directory = conf.application.working_directory / "temp"
# if (cwd := conf.application.working_directory.resolve()).exists():
# os.chdir(cwd)
# if local_conf := Config.open(cwd / "config.toml"):
# conf = local_conf
return conf
@pytest_asyncio.fixture(scope="session")
async def db(config: Config, request) -> Database:
config_postgres = config
config_postgres.database.user = "postgres"
config_postgres.database.name = "postgres"
database_postgres = await Database.new(
config_postgres.database.url(), poolclass=NullPool
)
async with database_postgres.connection() as connection:
await connection.execution_options(isolation_level="AUTOCOMMIT")
await connection.execute(sa.text("create role pytest login"))
await connection.execute(sa.text("create database pytest owner pytest"))
await connection.commit()
await database_postgres.dispose()
config.database.user = "pytest"
config.database.name = "pytest"
database = await Database.new(config.database.url(), poolclass=NullPool)
yield database
await database.dispose()
# database_postgres = await Database.new(config_postgres.database.url())
async with database_postgres.connection() as connection:
await connection.execution_options(isolation_level="AUTOCOMMIT")
await connection.execute(sa.text("drop database pytest")),
await connection.execute(sa.text("drop role pytest"))
await connection.commit()
await database_postgres.dispose()
@pytest_asyncio.fixture(scope="session", autouse=True)
async def setup_db(db: Database, request):
await db.run_migrations()
yield
# await db.rollback_migrations()
@pytest_asyncio.fixture(autouse=True)
async def session(db: Database, request):
session = db.sessionmaker()
yield session
await session.rollback()
await session.close()
@pytest_asyncio.fixture(scope="session")
async def user(config: Config, session) -> User:
test_user = User(
name="pytest",
lower_name="pytest",
email="pytest@example.com",
hashed_password=security.hash_password(
"iampytest", algo=config.security.password_hash_algo
),
login_type=LoginType.Plain,
is_admin=True,
)
async with db.session() as session:
session.add(test_user)
await session.flush()
await session.refresh(test_user)
yield test_user
async with db.session() as session:
await session.delete(test_user)
await session.flush()
@pytest_asyncio.fixture
async def data(config: Config):
class TestData:
user = User(
name="pytest",
lower_name="pytest",
email="pytest@example.com",
hashed_password=security.hash_password(
"iampytest", algo=config.security.password_hash_algo
),
login_type=LoginType.Plain,
is_admin=True,
)
return TestData()
@pytest.mark.asyncio
async def test_user(data, session):
session.add(data.user)
await session.flush()
assert data.user.id is not None
assert security.validate_password("iampytest", data.user.hashed_password)
@pytest.mark.asyncio
async def test_repository(data, session, config):
session.add(data.user)
await session.flush()
repository = Repository(user_id=data.user.id, capacity=config.repository.capacity)
session.add(repository)
await session.flush()
assert repository.id is not None
@pytest.mark.asyncio
async def test_directory(data, session, config):
session.add(data.user)
await session.flush()
repository = Repository(user_id=data.user.id, capacity=config.repository.capacity)
session.add(repository)
await session.flush()
directory = Directory(
repository_id=repository.id, parent_id=None, name="test1", path=None
)
session.add(directory)
await session.flush()
assert directory.id is not None
assert (
await session.scalars(
sa.select(Directory).where(
sa.and_(
Directory.repository_id == repository.id,
Directory.name == "test1",
Directory.path.is_(None),
)
)
)
).first() == directory
nested_directory = Directory(
repository_id=repository.id,
parent_id=directory.id,
name="test_nested",
path="test1",
)
session.add(nested_directory)
await session.flush()
assert nested_directory.id is not None
assert (
await session.scalars(
sa.select(Directory).where(
sa.and_(
Directory.repository_id == repository.id,
Directory.name == "test_nested",
Directory.path == "test1",
)
)
)
).first() == nested_directory
assert nested_directory.parent_id == directory.id