eabeb54767
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
289 lines
8.9 KiB
Python
289 lines
8.9 KiB
Python
import uuid
|
|
import pytest
|
|
import pytest_asyncio
|
|
from unittest.mock import AsyncMock, patch
|
|
from httpx import AsyncClient, ASGITransport
|
|
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
|
|
|
|
from models import Base, Config, Subscription, ExportLog
|
|
from main import app, get_db
|
|
|
|
|
|
@pytest_asyncio.fixture
|
|
async def db_engine():
|
|
engine = create_async_engine("sqlite+aiosqlite:///:memory:")
|
|
async with engine.begin() as conn:
|
|
await conn.run_sync(Base.metadata.create_all)
|
|
yield engine
|
|
await engine.dispose()
|
|
|
|
|
|
@pytest_asyncio.fixture
|
|
async def db_session(db_engine):
|
|
Session = async_sessionmaker(db_engine, expire_on_commit=False)
|
|
async with Session() as session:
|
|
yield session
|
|
|
|
|
|
@pytest_asyncio.fixture
|
|
async def http_client(db_session):
|
|
async def override_get_db():
|
|
yield db_session
|
|
|
|
app.dependency_overrides[get_db] = override_get_db
|
|
|
|
async with AsyncClient(
|
|
transport=ASGITransport(app=app), base_url="http://test"
|
|
) as client:
|
|
yield client
|
|
|
|
app.dependency_overrides.clear()
|
|
|
|
|
|
async def test_get_config_not_found(http_client):
|
|
resp = await http_client.get("/config/nonexistent-token.yaml")
|
|
assert resp.status_code == 404
|
|
|
|
|
|
async def test_get_config_returns_yaml(http_client, db_session):
|
|
token = str(uuid.uuid4())
|
|
config = Config(
|
|
name="test",
|
|
token=token,
|
|
base_yaml="proxies: []\nproxy-groups: []\nrules:\n - MATCH,DIRECT\n",
|
|
)
|
|
db_session.add(config)
|
|
await db_session.commit()
|
|
|
|
with patch("main.mihomo_client") as mock_mc:
|
|
mock_mc.refresh_and_collect = AsyncMock(return_value=[])
|
|
resp = await http_client.get(f"/config/{token}.yaml")
|
|
|
|
assert resp.status_code == 200
|
|
assert "proxies" in resp.text
|
|
|
|
|
|
async def test_get_config_writes_export_log(http_client, db_session):
|
|
from sqlalchemy import select
|
|
|
|
token = str(uuid.uuid4())
|
|
config = Config(
|
|
name="test",
|
|
token=token,
|
|
base_yaml="proxies: []\nproxy-groups: []\nrules: []\n",
|
|
)
|
|
db_session.add(config)
|
|
await db_session.commit()
|
|
|
|
with patch("main.mihomo_client") as mock_mc:
|
|
mock_mc.refresh_and_collect = AsyncMock(return_value=[])
|
|
await http_client.get(f"/config/{token}.yaml")
|
|
|
|
result = await db_session.execute(
|
|
select(ExportLog).where(ExportLog.config_id == config.id)
|
|
)
|
|
logs = result.scalars().all()
|
|
assert len(logs) == 1
|
|
assert logs[0].success is True
|
|
|
|
|
|
async def test_get_config_with_subscription_expands_nodes(http_client, db_session):
|
|
token = str(uuid.uuid4())
|
|
config = Config(
|
|
name="test",
|
|
token=token,
|
|
base_yaml=(
|
|
"proxies: []\n"
|
|
"proxy-providers:\n"
|
|
" myprovider:\n"
|
|
" type: http\n"
|
|
" url: https://example.com/sub\n"
|
|
" interval: 3600\n"
|
|
"proxy-groups:\n"
|
|
" - name: Proxy\n"
|
|
" type: select\n"
|
|
" use:\n"
|
|
" - myprovider\n"
|
|
"rules:\n"
|
|
" - MATCH,DIRECT\n"
|
|
),
|
|
)
|
|
db_session.add(config)
|
|
await db_session.flush()
|
|
|
|
sub = Subscription(config_id=config.id, name="myprovider", url="https://example.com/sub")
|
|
db_session.add(sub)
|
|
await db_session.commit()
|
|
|
|
fake_proxies = [
|
|
{"name": "node1", "type": "ss", "server": "1.2.3.4", "port": 443,
|
|
"password": "pwd", "cipher": "aes-256-gcm", "alive": True},
|
|
]
|
|
|
|
with patch("main.mihomo_client") as mock_mc:
|
|
mock_mc.refresh_and_collect = AsyncMock(return_value=fake_proxies)
|
|
resp = await http_client.get(f"/config/{token}.yaml")
|
|
|
|
assert resp.status_code == 200
|
|
assert "node1" in resp.text
|
|
assert "proxy-providers" not in resp.text
|
|
assert "alive" not in resp.text
|
|
|
|
|
|
# --- Admin UI tests ---
|
|
|
|
async def test_index_returns_200(http_client):
|
|
resp = await http_client.get("/")
|
|
assert resp.status_code == 200
|
|
|
|
|
|
async def test_create_config_redirects(http_client, db_session):
|
|
from sqlalchemy import select
|
|
|
|
with patch("main.write_and_reload_mihomo", new_callable=AsyncMock):
|
|
resp = await http_client.post(
|
|
"/configs/new",
|
|
data={"name": "MyConfig", "base_yaml": "proxies: []\nrules: []\nproxy-groups: []\n"},
|
|
follow_redirects=False,
|
|
)
|
|
assert resp.status_code == 303
|
|
|
|
result = await db_session.execute(select(Config).where(Config.name == "MyConfig"))
|
|
config = result.scalar_one_or_none()
|
|
assert config is not None
|
|
assert config.token is not None
|
|
|
|
|
|
async def test_config_detail_returns_200(http_client, db_session):
|
|
config = Config(name="c", token=str(uuid.uuid4()), base_yaml="proxies: []\n")
|
|
db_session.add(config)
|
|
await db_session.commit()
|
|
|
|
resp = await http_client.get(f"/configs/{config.id}")
|
|
assert resp.status_code == 200
|
|
|
|
|
|
async def test_config_detail_404_for_missing(http_client):
|
|
resp = await http_client.get("/configs/99999")
|
|
assert resp.status_code == 404
|
|
|
|
|
|
async def test_update_config_base_yaml(http_client, db_session):
|
|
config = Config(name="c", token=str(uuid.uuid4()), base_yaml="proxies: []\n")
|
|
db_session.add(config)
|
|
await db_session.commit()
|
|
|
|
with patch("main.write_and_reload_mihomo", new_callable=AsyncMock):
|
|
resp = await http_client.post(
|
|
f"/configs/{config.id}",
|
|
data={"base_yaml": "proxies: []\nrules: []\n"},
|
|
follow_redirects=False,
|
|
)
|
|
assert resp.status_code == 303
|
|
|
|
await db_session.refresh(config)
|
|
assert "rules" in config.base_yaml
|
|
|
|
|
|
async def test_delete_config(http_client, db_session):
|
|
from sqlalchemy import select
|
|
|
|
config = Config(name="c", token=str(uuid.uuid4()), base_yaml="proxies: []\n")
|
|
db_session.add(config)
|
|
await db_session.flush()
|
|
|
|
sub = Subscription(config_id=config.id, name="s", url="https://example.com/sub")
|
|
log = ExportLog(config_id=config.id, node_count=1, success=True)
|
|
db_session.add(sub)
|
|
db_session.add(log)
|
|
await db_session.commit()
|
|
config_id = config.id
|
|
sub_id = sub.id
|
|
log_id = log.id
|
|
|
|
with patch("main.write_and_reload_mihomo", new_callable=AsyncMock):
|
|
resp = await http_client.post(f"/configs/{config_id}/delete", follow_redirects=False)
|
|
assert resp.status_code == 303
|
|
|
|
result = await db_session.execute(select(Config).where(Config.id == config_id))
|
|
assert result.scalar_one_or_none() is None
|
|
|
|
assert (await db_session.get(Subscription, sub_id)) is None
|
|
assert (await db_session.get(ExportLog, log_id)) is None
|
|
|
|
|
|
async def test_add_subscription(http_client, db_session):
|
|
from sqlalchemy import select
|
|
|
|
config = Config(name="c", token=str(uuid.uuid4()), base_yaml="proxies: []\n")
|
|
db_session.add(config)
|
|
await db_session.commit()
|
|
|
|
resp = await http_client.post(
|
|
f"/configs/{config.id}/subscriptions/new",
|
|
data={"name": "mysub", "url": "https://example.com/sub"},
|
|
follow_redirects=False,
|
|
)
|
|
assert resp.status_code == 303
|
|
|
|
result = await db_session.execute(
|
|
select(Subscription).where(Subscription.config_id == config.id)
|
|
)
|
|
subs = result.scalars().all()
|
|
assert len(subs) == 1
|
|
assert subs[0].name == "mysub"
|
|
|
|
|
|
async def test_delete_subscription(http_client, db_session):
|
|
from sqlalchemy import select
|
|
|
|
config = Config(name="c", token=str(uuid.uuid4()), base_yaml="proxies: []\n")
|
|
db_session.add(config)
|
|
await db_session.flush()
|
|
|
|
sub = Subscription(config_id=config.id, name="s", url="https://example.com/sub")
|
|
db_session.add(sub)
|
|
await db_session.commit()
|
|
sub_id = sub.id
|
|
|
|
resp = await http_client.post(f"/subscriptions/{sub_id}/delete", follow_redirects=False)
|
|
assert resp.status_code == 303
|
|
|
|
result = await db_session.execute(select(Subscription).where(Subscription.id == sub_id))
|
|
assert result.scalar_one_or_none() is None
|
|
|
|
|
|
async def test_logs_page_returns_200(http_client, db_session):
|
|
config = Config(name="c", token=str(uuid.uuid4()), base_yaml="proxies: []\n")
|
|
db_session.add(config)
|
|
await db_session.flush()
|
|
|
|
log = ExportLog(config_id=config.id, node_count=3, success=True)
|
|
db_session.add(log)
|
|
await db_session.commit()
|
|
|
|
resp = await http_client.get(f"/configs/{config.id}/logs")
|
|
assert resp.status_code == 200
|
|
|
|
|
|
async def test_force_refresh(http_client, db_session):
|
|
from sqlalchemy import select
|
|
|
|
config = Config(name="c", token=str(uuid.uuid4()), base_yaml="proxies: []\n")
|
|
db_session.add(config)
|
|
await db_session.flush()
|
|
|
|
sub = Subscription(config_id=config.id, name="p", url="https://example.com/sub")
|
|
db_session.add(sub)
|
|
await db_session.commit()
|
|
|
|
with patch("main.mihomo_client") as mock_mc:
|
|
mock_mc.refresh_and_collect = AsyncMock(return_value=[])
|
|
resp = await http_client.post(f"/configs/{config.id}/refresh", follow_redirects=False)
|
|
|
|
assert resp.status_code == 303
|
|
assert mock_mc.refresh_and_collect.await_count == 1
|
|
|
|
logs = (await db_session.execute(select(ExportLog))).scalars().all()
|
|
assert logs == []
|