Files
mihomo_injecter/app/uri_parser.py
T
urbnywrt 2e1c488bb9 feat: direct subscription fetching, URI parser, FLClashX compatibility
- Fetch subscriptions directly via httpx (mihomo UA → YAML, xray-checker → base64 URI list)
- Add uri_parser.py: vless/vmess/ss/trojan/hysteria2 URI → Mihomo proxy dicts
- Fix YAML quoting for Go parser (strings starting with special chars)
- Remove hidden:true and proxy-providers from delivered configs
- Inject all service groups into GLOBAL proxies for FLClashX group discovery
- Strip placeholder proxy-providers (e.g. "subscription") not in DB
- Fix Mihomo healthcheck: add Bearer auth header
- Add README

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-15 06:04:03 +03:00

199 lines
5.8 KiB
Python

import base64
import json
import logging
from urllib.parse import urlparse, parse_qs, unquote
def parse_proxy_uris(text: str) -> list[dict]:
    """Parse newline-separated proxy URIs into Mihomo proxy dicts.

    Blank lines and lines without ``://`` are ignored. A line whose scheme is
    not supported, or whose parser raises, is skipped so that one bad entry
    never discards the rest of the list.

    Args:
        text: Proxy share links, one per line.

    Returns:
        The successfully parsed proxies, in input order.
    """
    log = logging.getLogger(__name__)
    proxies = []
    for line_no, raw_line in enumerate(text.splitlines(), start=1):
        line = raw_line.strip()
        if not line or "://" not in line:
            continue
        try:
            proxy = _parse_uri(line)
        except Exception as exc:
            # Best-effort boundary: keep going, but leave a trace. Only the
            # line number and error type are logged because the URI itself
            # (and many error messages derived from it) contain credentials.
            log.warning(
                "Skipping unparsable proxy URI on line %d (%s)",
                line_no,
                type(exc).__name__,
            )
            continue
        if proxy:
            proxies.append(proxy)
    return proxies
def _parse_uri(uri: str) -> dict | None:
scheme = uri.split("://")[0].lower()
if scheme == "vless":
return _parse_vless(uri)
if scheme == "vmess":
return _parse_vmess(uri)
if scheme in ("ss", "shadowsocks"):
return _parse_ss(uri)
if scheme == "trojan":
return _parse_trojan(uri)
if scheme in ("hysteria2", "hy2"):
return _parse_hysteria2(uri)
return None
def _name(parsed) -> str:
    """Return a display name for a parsed URI.

    Uses the percent-decoded fragment when there is one, otherwise falls back
    to ``"<hostname>:<port>"`` taken from the parsed result.
    """
    if not parsed.fragment:
        return f"{parsed.hostname}:{parsed.port}"
    return unquote(parsed.fragment)
def _parse_vless(uri: str) -> dict:
    """Convert a ``vless://`` share link into a Mihomo proxy dict.

    The user part of the URI becomes ``uuid``. Query parameters read:
    ``security``, ``sni``, ``host``, ``fp``, ``pbk``, ``sid``, ``flow``,
    ``type``, ``path`` and ``serviceName``.
    """
    parsed = urlparse(uri)
    # parse_qs yields a list per key; only the first value of each is used.
    params = {key: values[0] for key, values in parse_qs(parsed.query).items()}

    proxy: dict = {
        "name": _name(parsed),
        "type": "vless",
        "server": parsed.hostname,
        "port": parsed.port,
        "uuid": parsed.username,
        "udp": True,
    }

    security = params.get("security", "")
    if security in ("tls", "reality"):
        proxy["tls"] = True
        # The "host" parameter doubles as the SNI when "sni" is absent.
        server_name = params.get("sni") or params.get("host")
        if server_name:
            proxy["servername"] = server_name
        fingerprint = params.get("fp")
        if fingerprint:
            proxy["client-fingerprint"] = fingerprint
        if security == "reality":
            proxy["reality-opts"] = {
                "public-key": params.get("pbk", ""),
                "short-id": params.get("sid", ""),
            }

    flow = params.get("flow")
    if flow:
        proxy["flow"] = flow

    # Transport options; any value other than ws/grpc/h2 adds nothing.
    transport = params.get("type", "tcp")
    if transport == "ws":
        proxy["network"] = "ws"
        proxy["ws-opts"] = {
            "path": params.get("path", "/"),
            "headers": {"Host": params.get("host", parsed.hostname)},
        }
    elif transport == "grpc":
        proxy["network"] = "grpc"
        proxy["grpc-opts"] = {
            "grpc-service-name": params.get("serviceName", ""),
        }
    elif transport == "h2":
        proxy["network"] = "h2"
        proxy["h2-opts"] = {
            "host": [params.get("host", parsed.hostname)],
            "path": params.get("path", "/"),
        }

    return proxy
def _parse_vmess(uri: str) -> dict:
    """Convert a ``vmess://<base64 JSON>`` share link into a Mihomo proxy dict.

    The payload after ``://`` is base64 (standard or URL-safe alphabet, with
    or without padding) wrapping a JSON object. Fields read: ``ps``, ``add``,
    ``port``, ``id``, ``aid``, ``scy``, ``net``, ``tls``, ``sni``, ``host``
    and ``path``.

    Raises:
        KeyError: if ``add``, ``port`` or ``id`` is missing.
        ValueError: if the payload is not valid base64/JSON or a numeric
            field cannot be converted.
    """
    payload = uri.partition("://")[2].strip()
    # b64decode silently drops '-' and '_', so map the URL-safe alphabet to
    # the standard one before decoding.
    payload = payload.translate(str.maketrans("-_", "+/"))
    payload += "=" * (-len(payload) % 4)
    data = json.loads(base64.b64decode(payload).decode())

    server = data["add"]
    # Share links frequently carry keys with empty-string values, so every
    # optional field uses `or` rather than a .get() default.
    host = data.get("host") or ""

    proxy: dict = {
        "name": data.get("ps") or f"{data.get('add')}:{data.get('port')}",
        "type": "vmess",
        "server": server,
        "port": int(data["port"]),
        "uuid": data["id"],
        "alterId": int(data.get("aid") or 0),
        # Only "scy" names the cipher. The JSON "type" field is the header
        # obfuscation type (e.g. "none"/"http") and must not be used here.
        "cipher": data.get("scy") or "auto",
        "udp": True,
    }

    if data.get("tls", "") == "tls":
        proxy["tls"] = True
        if sni := data.get("sni") or host:
            proxy["servername"] = sni

    net = data.get("net") or "tcp"
    if net == "ws":
        proxy["network"] = "ws"
        proxy["ws-opts"] = {
            "path": data.get("path") or "/",
            "headers": {"Host": host or server},
        }
    elif net == "grpc":
        proxy["network"] = "grpc"
        # For grpc the "path" field carries the service name.
        proxy["grpc-opts"] = {"grpc-service-name": data.get("path") or ""}
    return proxy
def _parse_ss(uri: str) -> dict:
    """Convert an ``ss://`` / ``shadowsocks://`` link into a Mihomo proxy dict.

    Three layouts are recognised:

    * ``ss://method:password@host:port#name`` - plain userinfo, percent-encoded
    * ``ss://base64(method:password)@host:port#name``
    * ``ss://base64(method:password@host:port)#name`` - legacy, fully encoded

    Base64 may use the standard or URL-safe alphabet, with or without
    padding. Any ``?query`` part (for example plugin options) is ignored.

    Raises:
        ValueError: if the credentials or the server address cannot be
            extracted from the URI.
    """
    _, _, remainder = uri.partition("://")
    remainder, _, fragment = remainder.partition("#")
    remainder = remainder.partition("?")[0]

    userinfo, at, endpoint = remainder.rpartition("@")
    if at:
        if ":" in userinfo:
            # Base64 never contains ':', so this is plain "method:password".
            method, _, secret = userinfo.partition(":")
            cipher, password = unquote(method), unquote(secret)
        else:
            cipher, password = _split_ss_credentials(_decode_ss_base64(userinfo))
    else:
        # Legacy form: credentials and address are encoded together.
        decoded = _decode_ss_base64(remainder)
        credentials, at, endpoint = decoded.rpartition("@")
        if not at:
            raise ValueError("ss URI does not contain a server address")
        cipher, password = _split_ss_credentials(credentials)

    # Let urlparse split host and port so bracketed IPv6 literals and port
    # validation behave the same as in the other parsers.
    address = urlparse(f"//{endpoint}")
    server, port = address.hostname, address.port

    # The name fallback is derived from the resolved address: in the legacy
    # form the host part of the raw URI is the base64 blob, not the server.
    name = unquote(fragment) if fragment else f"{server}:{port}"
    return {
        "name": name,
        "type": "ss",
        "server": server,
        "port": port,
        "cipher": cipher,
        "password": password,
        "udp": True,
    }


def _decode_ss_base64(blob: str) -> str:
    """Decode standard or URL-safe base64 text, with or without padding."""
    blob = unquote(blob).strip().rstrip("=")
    # b64decode silently drops '-' and '_'; map them to the standard alphabet.
    blob = blob.translate(str.maketrans("-_", "+/"))
    blob += "=" * (-len(blob) % 4)
    return base64.b64decode(blob).decode(errors="replace")


def _split_ss_credentials(credentials: str) -> tuple[str, str]:
    """Split ``method:password`` at the first colon."""
    method, sep, secret = credentials.partition(":")
    if not sep:
        raise ValueError("ss credentials are not in 'method:password' form")
    return method, secret
def _parse_trojan(uri: str) -> dict:
    """Convert a ``trojan://password@host:port?...#name`` link into a Mihomo proxy dict.

    TLS is always enabled. Query parameters read: ``sni`` (or ``peer``),
    ``fp``, ``type``, ``path``, ``host`` and ``serviceName``.
    """
    p = urlparse(uri)
    # parse_qs yields a list per key; only the first value of each is used.
    q = {k: v[0] for k, v in parse_qs(p.query).items()}

    # urlparse leaves the userinfo percent-encoded, so a password containing
    # reserved characters (e.g. "%40" for "@") must be decoded explicitly.
    password = p.username
    if password is not None:
        password = unquote(password)

    proxy: dict = {
        "name": _name(p),
        "type": "trojan",
        "server": p.hostname,
        "port": p.port,
        "password": password,
        "udp": True,
        "tls": True,
    }
    if sni := q.get("sni") or q.get("peer"):
        proxy["servername"] = sni
    if fp := q.get("fp"):
        proxy["client-fingerprint"] = fp

    # Transport options; any value other than ws/grpc adds nothing.
    net = q.get("type", "tcp")
    if net == "ws":
        proxy["network"] = "ws"
        proxy["ws-opts"] = {
            "path": q.get("path", "/"),
            "headers": {"Host": q.get("host", p.hostname)},
        }
    elif net == "grpc":
        proxy["network"] = "grpc"
        proxy["grpc-opts"] = {"grpc-service-name": q.get("serviceName", "")}
    return proxy
def _parse_hysteria2(uri: str) -> dict:
    """Convert a ``hysteria2://`` / ``hy2://`` link into a Mihomo proxy dict.

    The userinfo part is the auth string: either a single password or
    ``username:password``, both emitted as the ``password`` field. Query
    parameters read: ``sni``, ``insecure``, ``obfs`` and ``obfs-password``.
    """
    p = urlparse(uri)
    # parse_qs yields a list per key; only the first value of each is used.
    q = {k: v[0] for k, v in parse_qs(p.query).items()}

    # urlparse leaves userinfo percent-encoded. When both halves are present
    # the whole "user:pass" string is the auth value, not just the user part.
    user, secret = p.username, p.password
    if user and secret:
        auth = f"{unquote(user)}:{unquote(secret)}"
    else:
        auth = unquote(user or secret or "")

    proxy: dict = {
        "name": _name(p),
        "type": "hysteria2",
        "server": p.hostname,
        # The URI scheme defaults to 443 when the port is omitted.
        "port": p.port if p.port is not None else 443,
        "password": auth,
        "udp": True,
    }
    if sni := q.get("sni"):
        proxy["sni"] = sni
    if q.get("insecure") == "1":
        proxy["skip-cert-verify"] = True
    if obfs := q.get("obfs"):
        proxy["obfs"] = obfs
        if obfs_password := q.get("obfs-password"):
            proxy["obfs-password"] = obfs_password
    return proxy