Compare commits

..

10 Commits

Author SHA1 Message Date
urbnywrt 2e1c488bb9 feat: direct subscription fetching, URI parser, FLClashX compatibility
- Fetch subscriptions directly via httpx (mihomo UA → YAML, xray-checker → base64 URI list)
- Add uri_parser.py: vless/vmess/ss/trojan/hysteria2 URI → Mihomo proxy dicts
- Fix YAML quoting for Go parser (strings starting with special chars)
- Remove hidden:true and proxy-providers from delivered configs
- Inject all service groups into GLOBAL proxies for FLClashX group discovery
- Strip placeholder proxy-providers (e.g. "subscription") not in DB
- Fix Mihomo healthcheck: add Bearer auth header
- Add README

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-15 06:04:03 +03:00
urbnywrt 5fb1782fa5 fix: use Docker Hub mihomo image (ghcr.io unreachable in some environments) 2026-05-15 00:22:27 +03:00
urbnywrt 6f84d27d35 feat: complete Mihomo subscription expander service
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-15 00:22:02 +03:00
urbnywrt 04f152a873 feat: add Jinja2 admin UI templates 2026-05-15 00:19:50 +03:00
urbnywrt eabeb54767 fix: remove unused request params, strengthen admin route tests
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-15 00:16:39 +03:00
urbnywrt 43d5820474 feat: add admin UI routes
Add CRUD routes for configs and subscriptions, force-refresh and logs endpoints, stub Jinja2 templates for test coverage.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-15 00:12:57 +03:00
urbnywrt 5416ae7565 fix: safe DATABASE_URL parsing and redact exception from client response
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-15 00:10:28 +03:00
urbnywrt 94c01fe532 feat: add FastAPI app with config delivery endpoint 2026-05-15 00:08:00 +03:00
urbnywrt 5000079cbe fix: guard against empty YAML and missing proxy name in expander
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-15 00:05:03 +03:00
urbnywrt c597a1add7 feat: add config expander and Mihomo config builder
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-14 23:57:45 +03:00
14 changed files with 1640 additions and 2 deletions
+3
View File
@@ -1,3 +1,6 @@
.env
data/db/*
data/mihomo/*
__pycache__/
*.pyc
.claude/
+117
View File
@@ -0,0 +1,117 @@
# Mihomo Subscription Expander
Сервис для раздачи готовых Clash/Mihomo конфигов клиентам. Берёт ваш базовый YAML, подтягивает серверы из подписок и отдаёт полностью развёрнутый конфиг по токен-ссылке — без `proxy-providers`, с реальными прокси внутри каждой группы.
## Как это работает
1. Вы создаёте конфиг с вашими группами (`proxy-groups`) и правилами (`rules`)
2. Добавляете подписки (URL в формате YAML или base64 URI-список)
3. Клиент (Clash Verge, FLClashX и др.) получает конфиг по ссылке `/config/<token>.yaml`
4. При каждом запросе подписки фетчатся свежими, серверы вставляются напрямую в группы
```
Клиент → GET /config/<token>.yaml
├─ Фетч подписок (User-Agent: mihomo → YAML, fallback xray-checker → base64 URI)
├─ Раскрытие use: в proxy-groups
├─ Удаление proxy-providers
└─ Готовый YAML с серверами прямо в группах
```
## Стек
- **FastAPI** + SQLAlchemy 2.0 async (aiosqlite) — веб-интерфейс и API
- **Mihomo** (metacubex/mihomo) — прокси-ядро
- **Caddy** — reverse proxy с Basic Auth и автоматическим TLS
- **Docker Compose** — оркестрация всех трёх сервисов
## Быстрый старт
```bash
git clone <repo>
cd mihomo_injecter
cp .env.example .env
# Отредактируйте .env: задайте MIHOMO_SECRET, ADMIN_USER, ADMIN_PASSWORD, HOST_DOMAIN
```
Сгенерируйте хэш пароля для Caddy:
```bash
docker run --rm caddy:2-alpine caddy hash-password --plaintext 'ваш_пароль'
```
Вставьте полученный хэш в `.env` как `ADMIN_PASSWORD_HASH`.
Запустите:
```bash
docker compose up -d
```
Откройте `https://your-domain.com` — веб-интерфейс для управления конфигами.
## Переменные окружения (.env)
| Переменная | По умолчанию | Описание |
|---|---|---|
| `MIHOMO_SECRET` | `changeme` | Bearer-токен для Mihomo API |
| `ADMIN_USER` | `admin` | Логин для веб-интерфейса |
| `ADMIN_PASSWORD` | `changeme` | Пароль (plaintext, используется Caddy entrypoint) |
| `ADMIN_PASSWORD_HASH` | — | Bcrypt-хэш пароля (генерируется автоматически если задан `ADMIN_PASSWORD`) |
| `HOST_DOMAIN` | — | Домен для Caddy (например `sub.example.com`). Пусто = слушать на всех портах |
| `DATABASE_URL` | sqlite | Путь к базе данных |
| `MIHOMO_API` | `http://mihomo:9090` | URL Mihomo API |
| `MIHOMO_CONFIG_DIR` | `/data/mihomo` | Путь к конфигу Mihomo |
## Базовый конфиг
Пишите обычный Clash/Mihomo YAML. В `proxy-groups` используйте `use: [subscription]` — имя провайдера не важно, оно автоматически заменится на реальные подписки из БД.
```yaml
proxy-groups:
- name: Telegram
type: select
use:
- subscription # любое имя — заменится автоматически
proxies:
- DIRECT
- name: GLOBAL
type: select
use:
- subscription
```
Блок `proxy-providers` в базовом конфиге не нужен — если он есть, неизвестные провайдеры удаляются автоматически.
## Форматы подписок
- **YAML** (`User-Agent: mihomo`) — стандартный Clash/Mihomo формат с `proxies:`
- **Base64 URI-список** (`User-Agent: xray-checker`) — строки `vless://`, `vmess://`, `ss://`, `trojan://`, `hysteria2://`
## Доступ к конфигу
Ссылка для клиента (без авторизации):
```
https://your-domain.com/config/<token>.yaml
```
Токен генерируется автоматически при создании конфига и виден в веб-интерфейсе.
## Структура данных
```
data/
db/app.db # SQLite: конфиги, подписки, логи экспорта
mihomo/ # config.yaml для Mihomo (генерируется автоматически)
```
## Разработка
```bash
cd app
python -m venv .venv && source .venv/bin/activate
pip install -r requirements.txt
pytest
```
+182
View File
@@ -0,0 +1,182 @@
import yaml
from yaml.representer import SafeRepresenter
def _str_representer(dumper: yaml.Dumper, value: str) -> yaml.ScalarNode:
# Quote strings that start with special YAML characters so Go parser doesn't choke
if value and value[0] in ('-', ':', '{', '}', '[', ']', ',', '#', '&', '*', '?', '|', '<', '>', '=', '!', '%', '@', '`'):
return dumper.represent_scalar('tag:yaml.org,2002:str', value, style="'")
return SafeRepresenter.represent_str(dumper, value)
class _SafeDumper(yaml.SafeDumper):
pass
_SafeDumper.add_representer(str, _str_representer)
def _dump(data: dict) -> str:
return yaml.dump(data, Dumper=_SafeDumper, allow_unicode=True, sort_keys=False)
PROXY_FIELDS = {
"name", "type", "server", "port", "uuid", "password", "cipher",
"alterId", "tls", "network", "ws-opts", "h2-opts", "http-opts",
"grpc-opts", "reality-opts", "servername", "fingerprint", "flow",
"udp", "skip-cert-verify", "sni", "username", "plugin", "plugin-opts",
"client-fingerprint", "early-data-header-name", "smux",
}
# Types that are not real proxy protocols — Mihomo may include them in provider
# responses but they're not valid standalone proxy entries.
_SKIP_TYPES = {"direct", "dns", "reject", "pass", "compatible", "relay",
"selector", "fallback", "url-test", "load-balance"}
def filter_proxy(proxy: dict) -> dict:
result = {k: v for k, v in proxy.items() if k in PROXY_FIELDS}
if "type" in result and isinstance(result["type"], str):
result["type"] = result["type"].lower()
return result
def _is_real_proxy(proxy: dict) -> bool:
return "name" in proxy and proxy.get("type", "").lower() not in _SKIP_TYPES
def expand_config(base_yaml: str, provider_proxies: dict[str, list[dict]]) -> str:
cfg = yaml.safe_load(base_yaml) or {}
all_proxies = list(cfg.get("proxies") or [])
provider_to_names: dict[str, list[str]] = {}
all_sub_names: list[str] = []
for provider_name, proxies in provider_proxies.items():
filtered = [filter_proxy(p) for p in proxies if _is_real_proxy(p)]
provider_to_names[provider_name] = [p["name"] for p in filtered]
all_proxies.extend(filtered)
all_sub_names.extend(p["name"] for p in filtered)
cfg["proxies"] = all_proxies
# Deduplicate subscription proxy names (preserve order)
seen: set[str] = set()
unique_sub_names: list[str] = []
for n in all_sub_names:
if n not in seen:
seen.add(n)
unique_sub_names.append(n)
cfg["proxy-groups"] = list(cfg.get("proxy-groups") or [])
sub_names_set = set(unique_sub_names)
expanded_group_names: list[str] = []
for group in cfg.get("proxy-groups") or []:
group.pop("hidden", None)
if "use" not in group:
continue
name = group.get("name", "")
expanded: list[str] = []
for pname in group.pop("use"):
expanded.extend(provider_to_names.get(pname, []))
original = group.get("proxies") or []
special = [p for p in original if p not in sub_names_set and p != "DIRECT"]
group["proxies"] = special + expanded + ["DIRECT"]
if not group["proxies"]:
group["proxies"] = ["DIRECT"]
if name and name != "GLOBAL":
expanded_group_names.append(name)
# Add all service groups to GLOBAL so FLClashX can discover them
if expanded_group_names:
global_group = next(
(g for g in cfg.get("proxy-groups") or [] if g.get("name") == "GLOBAL"),
None,
)
if global_group:
current = global_group.get("proxies") or []
current_set = set(current)
to_add = [n for n in expanded_group_names if n not in current_set]
global_group["proxies"] = current + to_add
cfg.pop("proxy-providers", None)
return _dump(cfg)
def inject_providers_for_delivery(base_yaml: str, subscriptions: list) -> str:
"""At delivery time: inject proxy-providers entries and use: for subscriptions absent from base_yaml."""
if not subscriptions:
return base_yaml
cfg = yaml.safe_load(base_yaml) or {}
db_sub_names = {sub.name for sub in subscriptions}
# Remove placeholder providers that aren't real DB subscriptions
if cfg.get("proxy-providers"):
for name in list(cfg["proxy-providers"].keys()):
if name not in db_sub_names:
del cfg["proxy-providers"][name]
existing_providers = set((cfg.get("proxy-providers") or {}).keys())
new_entries = {}
for sub in subscriptions:
if sub.name not in existing_providers:
new_entries[sub.name] = {
"type": "http",
"url": sub.url,
"interval": 3600,
"health-check": {
"enable": True,
"url": "https://www.gstatic.com/generate_204",
"interval": 300,
},
}
if not new_entries:
return base_yaml
all_known = existing_providers | set(new_entries.keys())
providers = cfg.setdefault("proxy-providers", {})
providers.update(new_entries)
new_names = list(new_entries.keys())
_special = {"DIRECT", "REJECT"}
for group in cfg.get("proxy-groups") or []:
if group.get("name") in _special:
continue
if "use" in group:
# Replace references to unknown providers with our subscriptions
patched = []
for u in group["use"]:
if u in all_known:
patched.append(u)
else:
patched.extend(new_names)
group["use"] = patched if patched else new_names
else:
group["use"] = new_names
return _dump(cfg)
def build_mihomo_config(all_base_yamls: list[str], secret: str) -> str:
providers: dict[str, dict] = {}
for base_yaml in all_base_yamls:
try:
cfg = yaml.safe_load(base_yaml) or {}
for name, provider in (cfg.get("proxy-providers") or {}).items():
if name not in providers:
providers[name] = provider
except yaml.YAMLError:
continue
config: dict = {
"external-controller": "0.0.0.0:9090",
"secret": secret,
"mixed-port": 7890,
"allow-lan": False,
}
if providers:
config["proxy-providers"] = providers
return _dump(config)
+344
View File
@@ -0,0 +1,344 @@
import logging
import os
import yaml
from contextlib import asynccontextmanager
from datetime import datetime
from typing import AsyncGenerator
import httpx
import uuid
from fastapi import FastAPI, Depends, HTTPException, Form, Request
from fastapi.responses import Response, RedirectResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, desc, make_url
from models import Base, Config, Subscription, ExportLog, make_engine, make_session_factory
from mihomo import MihomoClient
from expander import expand_config, build_mihomo_config, inject_providers_for_delivery
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
)
logger = logging.getLogger(__name__)
DATABASE_URL = os.environ.get("DATABASE_URL", "sqlite+aiosqlite:////data/db/app.db")
MIHOMO_API = os.environ.get("MIHOMO_API", "http://mihomo:9090")
MIHOMO_SECRET = os.environ.get("MIHOMO_SECRET", "")
MIHOMO_CONFIG_DIR = os.environ.get("MIHOMO_CONFIG_DIR", "/data/mihomo")
engine = make_engine(DATABASE_URL)
SessionLocal = make_session_factory(engine)
mihomo_client = MihomoClient(MIHOMO_API, MIHOMO_SECRET)
templates = Jinja2Templates(
directory=os.path.join(os.path.dirname(__file__), "templates")
)
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with SessionLocal() as session:
yield session
async def write_and_reload_mihomo(db: AsyncSession) -> None:
result = await db.execute(select(Config))
configs = result.scalars().all()
mihomo_yaml = build_mihomo_config([c.base_yaml for c in configs], MIHOMO_SECRET)
mihomo_cfg = yaml.safe_load(mihomo_yaml) or {}
existing = set((mihomo_cfg.get("proxy-providers") or {}).keys())
sub_result = await db.execute(select(Subscription))
all_subs = sub_result.scalars().all()
providers = mihomo_cfg.setdefault("proxy-providers", {}) if existing or all_subs else {}
for sub in all_subs:
if sub.name not in existing:
providers[sub.name] = {
"type": "http",
"url": sub.url,
"interval": 3600,
"health-check": {
"enable": True,
"url": "https://www.gstatic.com/generate_204",
"interval": 300,
},
}
if providers:
mihomo_cfg["proxy-providers"] = providers
else:
mihomo_cfg.pop("proxy-providers", None)
config_yaml = yaml.dump(mihomo_cfg, allow_unicode=True, sort_keys=False)
config_path = os.path.join(MIHOMO_CONFIG_DIR, "config.yaml")
tmp_path = config_path + ".tmp"
os.makedirs(MIHOMO_CONFIG_DIR, exist_ok=True)
with open(tmp_path, "w") as f:
f.write(config_yaml)
os.replace(tmp_path, config_path)
logger.info("Wrote Mihomo config to %s", config_path)
await mihomo_client.reload_config()
logger.info("Mihomo config reloaded")
@asynccontextmanager
async def lifespan(app: FastAPI): # type: ignore[type-arg]
os.makedirs(MIHOMO_CONFIG_DIR, exist_ok=True)
db_url = make_url(DATABASE_URL)
db_path = db_url.database
if db_path and db_path != ":memory:":
os.makedirs(os.path.dirname(db_path), exist_ok=True)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
await mihomo_client.wait_ready()
async with SessionLocal() as db:
await write_and_reload_mihomo(db)
yield
app = FastAPI(lifespan=lifespan)
async def _fetch_subscription(url: str) -> list[dict]:
"""Fetch subscription. Tries mihomo UA first (YAML), falls back to xray-checker (base64 URI list)."""
import base64
from uri_parser import parse_proxy_uris
async with httpx.AsyncClient(follow_redirects=True) as c:
try:
resp = await c.get(url, timeout=30.0, headers={"User-Agent": "mihomo"})
resp.raise_for_status()
cfg = yaml.safe_load(resp.text) or {}
proxies = cfg.get("proxies") or []
if proxies:
return proxies
except Exception:
pass
resp = await c.get(url, timeout=30.0, headers={"User-Agent": "xray-checker"})
resp.raise_for_status()
raw = resp.content.strip()
raw += b"=" * (-len(raw) % 4)
decoded = base64.b64decode(raw).decode("utf-8", errors="replace")
return parse_proxy_uris(decoded)
@app.get("/config/{token}.yaml")
async def get_config(
token: str,
db: AsyncSession = Depends(get_db),
) -> Response:
result = await db.execute(select(Config).where(Config.token == token))
config = result.scalar_one_or_none()
if not config:
raise HTTPException(status_code=404, detail="Config not found")
result = await db.execute(
select(Subscription).where(Subscription.config_id == config.id)
)
subscriptions = result.scalars().all()
provider_proxies: dict[str, list[dict]] = {}
errors: list[str] = []
for sub in subscriptions:
try:
provider_proxies[sub.name] = await _fetch_subscription(sub.url)
sub.last_fetched_at = datetime.utcnow()
except Exception as exc:
logger.error("Failed to fetch subscription %s: %s", sub.name, exc)
errors.append(f"{sub.name}: {exc}")
try:
injected_yaml = inject_providers_for_delivery(config.base_yaml, subscriptions)
expanded = expand_config(injected_yaml, provider_proxies)
except Exception as exc:
logger.error("Config expansion failed for token (redacted): %s", exc, exc_info=True)
db.add(
ExportLog(
config_id=config.id,
node_count=0,
success=False,
error_message=str(exc),
)
)
await db.commit()
raise HTTPException(status_code=500, detail="Config expansion failed")
node_count = sum(len(p) for p in provider_proxies.values())
error_msg = "; ".join(errors) if errors else None
db.add(
ExportLog(
config_id=config.id,
node_count=node_count,
success=not bool(errors),
error_message=error_msg,
)
)
await db.commit()
return Response(content=expanded, media_type="application/x-yaml")
@app.get("/")
async def index(request: Request, db: AsyncSession = Depends(get_db)):
result = await db.execute(select(Config))
configs = result.scalars().all()
last_logs: dict[int, ExportLog | None] = {}
for cfg in configs:
log_result = await db.execute(
select(ExportLog)
.where(ExportLog.config_id == cfg.id)
.order_by(desc(ExportLog.fetched_at))
.limit(1)
)
last_logs[cfg.id] = log_result.scalar_one_or_none()
return templates.TemplateResponse(
"index.html",
{"request": request, "configs": configs, "last_logs": last_logs},
)
@app.get("/configs/new")
async def new_config_form(request: Request):
return templates.TemplateResponse("config_form.html", {"request": request, "config": None})
@app.post("/configs/new")
async def create_config(
name: str = Form(...),
base_yaml: str = Form(...),
db: AsyncSession = Depends(get_db),
):
config = Config(name=name, token=str(uuid.uuid4()), base_yaml=base_yaml)
db.add(config)
await db.commit()
await write_and_reload_mihomo(db)
return RedirectResponse(f"/configs/{config.id}", status_code=303)
@app.get("/configs/{config_id}")
async def config_detail(config_id: int, request: Request, db: AsyncSession = Depends(get_db)):
config = await db.get(Config, config_id)
if not config:
raise HTTPException(status_code=404, detail="Config not found")
result = await db.execute(
select(Subscription).where(Subscription.config_id == config_id)
)
subscriptions = result.scalars().all()
return templates.TemplateResponse(
"config_detail.html",
{"request": request, "config": config, "subscriptions": subscriptions},
)
@app.post("/configs/{config_id}")
async def update_config(
config_id: int,
base_yaml: str = Form(...),
db: AsyncSession = Depends(get_db),
):
config = await db.get(Config, config_id)
if not config:
raise HTTPException(status_code=404, detail="Config not found")
config.base_yaml = base_yaml
config.updated_at = datetime.utcnow()
await db.commit()
await write_and_reload_mihomo(db)
return RedirectResponse(f"/configs/{config_id}", status_code=303)
@app.post("/configs/{config_id}/delete")
async def delete_config(config_id: int, db: AsyncSession = Depends(get_db)):
config = await db.get(Config, config_id)
if not config:
raise HTTPException(status_code=404, detail="Config not found")
await db.delete(config)
await db.commit()
await write_and_reload_mihomo(db)
return RedirectResponse("/", status_code=303)
@app.get("/configs/{config_id}/subscriptions/new")
async def new_sub_form(config_id: int, request: Request, db: AsyncSession = Depends(get_db)):
config = await db.get(Config, config_id)
if not config:
raise HTTPException(status_code=404, detail="Config not found")
return templates.TemplateResponse(
"sub_form.html", {"request": request, "config": config}
)
@app.post("/configs/{config_id}/subscriptions/new")
async def create_subscription(
config_id: int,
name: str = Form(...),
url: str = Form(...),
db: AsyncSession = Depends(get_db),
):
config = await db.get(Config, config_id)
if not config:
raise HTTPException(status_code=404, detail="Config not found")
sub = Subscription(config_id=config_id, name=name, url=url)
db.add(sub)
await db.commit()
await write_and_reload_mihomo(db)
return RedirectResponse(f"/configs/{config_id}", status_code=303)
@app.post("/subscriptions/{sub_id}/delete")
async def delete_subscription(sub_id: int, db: AsyncSession = Depends(get_db)):
sub = await db.get(Subscription, sub_id)
if not sub:
raise HTTPException(status_code=404, detail="Subscription not found")
config_id = sub.config_id
await db.delete(sub)
await db.commit()
await write_and_reload_mihomo(db)
return RedirectResponse(f"/configs/{config_id}", status_code=303)
@app.get("/configs/{config_id}/logs")
async def config_logs(config_id: int, request: Request, db: AsyncSession = Depends(get_db)):
config = await db.get(Config, config_id)
if not config:
raise HTTPException(status_code=404, detail="Config not found")
result = await db.execute(
select(ExportLog)
.where(ExportLog.config_id == config_id)
.order_by(desc(ExportLog.fetched_at))
.limit(50)
)
logs = result.scalars().all()
return templates.TemplateResponse(
"logs.html", {"request": request, "config": config, "logs": logs}
)
@app.post("/configs/{config_id}/refresh")
async def force_refresh(config_id: int, db: AsyncSession = Depends(get_db)):
config = await db.get(Config, config_id)
if not config:
raise HTTPException(status_code=404, detail="Config not found")
result = await db.execute(
select(Subscription).where(Subscription.config_id == config_id)
)
subscriptions = result.scalars().all()
for sub in subscriptions:
try:
await mihomo_client.refresh_and_collect(sub.name, timeout=30)
sub.last_fetched_at = datetime.utcnow()
except Exception as e:
logger.error("Force-refresh failed for provider %s: %s", sub.name, e)
await db.commit()
return RedirectResponse(f"/configs/{config_id}", status_code=303)
+41
View File
@@ -0,0 +1,41 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Mihomo Expander</title>
<style>
* { box-sizing: border-box; }
body { font-family: system-ui, sans-serif; margin: 0; background: #f5f5f5; color: #222; }
header { background: #1a1a2e; color: #fff; padding: 12px 24px; display: flex; align-items: center; gap: 16px; }
header a { color: #aad4f5; text-decoration: none; font-weight: bold; }
main { max-width: 960px; margin: 24px auto; padding: 0 16px; }
table { width: 100%; border-collapse: collapse; background: #fff; border-radius: 6px; overflow: hidden; box-shadow: 0 1px 3px rgba(0,0,0,.1); }
th, td { padding: 10px 14px; text-align: left; border-bottom: 1px solid #eee; }
th { background: #f0f0f0; font-size: .85rem; text-transform: uppercase; letter-spacing: .05em; }
tr:last-child td { border-bottom: none; }
.btn { display: inline-block; padding: 7px 14px; border-radius: 4px; border: none; cursor: pointer; font-size: .9rem; text-decoration: none; }
.btn-primary { background: #2563eb; color: #fff; }
.btn-danger { background: #dc2626; color: #fff; }
.btn-sm { padding: 4px 10px; font-size: .8rem; }
form.inline { display: inline; }
.card { background: #fff; border-radius: 6px; box-shadow: 0 1px 3px rgba(0,0,0,.1); padding: 20px; margin-bottom: 20px; }
label { display: block; margin-bottom: 4px; font-weight: 500; font-size: .9rem; }
input[type=text], input[type=url], textarea { width: 100%; padding: 8px 10px; border: 1px solid #ccc; border-radius: 4px; font-size: .95rem; }
textarea { font-family: monospace; font-size: .85rem; min-height: 200px; }
.token-url { font-family: monospace; font-size: .85rem; background: #f0f4ff; padding: 6px 10px; border-radius: 4px; word-break: break-all; }
.success { color: #16a34a; }
.error { color: #dc2626; }
h1 { margin-top: 0; }
h2 { margin-top: 0; font-size: 1.1rem; color: #444; }
</style>
</head>
<body>
<header>
<a href="/">Mihomo Expander</a>
</header>
<main>
{% block content %}{% endblock %}
</main>
</body>
</html>
+65
View File
@@ -0,0 +1,65 @@
{% extends "base.html" %}
{% block content %}
<div style="display:flex; justify-content:space-between; align-items:center; margin-bottom:16px;">
<h1>{{ config.name }}</h1>
<div>
<form class="inline" method="post" action="/configs/{{ config.id }}/refresh">
<button type="submit" class="btn btn-primary">&#8635; Force Refresh</button>
</form>
<form class="inline" method="post" action="/configs/{{ config.id }}/delete"
onsubmit="return confirm('Delete this config?')">
<button type="submit" class="btn btn-danger">Delete</button>
</form>
</div>
</div>
<div class="card">
<h2>Client URL</h2>
<div class="token-url">{{ request.url.scheme }}://{{ request.url.netloc }}/config/{{ config.token }}.yaml</div>
</div>
<div class="card">
<h2>Base YAML</h2>
<form method="post" action="/configs/{{ config.id }}">
<textarea name="base_yaml" style="min-height:300px;">{{ config.base_yaml }}</textarea>
<div style="margin-top:10px;">
<button type="submit" class="btn btn-primary">Save YAML</button>
</div>
</form>
</div>
<div class="card">
<div style="display:flex; justify-content:space-between; align-items:center; margin-bottom:12px;">
<h2 style="margin:0;">Subscriptions</h2>
<a href="/configs/{{ config.id }}/subscriptions/new" class="btn btn-primary btn-sm">+ Add</a>
</div>
{% if subscriptions %}
<table>
<thead>
<tr><th>Name</th><th>URL</th><th>Last Fetched</th><th></th></tr>
</thead>
<tbody>
{% for sub in subscriptions %}
<tr>
<td>{{ sub.name }}</td>
<td style="font-family:monospace;font-size:.8rem;">{{ sub.url }}</td>
<td>{{ sub.last_fetched_at.strftime('%Y-%m-%d %H:%M') if sub.last_fetched_at else '—' }}</td>
<td>
<form class="inline" method="post" action="/subscriptions/{{ sub.id }}/delete">
<button type="submit" class="btn btn-danger btn-sm">Delete</button>
</form>
</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<p style="color:#666;">No subscriptions. Add one above.</p>
{% endif %}
</div>
<div style="margin-top:8px;">
<a href="/configs/{{ config.id }}/logs" class="btn" style="background:#6b7280;color:#fff;">View Export Logs</a>
<a href="/" class="btn" style="background:#6b7280;color:#fff;margin-left:8px;">&#8592; Back</a>
</div>
{% endblock %}
+20
View File
@@ -0,0 +1,20 @@
{% extends "base.html" %}
{% block content %}
<h1>{{ "Edit Config" if config else "New Config" }}</h1>
<div class="card">
<form method="post">
{% if not config %}
<div style="margin-bottom:14px;">
<label for="name">Name</label>
<input type="text" id="name" name="name" required placeholder="My Config">
</div>
{% endif %}
<div style="margin-bottom:14px;">
<label for="base_yaml">Base YAML</label>
<textarea id="base_yaml" name="base_yaml" required>{{ config.base_yaml if config else "" }}</textarea>
</div>
<button type="submit" class="btn btn-primary">Save</button>
<a href="/" class="btn" style="background:#6b7280;color:#fff;">Cancel</a>
</form>
</div>
{% endblock %}
+50
View File
@@ -0,0 +1,50 @@
{% extends "base.html" %}
{% block content %}
<div style="display:flex; justify-content:space-between; align-items:center; margin-bottom:16px;">
<h1>Configs</h1>
<a href="/configs/new" class="btn btn-primary">+ New Config</a>
</div>
{% if configs %}
<table>
<thead>
<tr>
<th>Name</th>
<th>Client URL</th>
<th>Last Export</th>
<th>Nodes</th>
<th>Actions</th>
</tr>
</thead>
<tbody>
{% for config in configs %}
<tr>
<td><a href="/configs/{{ config.id }}">{{ config.name }}</a></td>
<td>
<span class="token-url">{{ request.url.scheme }}://{{ request.url.netloc }}/config/{{ config.token }}.yaml</span>
</td>
<td>
{% set log = last_logs[config.id] %}
{% if log %}
<span class="{{ 'success' if log.success else 'error' }}">
{{ log.fetched_at.strftime('%Y-%m-%d %H:%M') }}
</span>
{% else %}
<span style="color:#999">Never</span>
{% endif %}
</td>
<td>{{ last_logs[config.id].node_count if last_logs[config.id] else '—' }}</td>
<td>
<a href="/configs/{{ config.id }}" class="btn btn-sm btn-primary">Edit</a>
<a href="/configs/{{ config.id }}/logs" class="btn btn-sm" style="background:#6b7280;color:#fff;">Logs</a>
</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<div class="card">
<p style="color:#666; text-align:center;">No configs yet. <a href="/configs/new">Create one.</a></p>
</div>
{% endif %}
{% endblock %}
+40
View File
@@ -0,0 +1,40 @@
{% extends "base.html" %}
{% block content %}
<div style="display:flex; justify-content:space-between; align-items:center; margin-bottom:16px;">
<h1>Export Logs &#8212; {{ config.name }}</h1>
<a href="/configs/{{ config.id }}" class="btn" style="background:#6b7280;color:#fff;">&#8592; Back</a>
</div>
{% if logs %}
<table>
<thead>
<tr>
<th>Time</th>
<th>Status</th>
<th>Nodes</th>
<th>Error</th>
</tr>
</thead>
<tbody>
{% for log in logs %}
<tr>
<td>{{ log.fetched_at.strftime('%Y-%m-%d %H:%M:%S') }}</td>
<td>
{% if log.success %}
<span class="success">&#10003; OK</span>
{% else %}
<span class="error">&#10007; Failed</span>
{% endif %}
</td>
<td>{{ log.node_count }}</td>
<td style="font-size:.8rem;color:#666;">{{ log.error_message or '' }}</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<div class="card">
<p style="color:#666;text-align:center;">No export logs yet.</p>
</div>
{% endif %}
{% endblock %}
+19
View File
@@ -0,0 +1,19 @@
{% extends "base.html" %}
{% block content %}
<h1>Add Subscription to "{{ config.name }}"</h1>
<div class="card">
<form method="post">
<div style="margin-bottom:14px;">
<label for="name">Provider Name</label>
<input type="text" id="name" name="name" required
placeholder="Must match proxy-provider name in base_yaml">
</div>
<div style="margin-bottom:14px;">
<label for="url">Subscription URL</label>
<input type="url" id="url" name="url" required placeholder="https://...">
</div>
<button type="submit" class="btn btn-primary">Add Subscription</button>
<a href="/configs/{{ config.id }}" class="btn" style="background:#6b7280;color:#fff;">Cancel</a>
</form>
</div>
{% endblock %}
+269
View File
@@ -0,0 +1,269 @@
import yaml
from dataclasses import dataclass
from expander import filter_proxy, expand_config, build_mihomo_config, inject_providers_for_delivery
@dataclass
class FakeSub:
name: str
url: str
PROXY_FULL = {
"name": "node1",
"type": "ss",
"server": "1.2.3.4",
"port": 443,
"password": "secret",
"cipher": "aes-256-gcm",
"alive": True, # runtime field — should be stripped
"history": [], # runtime field — should be stripped
"extra": {"foo": "bar"}, # runtime field — should be stripped
}
BASE_YAML = """
proxies: []
proxy-providers:
provider1:
type: http
url: https://example.com/sub
interval: 3600
proxy-groups:
- name: Proxy
type: select
use:
- provider1
rules:
- MATCH,DIRECT
"""
def test_filter_proxy_removes_runtime_fields():
filtered = filter_proxy(PROXY_FULL)
assert "alive" not in filtered
assert "history" not in filtered
assert "extra" not in filtered
assert filtered["name"] == "node1"
assert filtered["server"] == "1.2.3.4"
def test_expand_config_replaces_providers_with_proxies():
proxies = [
{"name": "node1", "type": "ss", "server": "1.2.3.4", "port": 443,
"password": "pwd", "cipher": "aes-256-gcm"},
]
result = expand_config(BASE_YAML, {"provider1": proxies})
cfg = yaml.safe_load(result)
assert "proxy-providers" not in cfg
assert any(p["name"] == "node1" for p in cfg["proxies"])
def test_expand_config_fills_use_in_proxy_groups():
proxies = [
{"name": "node1", "type": "ss", "server": "1.2.3.4", "port": 443,
"password": "pwd", "cipher": "aes-256-gcm"},
{"name": "node2", "type": "ss", "server": "5.6.7.8", "port": 443,
"password": "pwd", "cipher": "aes-256-gcm"},
]
result = expand_config(BASE_YAML, {"provider1": proxies})
cfg = yaml.safe_load(result)
proxy_group = next(g for g in cfg["proxy-groups"] if g["name"] == "Proxy")
assert "use" not in proxy_group
assert "node1" in proxy_group["proxies"]
assert "node2" in proxy_group["proxies"]
def test_expand_config_preserves_existing_proxies():
base = """
proxies:
- name: static-node
type: direct
proxy-providers:
p1:
type: http
url: https://example.com/sub
interval: 3600
proxy-groups: []
rules: []
"""
result = expand_config(base, {"p1": [{"name": "dynamic-node", "type": "ss",
"server": "1.2.3.4", "port": 443}]})
cfg = yaml.safe_load(result)
names = [p["name"] for p in cfg["proxies"]]
assert "static-node" in names
assert "dynamic-node" in names
def test_expand_config_empty_providers():
result = expand_config(BASE_YAML, {})
cfg = yaml.safe_load(result)
assert "proxy-providers" not in cfg
def test_build_mihomo_config_merges_providers():
yaml1 = """
proxy-providers:
p1:
type: http
url: https://a.example.com/sub
interval: 3600
health-check:
enable: true
url: http://www.gstatic.com/generate_204
interval: 300
"""
yaml2 = """
proxy-providers:
p2:
type: http
url: https://b.example.com/sub
interval: 3600
"""
result = build_mihomo_config([yaml1, yaml2], "mysecret")
cfg = yaml.safe_load(result)
assert cfg["external-controller"] == "0.0.0.0:9090"
assert cfg["secret"] == "mysecret"
assert cfg["mixed-port"] == 7890
assert cfg["allow-lan"] is False
assert "p1" in cfg["proxy-providers"]
assert "p2" in cfg["proxy-providers"]
assert cfg["proxy-providers"]["p1"]["health-check"]["enable"] is True
def test_build_mihomo_config_deduplicates_by_name():
yaml1 = """
proxy-providers:
p1:
type: http
url: https://original.example.com/sub
interval: 3600
"""
yaml2 = """
proxy-providers:
p1:
type: http
url: https://duplicate.example.com/sub
interval: 3600
"""
result = build_mihomo_config([yaml1, yaml2], "s")
cfg = yaml.safe_load(result)
assert cfg["proxy-providers"]["p1"]["url"] == "https://original.example.com/sub"
def test_build_mihomo_config_no_providers():
result = build_mihomo_config(["proxies: []"], "s")
cfg = yaml.safe_load(result)
assert "proxy-providers" not in cfg
def test_expand_config_skips_proxies_without_name():
result = expand_config(BASE_YAML, {"provider1": [
{"type": "ss", "server": "1.2.3.4", "port": 443}, # no "name"
{"name": "valid-node", "type": "ss", "server": "5.6.7.8", "port": 443},
]})
cfg = yaml.safe_load(result)
names = [p["name"] for p in cfg["proxies"]]
assert "valid-node" in names
assert len(names) == 1
def test_build_mihomo_config_skips_malformed_yaml():
valid_yaml = """
proxy-providers:
p1:
type: http
url: https://example.com/sub
interval: 3600
"""
result = build_mihomo_config([valid_yaml, ":: invalid yaml ::: {{{"], "s")
cfg = yaml.safe_load(result)
assert "p1" in cfg["proxy-providers"]
# --- inject_providers_for_delivery tests ---
def test_inject_providers_returns_unchanged_when_no_subs():
base = "proxies: []\nrules:\n - MATCH,DIRECT\n"
result = inject_providers_for_delivery(base, [])
assert result == base
def test_inject_providers_adds_entry_for_missing_sub():
base = "proxies: []\nproxy-groups: []\nrules:\n - MATCH,DIRECT\n"
sub = FakeSub(name="mysub", url="https://example.com/sub")
result = inject_providers_for_delivery(base, [sub])
cfg = yaml.safe_load(result)
assert "mysub" in cfg["proxy-providers"]
entry = cfg["proxy-providers"]["mysub"]
assert entry["type"] == "http"
assert entry["url"] == "https://example.com/sub"
assert entry["interval"] == 3600
assert entry["health-check"]["enable"] is True
def test_inject_providers_adds_use_to_fallback_and_url_test_groups():
base = (
"proxies: []\n"
"proxy-groups:\n"
" - name: Auto\n"
" type: url-test\n"
" proxies: []\n"
" - name: FB\n"
" type: fallback\n"
" proxies: []\n"
" - name: LB\n"
" type: load-balance\n"
" proxies: []\n"
" - name: Manual\n"
" type: select\n"
" proxies: []\n"
"rules:\n"
" - MATCH,DIRECT\n"
)
sub = FakeSub(name="newsub", url="https://example.com/sub")
result = inject_providers_for_delivery(base, [sub])
cfg = yaml.safe_load(result)
groups = {g["name"]: g for g in cfg["proxy-groups"]}
assert groups["Auto"]["use"] == ["newsub"]
assert groups["FB"]["use"] == ["newsub"]
assert groups["LB"]["use"] == ["newsub"]
assert "use" not in groups["Manual"]
def test_inject_providers_does_not_add_use_to_groups_that_already_have_use():
base = (
"proxies: []\n"
"proxy-groups:\n"
" - name: Auto\n"
" type: url-test\n"
" use:\n"
" - existing-provider\n"
"rules:\n"
" - MATCH,DIRECT\n"
)
sub = FakeSub(name="newsub", url="https://example.com/sub")
result = inject_providers_for_delivery(base, [sub])
cfg = yaml.safe_load(result)
groups = {g["name"]: g for g in cfg["proxy-groups"]}
assert groups["Auto"]["use"] == ["existing-provider"]
def test_inject_providers_skips_sub_already_in_base_yaml():
base = (
"proxy-providers:\n"
" existing-sub:\n"
" type: http\n"
" url: https://original.example.com/sub\n"
" interval: 3600\n"
"proxies: []\n"
"proxy-groups: []\n"
"rules:\n"
" - MATCH,DIRECT\n"
)
sub = FakeSub(name="existing-sub", url="https://different.example.com/sub")
result = inject_providers_for_delivery(base, [sub])
# Should return unchanged since no new entries
assert result == base
+290
View File
@@ -0,0 +1,290 @@
import uuid
import pytest
import pytest_asyncio
from unittest.mock import AsyncMock, patch
from httpx import AsyncClient, ASGITransport
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from models import Base, Config, Subscription, ExportLog
from main import app, get_db
@pytest_asyncio.fixture
async def db_engine():
engine = create_async_engine("sqlite+aiosqlite:///:memory:")
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield engine
await engine.dispose()
@pytest_asyncio.fixture
async def db_session(db_engine):
Session = async_sessionmaker(db_engine, expire_on_commit=False)
async with Session() as session:
yield session
@pytest_asyncio.fixture
async def http_client(db_session):
async def override_get_db():
yield db_session
app.dependency_overrides[get_db] = override_get_db
async with AsyncClient(
transport=ASGITransport(app=app), base_url="http://test"
) as client:
yield client
app.dependency_overrides.clear()
async def test_get_config_not_found(http_client):
resp = await http_client.get("/config/nonexistent-token.yaml")
assert resp.status_code == 404
async def test_get_config_returns_yaml(http_client, db_session):
token = str(uuid.uuid4())
config = Config(
name="test",
token=token,
base_yaml="proxies: []\nproxy-groups: []\nrules:\n - MATCH,DIRECT\n",
)
db_session.add(config)
await db_session.commit()
with patch("main.mihomo_client") as mock_mc:
mock_mc.refresh_and_collect = AsyncMock(return_value=[])
resp = await http_client.get(f"/config/{token}.yaml")
assert resp.status_code == 200
assert "proxies" in resp.text
async def test_get_config_writes_export_log(http_client, db_session):
from sqlalchemy import select
token = str(uuid.uuid4())
config = Config(
name="test",
token=token,
base_yaml="proxies: []\nproxy-groups: []\nrules: []\n",
)
db_session.add(config)
await db_session.commit()
with patch("main.mihomo_client") as mock_mc:
mock_mc.refresh_and_collect = AsyncMock(return_value=[])
await http_client.get(f"/config/{token}.yaml")
result = await db_session.execute(
select(ExportLog).where(ExportLog.config_id == config.id)
)
logs = result.scalars().all()
assert len(logs) == 1
assert logs[0].success is True
async def test_get_config_with_subscription_expands_nodes(http_client, db_session):
token = str(uuid.uuid4())
config = Config(
name="test",
token=token,
base_yaml=(
"proxies: []\n"
"proxy-providers:\n"
" myprovider:\n"
" type: http\n"
" url: https://example.com/sub\n"
" interval: 3600\n"
"proxy-groups:\n"
" - name: Proxy\n"
" type: select\n"
" use:\n"
" - myprovider\n"
"rules:\n"
" - MATCH,DIRECT\n"
),
)
db_session.add(config)
await db_session.flush()
sub = Subscription(config_id=config.id, name="myprovider", url="https://example.com/sub")
db_session.add(sub)
await db_session.commit()
fake_proxies = [
{"name": "node1", "type": "ss", "server": "1.2.3.4", "port": 443,
"password": "pwd", "cipher": "aes-256-gcm", "alive": True},
]
with patch("main.mihomo_client") as mock_mc:
mock_mc.refresh_and_collect = AsyncMock(return_value=fake_proxies)
resp = await http_client.get(f"/config/{token}.yaml")
assert resp.status_code == 200
assert "node1" in resp.text
assert "proxy-providers" not in resp.text
assert "alive" not in resp.text
# --- Admin UI tests ---
async def test_index_returns_200(http_client):
resp = await http_client.get("/")
assert resp.status_code == 200
async def test_create_config_redirects(http_client, db_session):
from sqlalchemy import select
with patch("main.write_and_reload_mihomo", new_callable=AsyncMock):
resp = await http_client.post(
"/configs/new",
data={"name": "MyConfig", "base_yaml": "proxies: []\nrules: []\nproxy-groups: []\n"},
follow_redirects=False,
)
assert resp.status_code == 303
result = await db_session.execute(select(Config).where(Config.name == "MyConfig"))
config = result.scalar_one_or_none()
assert config is not None
assert config.token is not None
async def test_config_detail_returns_200(http_client, db_session):
config = Config(name="c", token=str(uuid.uuid4()), base_yaml="proxies: []\n")
db_session.add(config)
await db_session.commit()
resp = await http_client.get(f"/configs/{config.id}")
assert resp.status_code == 200
async def test_config_detail_404_for_missing(http_client):
resp = await http_client.get("/configs/99999")
assert resp.status_code == 404
async def test_update_config_base_yaml(http_client, db_session):
config = Config(name="c", token=str(uuid.uuid4()), base_yaml="proxies: []\n")
db_session.add(config)
await db_session.commit()
with patch("main.write_and_reload_mihomo", new_callable=AsyncMock):
resp = await http_client.post(
f"/configs/{config.id}",
data={"base_yaml": "proxies: []\nrules: []\n"},
follow_redirects=False,
)
assert resp.status_code == 303
await db_session.refresh(config)
assert "rules" in config.base_yaml
async def test_delete_config(http_client, db_session):
from sqlalchemy import select
config = Config(name="c", token=str(uuid.uuid4()), base_yaml="proxies: []\n")
db_session.add(config)
await db_session.flush()
sub = Subscription(config_id=config.id, name="s", url="https://example.com/sub")
log = ExportLog(config_id=config.id, node_count=1, success=True)
db_session.add(sub)
db_session.add(log)
await db_session.commit()
config_id = config.id
sub_id = sub.id
log_id = log.id
with patch("main.write_and_reload_mihomo", new_callable=AsyncMock):
resp = await http_client.post(f"/configs/{config_id}/delete", follow_redirects=False)
assert resp.status_code == 303
result = await db_session.execute(select(Config).where(Config.id == config_id))
assert result.scalar_one_or_none() is None
assert (await db_session.get(Subscription, sub_id)) is None
assert (await db_session.get(ExportLog, log_id)) is None
async def test_add_subscription(http_client, db_session):
from sqlalchemy import select
config = Config(name="c", token=str(uuid.uuid4()), base_yaml="proxies: []\n")
db_session.add(config)
await db_session.commit()
with patch("main.write_and_reload_mihomo", new_callable=AsyncMock):
resp = await http_client.post(
f"/configs/{config.id}/subscriptions/new",
data={"name": "mysub", "url": "https://example.com/sub"},
follow_redirects=False,
)
assert resp.status_code == 303
result = await db_session.execute(
select(Subscription).where(Subscription.config_id == config.id)
)
subs = result.scalars().all()
assert len(subs) == 1
assert subs[0].name == "mysub"
async def test_delete_subscription(http_client, db_session):
from sqlalchemy import select
config = Config(name="c", token=str(uuid.uuid4()), base_yaml="proxies: []\n")
db_session.add(config)
await db_session.flush()
sub = Subscription(config_id=config.id, name="s", url="https://example.com/sub")
db_session.add(sub)
await db_session.commit()
sub_id = sub.id
with patch("main.write_and_reload_mihomo", new_callable=AsyncMock):
resp = await http_client.post(f"/subscriptions/{sub_id}/delete", follow_redirects=False)
assert resp.status_code == 303
result = await db_session.execute(select(Subscription).where(Subscription.id == sub_id))
assert result.scalar_one_or_none() is None
async def test_logs_page_returns_200(http_client, db_session):
config = Config(name="c", token=str(uuid.uuid4()), base_yaml="proxies: []\n")
db_session.add(config)
await db_session.flush()
log = ExportLog(config_id=config.id, node_count=3, success=True)
db_session.add(log)
await db_session.commit()
resp = await http_client.get(f"/configs/{config.id}/logs")
assert resp.status_code == 200
async def test_force_refresh(http_client, db_session):
from sqlalchemy import select
config = Config(name="c", token=str(uuid.uuid4()), base_yaml="proxies: []\n")
db_session.add(config)
await db_session.flush()
sub = Subscription(config_id=config.id, name="p", url="https://example.com/sub")
db_session.add(sub)
await db_session.commit()
with patch("main.mihomo_client") as mock_mc:
mock_mc.refresh_and_collect = AsyncMock(return_value=[])
resp = await http_client.post(f"/configs/{config.id}/refresh", follow_redirects=False)
assert resp.status_code == 303
assert mock_mc.refresh_and_collect.await_count == 1
logs = (await db_session.execute(select(ExportLog))).scalars().all()
assert logs == []
+198
View File
@@ -0,0 +1,198 @@
import base64
import json
from urllib.parse import urlparse, parse_qs, unquote
def parse_proxy_uris(text: str) -> list[dict]:
"""Parse newline-separated proxy URIs into Mihomo proxy dicts."""
result = []
for line in text.splitlines():
line = line.strip()
if not line or "://" not in line:
continue
try:
proxy = _parse_uri(line)
if proxy:
result.append(proxy)
except Exception:
pass
return result
def _parse_uri(uri: str) -> dict | None:
scheme = uri.split("://")[0].lower()
if scheme == "vless":
return _parse_vless(uri)
if scheme == "vmess":
return _parse_vmess(uri)
if scheme in ("ss", "shadowsocks"):
return _parse_ss(uri)
if scheme == "trojan":
return _parse_trojan(uri)
if scheme in ("hysteria2", "hy2"):
return _parse_hysteria2(uri)
return None
def _name(parsed) -> str:
return unquote(parsed.fragment) if parsed.fragment else f"{parsed.hostname}:{parsed.port}"
def _parse_vless(uri: str) -> dict:
p = urlparse(uri)
qs = parse_qs(p.query)
q = {k: v[0] for k, v in qs.items()}
proxy: dict = {
"name": _name(p),
"type": "vless",
"server": p.hostname,
"port": p.port,
"uuid": p.username,
"udp": True,
}
net = q.get("type", "tcp")
security = q.get("security", "")
if security in ("tls", "reality"):
proxy["tls"] = True
if sni := q.get("sni") or q.get("host"):
proxy["servername"] = sni
if fp := q.get("fp"):
proxy["client-fingerprint"] = fp
if security == "reality":
proxy["reality-opts"] = {
"public-key": q.get("pbk", ""),
"short-id": q.get("sid", ""),
}
if flow := q.get("flow"):
proxy["flow"] = flow
if net == "ws":
proxy["network"] = "ws"
proxy["ws-opts"] = {
"path": q.get("path", "/"),
"headers": {"Host": q.get("host", p.hostname)},
}
elif net == "grpc":
proxy["network"] = "grpc"
proxy["grpc-opts"] = {"grpc-service-name": q.get("serviceName", "")}
elif net == "h2":
proxy["network"] = "h2"
proxy["h2-opts"] = {
"host": [q.get("host", p.hostname)],
"path": q.get("path", "/"),
}
return proxy
def _parse_vmess(uri: str) -> dict:
b64 = uri[len("vmess://"):]
b64 += "=" * (-len(b64) % 4)
data = json.loads(base64.b64decode(b64).decode())
proxy: dict = {
"name": data.get("ps") or f"{data.get('add')}:{data.get('port')}",
"type": "vmess",
"server": data["add"],
"port": int(data["port"]),
"uuid": data["id"],
"alterId": int(data.get("aid", 0)),
"cipher": data.get("scy") or data.get("type") or "auto",
"udp": True,
}
net = data.get("net", "tcp")
tls = data.get("tls", "") == "tls"
if tls:
proxy["tls"] = True
if sni := data.get("sni") or data.get("host"):
proxy["servername"] = sni
if net == "ws":
proxy["network"] = "ws"
proxy["ws-opts"] = {
"path": data.get("path", "/"),
"headers": {"Host": data.get("host", data["add"])},
}
elif net == "grpc":
proxy["network"] = "grpc"
proxy["grpc-opts"] = {"grpc-service-name": data.get("path", "")}
return proxy
def _parse_ss(uri: str) -> dict:
p = urlparse(uri)
name = _name(p)
if p.username and p.hostname:
# ss://cipher:password@host:port#name
cipher, password = p.username, p.password or ""
else:
# ss://base64(cipher:password)@host:port or ss://base64(cipher:password@host:port)
raw = uri[len("ss://"):]
raw = raw.split("#")[0]
if "@" in raw:
user_info = raw.split("@")[0]
else:
user_info = raw
decoded = base64.b64decode(user_info + "==").decode(errors="replace")
if "@" in decoded:
# base64(cipher:password@host:port)
parts = decoded.rsplit("@", 1)
cipher, password = parts[0].split(":", 1)
host_port = parts[1].rsplit(":", 1)
p = p._replace(hostname=host_port[0], port=int(host_port[1]) if len(host_port) > 1 else p.port)
else:
cipher, password = decoded.split(":", 1)
return {
"name": name,
"type": "ss",
"server": p.hostname,
"port": p.port,
"cipher": cipher,
"password": password,
"udp": True,
}
def _parse_trojan(uri: str) -> dict:
p = urlparse(uri)
qs = parse_qs(p.query)
q = {k: v[0] for k, v in qs.items()}
proxy: dict = {
"name": _name(p),
"type": "trojan",
"server": p.hostname,
"port": p.port,
"password": p.username,
"udp": True,
"tls": True,
}
if sni := q.get("sni") or q.get("peer"):
proxy["servername"] = sni
if fp := q.get("fp"):
proxy["client-fingerprint"] = fp
net = q.get("type", "tcp")
if net == "ws":
proxy["network"] = "ws"
proxy["ws-opts"] = {
"path": q.get("path", "/"),
"headers": {"Host": q.get("host", p.hostname)},
}
elif net == "grpc":
proxy["network"] = "grpc"
proxy["grpc-opts"] = {"grpc-service-name": q.get("serviceName", "")}
return proxy
def _parse_hysteria2(uri: str) -> dict:
p = urlparse(uri)
qs = parse_qs(p.query)
q = {k: v[0] for k, v in qs.items()}
proxy: dict = {
"name": _name(p),
"type": "hysteria2",
"server": p.hostname,
"port": p.port,
"password": p.username or p.password or "",
"udp": True,
}
if sni := q.get("sni"):
proxy["sni"] = sni
if q.get("insecure") == "1":
proxy["skip-cert-verify"] = True
return proxy
+2 -2
View File
@@ -8,14 +8,14 @@ volumes:
services:
mihomo:
image: ghcr.io/metacubex/mihomo:latest
image: metacubex/mihomo:latest
volumes:
- ./data/mihomo:/root/.config/mihomo
networks:
- proxy_net
restart: unless-stopped
healthcheck:
test: ["CMD", "wget", "-q", "-O-", "http://localhost:9090/version"]
test: ["CMD", "wget", "-q", "-O-", "--header=Authorization: Bearer changeme", "http://localhost:9090/version"]
interval: 5s
timeout: 3s
retries: 12