"""Tests for the token-based config export endpoint and the admin UI routes.

Every test runs against an in-memory SQLite database and an in-process ASGI
client, with the application's ``get_db`` dependency overridden so the app
and the test body share one session.
"""

import uuid
from unittest.mock import AsyncMock, patch

import pytest
import pytest_asyncio
from httpx import AsyncClient, ASGITransport
from sqlalchemy import select
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker

from models import Base, Config, Subscription, ExportLog
from main import app, get_db


@pytest_asyncio.fixture
async def db_engine():
    """Yield an async engine on in-memory SQLite with all tables created.

    The engine is disposed once the test using it has finished.
    """
    engine = create_async_engine("sqlite+aiosqlite:///:memory:")
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield engine
    await engine.dispose()


@pytest_asyncio.fixture
async def db_session(db_engine):
    """Yield a session bound to ``db_engine``.

    ``expire_on_commit=False`` keeps attributes such as ``config.id``
    readable after ``commit()`` without triggering a reload.
    """
    Session = async_sessionmaker(db_engine, expire_on_commit=False)
    async with Session() as session:
        yield session


@pytest_asyncio.fixture
async def http_client(db_session):
    """Yield an ``AsyncClient`` wired to ``app`` through ``ASGITransport``.

    ``get_db`` is overridden so request handlers receive the same
    ``db_session`` the test uses.
    """

    async def override_get_db():
        yield db_session

    app.dependency_overrides[get_db] = override_get_db
    try:
        async with AsyncClient(
            transport=ASGITransport(app=app), base_url="http://test"
        ) as client:
            yield client
    finally:
        # Reset overrides even if closing the client raises, so a failure
        # here cannot leak the override into later tests.
        app.dependency_overrides.clear()


async def test_get_config_not_found(http_client):
    """Requesting a config by an unknown token is expected to return 404."""
    resp = await http_client.get("/config/nonexistent-token.yaml")
    assert resp.status_code == 404


async def test_get_config_returns_yaml(http_client, db_session):
    """A stored config is served at ``/config/<token>.yaml`` with status 200."""
    token = str(uuid.uuid4())
    config = Config(
        name="test",
        token=token,
        base_yaml="proxies: []\nproxy-groups: []\nrules:\n - MATCH,DIRECT\n",
    )
    db_session.add(config)
    await db_session.commit()

    with patch("main.mihomo_client") as mock_mc:
        mock_mc.refresh_and_collect = AsyncMock(return_value=[])
        resp = await http_client.get(f"/config/{token}.yaml")

    assert resp.status_code == 200
    assert "proxies" in resp.text


async def test_get_config_writes_export_log(http_client, db_session):
    """Fetching a config is expected to record one successful ``ExportLog``."""
    token = str(uuid.uuid4())
    config = Config(
        name="test",
        token=token,
        base_yaml="proxies: []\nproxy-groups: []\nrules: []\n",
    )
    db_session.add(config)
    await db_session.commit()

    with patch("main.mihomo_client") as mock_mc:
        mock_mc.refresh_and_collect = AsyncMock(return_value=[])
        await http_client.get(f"/config/{token}.yaml")

    result = await db_session.execute(
        select(ExportLog).where(ExportLog.config_id == config.id)
    )
    logs = result.scalars().all()
    assert len(logs) == 1
    assert logs[0].success is True


async def test_get_config_with_subscription_expands_nodes(http_client, db_session):
    """Collected proxies replace the provider section in the exported YAML.

    The response must contain the node name, must no longer contain
    ``proxy-providers``, and must not contain the ``alive`` key present on
    the mocked proxy dict.
    """
    token = str(uuid.uuid4())
    config = Config(
        name="test",
        token=token,
        # Nested mappings and sequences need real indentation to parse as
        # the intended provider/group structure.
        base_yaml=(
            "proxies: []\n"
            "proxy-providers:\n"
            "  myprovider:\n"
            "    type: http\n"
            "    url: https://example.com/sub\n"
            "    interval: 3600\n"
            "proxy-groups:\n"
            "  - name: Proxy\n"
            "    type: select\n"
            "    use:\n"
            "      - myprovider\n"
            "rules:\n"
            "  - MATCH,DIRECT\n"
        ),
    )
    db_session.add(config)
    # Flush so config.id is assigned before the subscription references it.
    await db_session.flush()
    sub = Subscription(
        config_id=config.id, name="myprovider", url="https://example.com/sub"
    )
    db_session.add(sub)
    await db_session.commit()

    fake_proxies = [
        {
            "name": "node1",
            "type": "ss",
            "server": "1.2.3.4",
            "port": 443,
            "password": "pwd",
            "cipher": "aes-256-gcm",
            "alive": True,
        },
    ]
    with patch("main.mihomo_client") as mock_mc:
        mock_mc.refresh_and_collect = AsyncMock(return_value=fake_proxies)
        resp = await http_client.get(f"/config/{token}.yaml")

    assert resp.status_code == 200
    assert "node1" in resp.text
    assert "proxy-providers" not in resp.text
    assert "alive" not in resp.text


# --- Admin UI tests ---


async def test_index_returns_200(http_client):
    """The index page responds with 200."""
    resp = await http_client.get("/")
    assert resp.status_code == 200


async def test_create_config_redirects(http_client, db_session):
    """Posting the new-config form redirects and stores a config with a token."""
    with patch("main.write_and_reload_mihomo", new_callable=AsyncMock):
        resp = await http_client.post(
            "/configs/new",
            data={
                "name": "MyConfig",
                "base_yaml": "proxies: []\nrules: []\nproxy-groups: []\n",
            },
            follow_redirects=False,
        )
    assert resp.status_code in (302, 303)

    result = await db_session.execute(select(Config).where(Config.name == "MyConfig"))
    config = result.scalar_one_or_none()
    assert config is not None
    assert config.token is not None


async def test_config_detail_returns_200(http_client, db_session):
    """The detail page of an existing config responds with 200."""
    config = Config(name="c", token=str(uuid.uuid4()), base_yaml="proxies: []\n")
    db_session.add(config)
    await db_session.commit()

    resp = await http_client.get(f"/configs/{config.id}")
    assert resp.status_code == 200


async def test_config_detail_404_for_missing(http_client):
    """The detail page of an unknown config id responds with 404."""
    resp = await http_client.get("/configs/99999")
    assert resp.status_code == 404


async def test_update_config_base_yaml(http_client, db_session):
    """Posting a new ``base_yaml`` redirects and persists the new content."""
    config = Config(name="c", token=str(uuid.uuid4()), base_yaml="proxies: []\n")
    db_session.add(config)
    await db_session.commit()

    with patch("main.write_and_reload_mihomo", new_callable=AsyncMock):
        resp = await http_client.post(
            f"/configs/{config.id}",
            data={"base_yaml": "proxies: []\nrules: []\n"},
            follow_redirects=False,
        )
    assert resp.status_code in (302, 303)

    # Reload from the database to observe the value written by the handler.
    await db_session.refresh(config)
    assert "rules" in config.base_yaml


async def test_delete_config(http_client, db_session):
    """Posting to the delete route redirects and removes the config row."""
    config = Config(name="c", token=str(uuid.uuid4()), base_yaml="proxies: []\n")
    db_session.add(config)
    await db_session.commit()
    # Keep the id in a local: the instance is deleted by the request below.
    config_id = config.id

    with patch("main.write_and_reload_mihomo", new_callable=AsyncMock):
        resp = await http_client.post(
            f"/configs/{config_id}/delete", follow_redirects=False
        )
    assert resp.status_code in (302, 303)

    result = await db_session.execute(select(Config).where(Config.id == config_id))
    assert result.scalar_one_or_none() is None


async def test_add_subscription(http_client, db_session):
    """Posting the new-subscription form redirects and stores one subscription."""
    config = Config(name="c", token=str(uuid.uuid4()), base_yaml="proxies: []\n")
    db_session.add(config)
    await db_session.commit()

    resp = await http_client.post(
        f"/configs/{config.id}/subscriptions/new",
        data={"name": "mysub", "url": "https://example.com/sub"},
        follow_redirects=False,
    )
    assert resp.status_code in (302, 303)

    result = await db_session.execute(
        select(Subscription).where(Subscription.config_id == config.id)
    )
    subs = result.scalars().all()
    assert len(subs) == 1
    assert subs[0].name == "mysub"


async def test_delete_subscription(http_client, db_session):
    """Posting to the subscription delete route redirects and removes the row."""
    config = Config(name="c", token=str(uuid.uuid4()), base_yaml="proxies: []\n")
    db_session.add(config)
    # Flush so config.id is assigned before the subscription references it.
    await db_session.flush()
    sub = Subscription(config_id=config.id, name="s", url="https://example.com/sub")
    db_session.add(sub)
    await db_session.commit()
    sub_id = sub.id

    resp = await http_client.post(
        f"/subscriptions/{sub_id}/delete", follow_redirects=False
    )
    assert resp.status_code in (302, 303)

    result = await db_session.execute(
        select(Subscription).where(Subscription.id == sub_id)
    )
    assert result.scalar_one_or_none() is None


async def test_logs_page_returns_200(http_client, db_session):
    """The logs page of a config that has an export log responds with 200."""
    config = Config(name="c", token=str(uuid.uuid4()), base_yaml="proxies: []\n")
    db_session.add(config)
    # Flush so config.id is assigned before the log references it.
    await db_session.flush()
    log = ExportLog(config_id=config.id, node_count=3, success=True)
    db_session.add(log)
    await db_session.commit()

    resp = await http_client.get(f"/configs/{config.id}/logs")
    assert resp.status_code == 200


async def test_force_refresh(http_client, db_session):
    """Posting to the refresh route of a config with a subscription redirects."""
    config = Config(name="c", token=str(uuid.uuid4()), base_yaml="proxies: []\n")
    db_session.add(config)
    # Flush so config.id is assigned before the subscription references it.
    await db_session.flush()
    sub = Subscription(config_id=config.id, name="p", url="https://example.com/sub")
    db_session.add(sub)
    await db_session.commit()

    with patch("main.mihomo_client") as mock_mc:
        mock_mc.refresh_and_collect = AsyncMock(return_value=[])
        resp = await http_client.post(
            f"/configs/{config.id}/refresh", follow_redirects=False
        )
    assert resp.status_code in (302, 303)