2024-07-25 13:33:05 +05:00
|
|
|
import pytest_asyncio
|
|
|
|
import pytest
|
|
|
|
import os
|
2024-08-03 01:01:01 +05:00
|
|
|
from pathlib import Path
|
2024-07-31 16:37:04 +05:00
|
|
|
from materia.config import Config
|
2024-08-03 01:01:01 +05:00
|
|
|
from materia.models import (
|
|
|
|
Database,
|
|
|
|
User,
|
|
|
|
LoginType,
|
|
|
|
Repository,
|
|
|
|
Directory,
|
|
|
|
RepositoryError,
|
|
|
|
)
|
|
|
|
from materia.models.base import Base
|
|
|
|
from materia.models.database import SessionContext
|
2024-07-31 16:37:04 +05:00
|
|
|
from materia import security
|
2024-07-25 13:33:05 +05:00
|
|
|
import sqlalchemy as sa
|
|
|
|
from sqlalchemy.pool import NullPool
|
2024-08-03 01:01:01 +05:00
|
|
|
from sqlalchemy.orm.session import make_transient
|
2024-07-25 13:33:05 +05:00
|
|
|
from dataclasses import dataclass
|
|
|
|
|
|
|
|
|
|
|
|
@pytest_asyncio.fixture(scope="session")
|
|
|
|
async def config() -> Config:
|
|
|
|
conf = Config()
|
|
|
|
conf.database.port = 54320
|
|
|
|
# conf.application.working_directory = conf.application.working_directory / "temp"
|
|
|
|
# if (cwd := conf.application.working_directory.resolve()).exists():
|
|
|
|
# os.chdir(cwd)
|
|
|
|
# if local_conf := Config.open(cwd / "config.toml"):
|
|
|
|
# conf = local_conf
|
|
|
|
return conf
|
|
|
|
|
|
|
|
|
|
|
|
@pytest_asyncio.fixture(scope="session")
|
|
|
|
async def db(config: Config, request) -> Database:
|
|
|
|
config_postgres = config
|
|
|
|
config_postgres.database.user = "postgres"
|
|
|
|
config_postgres.database.name = "postgres"
|
|
|
|
database_postgres = await Database.new(
|
|
|
|
config_postgres.database.url(), poolclass=NullPool
|
|
|
|
)
|
|
|
|
|
|
|
|
async with database_postgres.connection() as connection:
|
|
|
|
await connection.execution_options(isolation_level="AUTOCOMMIT")
|
|
|
|
await connection.execute(sa.text("create role pytest login"))
|
|
|
|
await connection.execute(sa.text("create database pytest owner pytest"))
|
|
|
|
await connection.commit()
|
|
|
|
|
|
|
|
await database_postgres.dispose()
|
|
|
|
|
|
|
|
config.database.user = "pytest"
|
|
|
|
config.database.name = "pytest"
|
|
|
|
database = await Database.new(config.database.url(), poolclass=NullPool)
|
|
|
|
|
|
|
|
yield database
|
|
|
|
|
|
|
|
await database.dispose()
|
|
|
|
|
|
|
|
async with database_postgres.connection() as connection:
|
|
|
|
await connection.execution_options(isolation_level="AUTOCOMMIT")
|
|
|
|
await connection.execute(sa.text("drop database pytest")),
|
|
|
|
await connection.execute(sa.text("drop role pytest"))
|
|
|
|
await connection.commit()
|
|
|
|
await database_postgres.dispose()
|
|
|
|
|
|
|
|
|
2024-08-03 01:01:01 +05:00
|
|
|
"""
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
async def test_migrations(db):
|
|
|
|
await db.run_migrations()
|
|
|
|
await db.rollback_migrations()
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
2024-07-25 13:33:05 +05:00
|
|
|
@pytest_asyncio.fixture(scope="session", autouse=True)
|
|
|
|
async def setup_db(db: Database, request):
|
2024-08-03 01:01:01 +05:00
|
|
|
async with db.connection() as connection:
|
|
|
|
await connection.run_sync(Base.metadata.create_all)
|
|
|
|
await connection.commit()
|
2024-07-25 13:33:05 +05:00
|
|
|
yield
|
2024-08-03 01:01:01 +05:00
|
|
|
async with db.connection() as connection:
|
|
|
|
await connection.run_sync(Base.metadata.drop_all)
|
|
|
|
await connection.commit()
|
2024-07-25 13:33:05 +05:00
|
|
|
|
|
|
|
|
|
|
|
@pytest_asyncio.fixture(autouse=True)
|
|
|
|
async def session(db: Database, request):
|
|
|
|
session = db.sessionmaker()
|
|
|
|
yield session
|
|
|
|
await session.rollback()
|
|
|
|
await session.close()
|
|
|
|
|
|
|
|
|
2024-08-03 01:01:01 +05:00
|
|
|
"""
|
2024-07-25 13:33:05 +05:00
|
|
|
@pytest_asyncio.fixture(scope="session")
|
|
|
|
async def user(config: Config, session) -> User:
|
|
|
|
test_user = User(
|
|
|
|
name="pytest",
|
|
|
|
lower_name="pytest",
|
|
|
|
email="pytest@example.com",
|
|
|
|
hashed_password=security.hash_password(
|
|
|
|
"iampytest", algo=config.security.password_hash_algo
|
|
|
|
),
|
|
|
|
login_type=LoginType.Plain,
|
|
|
|
is_admin=True,
|
|
|
|
)
|
|
|
|
|
|
|
|
async with db.session() as session:
|
|
|
|
session.add(test_user)
|
|
|
|
await session.flush()
|
|
|
|
await session.refresh(test_user)
|
|
|
|
|
|
|
|
yield test_user
|
|
|
|
|
|
|
|
async with db.session() as session:
|
|
|
|
await session.delete(test_user)
|
|
|
|
await session.flush()
|
2024-08-03 01:01:01 +05:00
|
|
|
"""
|
2024-07-25 13:33:05 +05:00
|
|
|
|
|
|
|
|
2024-08-03 01:01:01 +05:00
|
|
|
@pytest_asyncio.fixture(scope="function")
|
2024-07-25 13:33:05 +05:00
|
|
|
async def data(config: Config):
|
|
|
|
class TestData:
|
|
|
|
user = User(
|
2024-08-03 01:01:01 +05:00
|
|
|
name="PyTest",
|
2024-07-25 13:33:05 +05:00
|
|
|
lower_name="pytest",
|
|
|
|
email="pytest@example.com",
|
|
|
|
hashed_password=security.hash_password(
|
|
|
|
"iampytest", algo=config.security.password_hash_algo
|
|
|
|
),
|
|
|
|
login_type=LoginType.Plain,
|
|
|
|
is_admin=True,
|
|
|
|
)
|
|
|
|
|
|
|
|
return TestData()
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
2024-08-03 01:01:01 +05:00
|
|
|
async def test_user(data, session: SessionContext, config: Config):
|
|
|
|
# simple
|
2024-07-25 13:33:05 +05:00
|
|
|
session.add(data.user)
|
|
|
|
await session.flush()
|
|
|
|
|
|
|
|
assert data.user.id is not None
|
|
|
|
assert security.validate_password("iampytest", data.user.hashed_password)
|
|
|
|
|
2024-08-03 01:01:01 +05:00
|
|
|
await session.rollback()
|
|
|
|
|
|
|
|
# methods
|
|
|
|
await data.user.new(session, config)
|
|
|
|
|
|
|
|
assert data.user.id is not None
|
|
|
|
assert await data.user.count(session) == 1
|
|
|
|
assert await User.by_name("PyTest", session) == data.user
|
|
|
|
assert await User.by_email("pytest@example.com", session) == data.user
|
|
|
|
|
|
|
|
await data.user.edit_name("AsyncPyTest", session)
|
|
|
|
assert await User.by_name("asyncpytest", session, with_lower=True) == data.user
|
|
|
|
|
|
|
|
await data.user.remove(session)
|
|
|
|
|
2024-07-25 13:33:05 +05:00
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
2024-08-03 01:01:01 +05:00
|
|
|
async def test_repository(data, tmpdir, session: SessionContext, config: Config):
|
|
|
|
config.application.working_directory = Path(tmpdir)
|
|
|
|
|
2024-07-25 13:33:05 +05:00
|
|
|
session.add(data.user)
|
|
|
|
await session.flush()
|
|
|
|
|
2024-08-03 01:01:01 +05:00
|
|
|
repository = await Repository(
|
|
|
|
user_id=data.user.id, capacity=config.repository.capacity
|
|
|
|
).new(session, config)
|
2024-07-25 13:33:05 +05:00
|
|
|
|
2024-08-03 01:01:01 +05:00
|
|
|
assert repository
|
2024-07-25 13:33:05 +05:00
|
|
|
assert repository.id is not None
|
2024-08-03 01:01:01 +05:00
|
|
|
assert (await repository.path(session, config)).exists()
|
|
|
|
assert await Repository.from_user(data.user, session) == repository
|
|
|
|
|
|
|
|
await repository.remove(session, config)
|
|
|
|
make_transient(repository)
|
|
|
|
session.add(repository)
|
|
|
|
await session.flush()
|
|
|
|
with pytest.raises(RepositoryError):
|
|
|
|
await repository.remove(session, config)
|
|
|
|
assert not (await repository.path(session, config)).exists()
|
2024-07-25 13:33:05 +05:00
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
2024-08-03 01:01:01 +05:00
|
|
|
async def test_directory(data, tmpdir, session: SessionContext, config: Config):
|
|
|
|
# setup
|
2024-07-25 13:33:05 +05:00
|
|
|
session.add(data.user)
|
|
|
|
await session.flush()
|
|
|
|
|
|
|
|
repository = Repository(user_id=data.user.id, capacity=config.repository.capacity)
|
|
|
|
session.add(repository)
|
|
|
|
await session.flush()
|
|
|
|
|
2024-08-03 01:01:01 +05:00
|
|
|
directory = Directory(repository_id=repository.id, parent_id=None, name="test1")
|
2024-07-25 13:33:05 +05:00
|
|
|
session.add(directory)
|
|
|
|
await session.flush()
|
|
|
|
|
2024-08-03 01:01:01 +05:00
|
|
|
# simple
|
2024-07-25 13:33:05 +05:00
|
|
|
assert directory.id is not None
|
|
|
|
assert (
|
|
|
|
await session.scalars(
|
|
|
|
sa.select(Directory).where(
|
|
|
|
sa.and_(
|
|
|
|
Directory.repository_id == repository.id,
|
|
|
|
Directory.name == "test1",
|
|
|
|
)
|
|
|
|
)
|
|
|
|
)
|
|
|
|
).first() == directory
|
|
|
|
|
2024-08-03 01:01:01 +05:00
|
|
|
# nested simple
|
2024-07-25 13:33:05 +05:00
|
|
|
nested_directory = Directory(
|
|
|
|
repository_id=repository.id,
|
|
|
|
parent_id=directory.id,
|
|
|
|
name="test_nested",
|
|
|
|
)
|
|
|
|
session.add(nested_directory)
|
|
|
|
await session.flush()
|
|
|
|
|
|
|
|
assert nested_directory.id is not None
|
|
|
|
assert (
|
|
|
|
await session.scalars(
|
|
|
|
sa.select(Directory).where(
|
|
|
|
sa.and_(
|
|
|
|
Directory.repository_id == repository.id,
|
|
|
|
Directory.name == "test_nested",
|
|
|
|
)
|
|
|
|
)
|
|
|
|
)
|
|
|
|
).first() == nested_directory
|
|
|
|
assert nested_directory.parent_id == directory.id
|