Files
urbnywrt 2e1c488bb9 feat: direct subscription fetching, URI parser, FLClashX compatibility
- Fetch subscriptions directly via httpx (mihomo UA → YAML, xray-checker → base64 URI list)
- Add uri_parser.py: vless/vmess/ss/trojan/hysteria2 URI → Mihomo proxy dicts
- Fix YAML quoting for Go parser (strings starting with special chars)
- Remove hidden:true and proxy-providers from delivered configs
- Inject all service groups into GLOBAL proxies for FLClashX group discovery
- Strip placeholder proxy-providers (e.g. "subscription") not in DB
- Fix Mihomo healthcheck: add Bearer auth header
- Add README

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-15 06:04:03 +03:00

345 lines
12 KiB
Python

import logging
import os
import yaml
from contextlib import asynccontextmanager
from datetime import datetime
from typing import AsyncGenerator
import httpx
import uuid
from fastapi import FastAPI, Depends, HTTPException, Form, Request
from fastapi.responses import Response, RedirectResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, desc, make_url
from models import Base, Config, Subscription, ExportLog, make_engine, make_session_factory
from mihomo import MihomoClient
from expander import expand_config, build_mihomo_config, inject_providers_for_delivery
# Process-wide logging: timestamp, level and message for INFO and above.
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Runtime settings; each one can be overridden through the environment.
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite+aiosqlite:////data/db/app.db")
MIHOMO_API = os.getenv("MIHOMO_API", "http://mihomo:9090")
MIHOMO_SECRET = os.getenv("MIHOMO_SECRET", "")
MIHOMO_CONFIG_DIR = os.getenv("MIHOMO_CONFIG_DIR", "/data/mihomo")

# Shared singletons used by the request handlers below.
engine = make_engine(DATABASE_URL)
SessionLocal = make_session_factory(engine)
mihomo_client = MihomoClient(MIHOMO_API, MIHOMO_SECRET)

# HTML templates live in the "templates" directory next to this module.
_TEMPLATES_DIR = os.path.join(os.path.dirname(__file__), "templates")
templates = Jinja2Templates(directory=_TEMPLATES_DIR)
async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """FastAPI dependency: yield a session from ``SessionLocal``.

    The session is opened when the dependency is resolved and left through
    the ``async with`` block once the consumer is done with it.
    """
    async with SessionLocal() as db_session:
        yield db_session
async def write_and_reload_mihomo(db: AsyncSession) -> None:
    """Regenerate Mihomo's ``config.yaml`` from the database and reload Mihomo.

    The base YAML of every stored ``Config`` is passed to
    ``build_mihomo_config``. Every ``Subscription`` whose name is not already
    a key under ``proxy-providers`` in the generated config is then added as
    an HTTP provider. The result is written to
    ``MIHOMO_CONFIG_DIR/config.yaml`` via a temporary file and
    ``os.replace``, after which ``mihomo_client.reload_config()`` is awaited.

    Args:
        db: Open session used to read configs and subscriptions.

    Raises:
        Whatever the database, the filesystem or ``mihomo_client`` raise;
        nothing is caught here.
    """
    config_rows = (await db.execute(select(Config))).scalars().all()
    mihomo_yaml = build_mihomo_config(
        [row.base_yaml for row in config_rows], MIHOMO_SECRET
    )
    mihomo_cfg = yaml.safe_load(mihomo_yaml) or {}

    # A bare "proxy-providers:" key loads as None. Normalise it to a dict so
    # item assignment below cannot fail with a TypeError.
    providers = mihomo_cfg.get("proxy-providers") or {}
    # Snapshot the provider names that came from the generated config; only
    # those take precedence over subscriptions stored in the database.
    existing = set(providers)

    all_subs = (await db.execute(select(Subscription))).scalars().all()
    for sub in all_subs:
        if sub.name in existing:
            continue
        providers[sub.name] = {
            "type": "http",
            "url": sub.url,
            "interval": 3600,
            "health-check": {
                "enable": True,
                "url": "https://www.gstatic.com/generate_204",
                "interval": 300,
            },
        }

    # Never emit an empty "proxy-providers" section.
    if providers:
        mihomo_cfg["proxy-providers"] = providers
    else:
        mihomo_cfg.pop("proxy-providers", None)

    config_yaml = yaml.dump(mihomo_cfg, allow_unicode=True, sort_keys=False)
    config_path = os.path.join(MIHOMO_CONFIG_DIR, "config.yaml")
    tmp_path = config_path + ".tmp"
    os.makedirs(MIHOMO_CONFIG_DIR, exist_ok=True)
    # allow_unicode=True leaves non-ASCII characters unescaped, so the file
    # must be written as UTF-8 regardless of the process locale.
    with open(tmp_path, "w", encoding="utf-8") as f:
        f.write(config_yaml)
    # Write-then-rename so a reader never sees a half-written config.
    os.replace(tmp_path, config_path)
    logger.info("Wrote Mihomo config to %s", config_path)

    await mihomo_client.reload_config()
    logger.info("Mihomo config reloaded")
@asynccontextmanager
async def lifespan(app: FastAPI):  # type: ignore[type-arg]
    """Run startup work before the application begins serving requests.

    Steps, in order: create the Mihomo config directory, create the parent
    directory of the database file when the URL names one, create all tables
    from ``Base.metadata``, wait for ``mihomo_client.wait_ready()``, then
    write the Mihomo config and reload it. No shutdown work is performed
    after the ``yield``.

    Args:
        app: The FastAPI application (unused; required by the lifespan API).
    """
    os.makedirs(MIHOMO_CONFIG_DIR, exist_ok=True)

    db_path = make_url(DATABASE_URL).database
    if db_path and db_path != ":memory:":
        # The database component may have no directory part (a relative
        # SQLite file such as "app.db", or a server database name).
        # os.makedirs("") raises FileNotFoundError, so skip it in that case.
        db_dir = os.path.dirname(db_path)
        if db_dir:
            os.makedirs(db_dir, exist_ok=True)

    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    await mihomo_client.wait_ready()
    async with SessionLocal() as db:
        await write_and_reload_mihomo(db)
    yield
# ASGI application object; startup work is delegated to the lifespan context manager.
app = FastAPI(lifespan=lifespan)
async def _fetch_subscription(url: str) -> list[dict]:
    """Fetch a subscription and return its proxies.

    Two attempts are made against the same URL:

    1. With the ``mihomo`` User-Agent, expecting a YAML mapping that has a
       non-empty ``proxies`` entry. That entry is returned as is.
    2. With the ``xray-checker`` User-Agent, expecting a list of proxy URIs,
       either base64-encoded or as plain text. The text is handed to
       ``parse_proxy_uris``.

    Args:
        url: Subscription URL.

    Returns:
        The proxies from the YAML response, or the result of
        ``parse_proxy_uris`` for the URI-list response.

    Raises:
        httpx.HTTPError: If the second (fallback) request fails.
        binascii.Error: If the fallback body is not valid base64.
    """
    import base64

    from uri_parser import parse_proxy_uris

    async with httpx.AsyncClient(follow_redirects=True) as client:
        try:
            resp = await client.get(
                url, timeout=30.0, headers={"User-Agent": "mihomo"}
            )
            resp.raise_for_status()
            cfg = yaml.safe_load(resp.text)
        except (httpx.HTTPError, yaml.YAMLError) as exc:
            # Best effort: a failed first attempt only triggers the fallback.
            # Only the exception type is logged, because httpx messages embed
            # the subscription URL, which may carry a secret token.
            logger.debug(
                "YAML subscription attempt failed (%s); trying URI-list fallback",
                type(exc).__name__,
            )
        else:
            # A base64 body parses as a plain YAML string, not a mapping.
            proxies = cfg.get("proxies") if isinstance(cfg, dict) else None
            if proxies:
                return proxies

        resp = await client.get(
            url, timeout=30.0, headers={"User-Agent": "xray-checker"}
        )
        resp.raise_for_status()
        body = resp.content.strip()

    # ":" is not part of the base64 alphabet, so a body containing "://" is
    # already a plain-text URI list and must not be decoded.
    if b"://" in body:
        return parse_proxy_uris(body.decode("utf-8", errors="replace"))

    # Drop embedded whitespace (line-wrapped base64) before computing the
    # padding; otherwise the length is off and decoding fails.
    raw = b"".join(body.split())
    raw += b"=" * (-len(raw) % 4)
    decoded = base64.b64decode(raw).decode("utf-8", errors="replace")
    return parse_proxy_uris(decoded)
@app.get("/config/{token}.yaml")
async def get_config(
    token: str,
    db: AsyncSession = Depends(get_db),
) -> Response:
    """Deliver the expanded YAML config identified by ``token``.

    Each subscription of the config is fetched in turn. A failed fetch is
    logged and recorded but does not abort the request. The base YAML is then
    passed through ``inject_providers_for_delivery`` and ``expand_config``,
    and an ``ExportLog`` row is stored for the attempt.

    Raises:
        HTTPException: 404 when no config has this token, 500 when
            injection or expansion raises.
    """
    config_query = select(Config).where(Config.token == token)
    config = (await db.execute(config_query)).scalar_one_or_none()
    if config is None:
        raise HTTPException(status_code=404, detail="Config not found")

    subs_query = select(Subscription).where(Subscription.config_id == config.id)
    subscriptions = (await db.execute(subs_query)).scalars().all()

    provider_proxies: dict[str, list[dict]] = {}
    errors: list[str] = []
    for sub in subscriptions:
        try:
            provider_proxies[sub.name] = await _fetch_subscription(sub.url)
            sub.last_fetched_at = datetime.utcnow()
        except Exception as fetch_err:
            logger.error("Failed to fetch subscription %s: %s", sub.name, fetch_err)
            errors.append(f"{sub.name}: {fetch_err}")

    try:
        injected_yaml = inject_providers_for_delivery(config.base_yaml, subscriptions)
        expanded = expand_config(injected_yaml, provider_proxies)
    except Exception as expand_err:
        logger.error(
            "Config expansion failed for token (redacted): %s",
            expand_err,
            exc_info=True,
        )
        failure_log = ExportLog(
            config_id=config.id,
            node_count=0,
            success=False,
            error_message=str(expand_err),
        )
        db.add(failure_log)
        await db.commit()
        raise HTTPException(status_code=500, detail="Config expansion failed")

    # Total proxies gathered across all subscriptions that were fetched.
    node_count = sum(map(len, provider_proxies.values()))
    if errors:
        error_msg = "; ".join(errors)
    else:
        error_msg = None
    export_log = ExportLog(
        config_id=config.id,
        node_count=node_count,
        success=not errors,
        error_message=error_msg,
    )
    db.add(export_log)
    await db.commit()
    return Response(content=expanded, media_type="application/x-yaml")
@app.get("/")
async def index(request: Request, db: AsyncSession = Depends(get_db)):
    """Render the landing page: every config with its most recent export log."""
    configs = (await db.execute(select(Config))).scalars().all()

    # Map config id -> newest ExportLog, or None when the config has no logs.
    last_logs: dict[int, ExportLog | None] = {}
    for cfg in configs:
        newest_log_query = (
            select(ExportLog)
            .where(ExportLog.config_id == cfg.id)
            .order_by(desc(ExportLog.fetched_at))
            .limit(1)
        )
        newest = await db.execute(newest_log_query)
        last_logs[cfg.id] = newest.scalar_one_or_none()

    context = {"request": request, "configs": configs, "last_logs": last_logs}
    return templates.TemplateResponse("index.html", context)
@app.get("/configs/new")
async def new_config_form(request: Request):
    """Render the config form with ``config`` set to ``None``."""
    context = {"request": request, "config": None}
    return templates.TemplateResponse("config_form.html", context)
@app.post("/configs/new")
async def create_config(
    name: str = Form(...),
    base_yaml: str = Form(...),
    db: AsyncSession = Depends(get_db),
):
    """Store a new config with a fresh UUID4 token, then regenerate Mihomo's config.

    Responds with a 303 redirect to the new config's detail page.
    """
    token = str(uuid.uuid4())
    config = Config(name=name, token=token, base_yaml=base_yaml)
    db.add(config)
    await db.commit()
    await write_and_reload_mihomo(db)
    target = f"/configs/{config.id}"
    return RedirectResponse(target, status_code=303)
@app.get("/configs/{config_id}")
async def config_detail(config_id: int, request: Request, db: AsyncSession = Depends(get_db)):
    """Render the detail page of one config together with its subscriptions.

    Raises:
        HTTPException: 404 when the config does not exist.
    """
    config = await db.get(Config, config_id)
    if config is None:
        raise HTTPException(status_code=404, detail="Config not found")

    subs_query = select(Subscription).where(Subscription.config_id == config_id)
    subscriptions = (await db.execute(subs_query)).scalars().all()

    context = {"request": request, "config": config, "subscriptions": subscriptions}
    return templates.TemplateResponse("config_detail.html", context)
@app.post("/configs/{config_id}")
async def update_config(
    config_id: int,
    base_yaml: str = Form(...),
    db: AsyncSession = Depends(get_db),
):
    """Replace a config's base YAML, stamp ``updated_at`` and regenerate Mihomo's config.

    Raises:
        HTTPException: 404 when the config does not exist.
    """
    config = await db.get(Config, config_id)
    if config is None:
        raise HTTPException(status_code=404, detail="Config not found")

    config.base_yaml = base_yaml
    config.updated_at = datetime.utcnow()
    await db.commit()

    await write_and_reload_mihomo(db)
    target = f"/configs/{config_id}"
    return RedirectResponse(target, status_code=303)
@app.post("/configs/{config_id}/delete")
async def delete_config(config_id: int, db: AsyncSession = Depends(get_db)):
    """Delete a config, regenerate Mihomo's config and redirect to the index.

    Raises:
        HTTPException: 404 when the config does not exist.
    """
    doomed = await db.get(Config, config_id)
    if doomed is None:
        raise HTTPException(status_code=404, detail="Config not found")

    await db.delete(doomed)
    await db.commit()
    await write_and_reload_mihomo(db)
    return RedirectResponse("/", status_code=303)
@app.get("/configs/{config_id}/subscriptions/new")
async def new_sub_form(config_id: int, request: Request, db: AsyncSession = Depends(get_db)):
    """Render the form for adding a subscription to an existing config.

    Raises:
        HTTPException: 404 when the config does not exist.
    """
    config = await db.get(Config, config_id)
    if config is None:
        raise HTTPException(status_code=404, detail="Config not found")

    context = {"request": request, "config": config}
    return templates.TemplateResponse("sub_form.html", context)
@app.post("/configs/{config_id}/subscriptions/new")
async def create_subscription(
    config_id: int,
    name: str = Form(...),
    url: str = Form(...),
    db: AsyncSession = Depends(get_db),
):
    """Attach a new subscription to a config and regenerate Mihomo's config.

    Raises:
        HTTPException: 404 when the config does not exist.
    """
    parent = await db.get(Config, config_id)
    if parent is None:
        raise HTTPException(status_code=404, detail="Config not found")

    new_sub = Subscription(config_id=config_id, name=name, url=url)
    db.add(new_sub)
    await db.commit()

    await write_and_reload_mihomo(db)
    target = f"/configs/{config_id}"
    return RedirectResponse(target, status_code=303)
@app.post("/subscriptions/{sub_id}/delete")
async def delete_subscription(sub_id: int, db: AsyncSession = Depends(get_db)):
    """Delete a subscription and regenerate Mihomo's config.

    Responds with a 303 redirect to the owning config's detail page.

    Raises:
        HTTPException: 404 when the subscription does not exist.
    """
    doomed = await db.get(Subscription, sub_id)
    if doomed is None:
        raise HTTPException(status_code=404, detail="Subscription not found")

    # Read the owner id before the row is deleted.
    config_id = doomed.config_id
    await db.delete(doomed)
    await db.commit()

    await write_and_reload_mihomo(db)
    target = f"/configs/{config_id}"
    return RedirectResponse(target, status_code=303)
@app.get("/configs/{config_id}/logs")
async def config_logs(config_id: int, request: Request, db: AsyncSession = Depends(get_db)):
    """Render up to 50 export logs of a config, ordered by ``fetched_at`` descending.

    Raises:
        HTTPException: 404 when the config does not exist.
    """
    config = await db.get(Config, config_id)
    if config is None:
        raise HTTPException(status_code=404, detail="Config not found")

    logs_query = (
        select(ExportLog)
        .where(ExportLog.config_id == config_id)
        .order_by(desc(ExportLog.fetched_at))
        .limit(50)
    )
    logs = (await db.execute(logs_query)).scalars().all()

    context = {"request": request, "config": config, "logs": logs}
    return templates.TemplateResponse("logs.html", context)
@app.post("/configs/{config_id}/refresh")
async def force_refresh(config_id: int, db: AsyncSession = Depends(get_db)):
    """Call ``mihomo_client.refresh_and_collect`` for each subscription of a config.

    A failure for one provider is logged and the loop continues. Providers
    that succeed get ``last_fetched_at`` updated. Responds with a 303
    redirect to the config's detail page.

    Raises:
        HTTPException: 404 when the config does not exist.
    """
    config = await db.get(Config, config_id)
    if config is None:
        raise HTTPException(status_code=404, detail="Config not found")

    subs_query = select(Subscription).where(Subscription.config_id == config_id)
    subscriptions = (await db.execute(subs_query)).scalars().all()

    for sub in subscriptions:
        try:
            await mihomo_client.refresh_and_collect(sub.name, timeout=30)
            sub.last_fetched_at = datetime.utcnow()
        except Exception as refresh_err:
            logger.error("Force-refresh failed for provider %s: %s", sub.name, refresh_err)

    await db.commit()
    target = f"/configs/{config_id}"
    return RedirectResponse(target, status_code=303)