927abe6111
Implements SQLAlchemy 2.0 async ORM models (Config, Subscription, ExportLog) with cascade delete-orphan relationships. Upgrades SQLAlchemy to 2.0.49 for Python 3.14 compatibility. Adds pytest conftest with in-memory SQLite fixture; all 4 TDD tests pass. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
68 lines
2.1 KiB
Python
68 lines
2.1 KiB
Python
import uuid
|
|
import pytest
|
|
from sqlalchemy import select
|
|
from models import Config, Subscription, ExportLog
|
|
|
|
|
|
async def test_config_insert_and_retrieve(db_session):
    """Round-trip a Config through the session and look it up by token.

    A Config is added and committed, then selected back using its unique
    token. The fetched row must carry an id, the name that was supplied,
    and a created_at value, even though neither id nor created_at was
    passed to the constructor.
    """
    unique_token = str(uuid.uuid4())
    new_config = Config(
        name="my-config",
        token=unique_token,
        base_yaml="proxies: []",
    )
    db_session.add(new_config)
    await db_session.commit()

    lookup = select(Config).where(Config.token == unique_token)
    outcome = await db_session.execute(lookup)
    # scalar_one() doubles as a check that exactly one row matches the token.
    fetched = outcome.scalar_one()

    assert fetched.id is not None
    assert fetched.name == "my-config"
    assert fetched.created_at is not None
|
|
|
|
|
|
async def test_subscription_belongs_to_config(db_session):
    """A Subscription stored with a config_id is found by that config_id.

    Creates a parent Config, attaches one Subscription to it through the
    foreign-key column, and checks that filtering on config_id returns
    exactly that subscription.
    """
    parent = Config(
        name="c",
        token=str(uuid.uuid4()),
        base_yaml="proxies: []",
    )
    db_session.add(parent)
    # Flush before reading parent.id below; presumably the id is only
    # assigned once the row has been sent to the database.
    await db_session.flush()

    child = Subscription(
        config_id=parent.id,
        name="provider1",
        url="https://example.com/sub",
    )
    db_session.add(child)
    await db_session.commit()

    by_parent = select(Subscription).where(Subscription.config_id == parent.id)
    outcome = await db_session.execute(by_parent)
    matches = outcome.scalars().all()

    assert len(matches) == 1
    assert matches[0].name == "provider1"
|
|
|
|
|
|
async def test_cascade_delete_removes_subscriptions(db_session):
    """Deleting a Config also removes its Subscription and ExportLog rows.

    One Subscription and one ExportLog are stored against a Config. After
    the Config is deleted and the deletion committed, looking either child
    up by primary key must return None.
    """
    parent = Config(
        name="c",
        token=str(uuid.uuid4()),
        base_yaml="proxies: []",
    )
    db_session.add(parent)
    # Flush before reading parent.id below; presumably the id is only
    # assigned once the row has been sent to the database.
    await db_session.flush()

    child_sub = Subscription(
        config_id=parent.id,
        name="p",
        url="https://example.com/sub",
    )
    db_session.add(child_sub)
    child_log = ExportLog(config_id=parent.id, node_count=5, success=True)
    db_session.add(child_log)
    await db_session.commit()

    # Capture the primary keys before the parent is deleted so the children
    # can be looked up afterwards.
    sub_id, log_id = child_sub.id, child_log.id

    await db_session.delete(parent)
    await db_session.commit()

    remaining_sub = await db_session.get(Subscription, sub_id)
    assert remaining_sub is None
    remaining_log = await db_session.get(ExportLog, log_id)
    assert remaining_log is None
|
|
|
|
|
|
async def test_export_log_defaults(db_session):
    """An ExportLog built with only config_id gets its default field values.

    After commit, the log must report success as True, node_count as 0,
    and a fetched_at value that is not None.
    """
    parent = Config(
        name="c",
        token=str(uuid.uuid4()),
        base_yaml="proxies: []",
    )
    db_session.add(parent)
    # Flush before reading parent.id below; presumably the id is only
    # assigned once the row has been sent to the database.
    await db_session.flush()

    # Only the foreign key is supplied; every other column is left to its
    # default.
    bare_log = ExportLog(config_id=parent.id)
    db_session.add(bare_log)
    await db_session.commit()

    assert bare_log.success is True
    assert bare_log.node_count == 0
    assert bare_log.fetched_at is not None
|