Files
mihomo_injecter/app/mihomo.py
T

71 lines
2.4 KiB
Python

import asyncio
import logging
from urllib.parse import quote

import httpx
# Module-level logger named after this module (``__name__``).
logger = logging.getLogger(__name__)
class MihomoClient:
    """Small async client for the HTTP API of a Mihomo instance.

    Each call opens a short-lived ``httpx.AsyncClient``; no connection is
    kept between calls. Every request carries the headers built in
    ``__init__`` and raises ``httpx.HTTPStatusError`` on a 4xx/5xx response.
    """

    def __init__(self, base_url: str, secret: str):
        """Store the API root and the optional bearer secret.

        Args:
            base_url: Root URL of the API; trailing slashes are stripped.
            secret: Bearer token. A falsy value (e.g. ``""``) results in no
                ``Authorization`` header being sent.
        """
        self._base_url = base_url.rstrip("/")
        self._headers = {"Authorization": f"Bearer {secret}"} if secret else {}

    def _provider_url(self, name: str) -> str:
        """Return the ``/providers/proxies/<name>`` URL for provider *name*."""
        # Percent-encode the name as ONE path segment (safe="" also encodes
        # "/"), so names containing spaces, slashes, "?" or "#" cannot alter
        # the request path or be cut off as a query/fragment.
        return f"{self._base_url}/providers/proxies/{quote(name, safe='')}"

    async def _request(
        self, method: str, url: str, *, timeout: float = 10.0, **kwargs
    ) -> "httpx.Response":
        """Send one request with the stored headers and return the response.

        Raises:
            httpx.HTTPStatusError: The server answered with a 4xx/5xx status.
            httpx.RequestError: The request could not be completed.
        """
        async with httpx.AsyncClient() as client:
            response = await client.request(
                method, url, headers=self._headers, timeout=timeout, **kwargs
            )
            response.raise_for_status()
            return response

    async def trigger_provider_refresh(self, name: str) -> None:
        """Issue a ``PUT`` to the proxy-provider endpoint of *name*."""
        await self._request("PUT", self._provider_url(name))

    async def get_provider(self, name: str) -> dict:
        """Fetch the proxy-provider endpoint of *name* and return its JSON body."""
        response = await self._request("GET", self._provider_url(name))
        return response.json()

    async def refresh_and_collect(
        self, name: str, timeout: float = 30, poll_interval: float = 1.0
    ) -> list[dict]:
        """Trigger a provider refresh and return its proxies once it has updated.

        The provider's ``updatedAt`` value is recorded, a refresh is
        triggered, and the provider is then polled until ``updatedAt`` is
        present and differs from the recorded value.

        Args:
            name: Provider name.
            timeout: Seconds to keep polling before giving up.
            poll_interval: Seconds to sleep before each poll.

        Returns:
            The ``proxies`` value of the refreshed provider; an empty list
            when the key is missing or null.

        Raises:
            TimeoutError: ``updatedAt`` did not change within *timeout*.
            httpx.HTTPError: Any underlying request failed.
        """
        initial = await self.get_provider(name)
        initial_ts = initial.get("updatedAt", "")
        await self.trigger_provider_refresh(name)

        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        while loop.time() < deadline:
            await asyncio.sleep(poll_interval)
            data = await self.get_provider(name)
            new_ts = data.get("updatedAt")
            if new_ts is not None and new_ts != initial_ts:
                # ``or []`` also covers a JSON ``null`` value, which
                # ``dict.get(key, default)`` would pass through as None.
                return data.get("proxies") or []
        raise TimeoutError(f"Provider {name!r} did not refresh within {timeout}s")

    async def reload_config(self) -> None:
        """Issue a ``PUT`` to ``/configs`` with an empty JSON object as body."""
        await self._request("PUT", f"{self._base_url}/configs", json={})

    async def wait_ready(self, retries: int = 60, delay: float = 1.0) -> None:
        """Poll ``/version`` until it answers successfully.

        Args:
            retries: Maximum number of attempts.
            delay: Seconds to sleep after each failed attempt.

        Raises:
            RuntimeError: No attempt succeeded; the last HTTP error (if any)
                is attached as ``__cause__``.
        """
        last_error = None
        for attempt in range(1, retries + 1):
            try:
                await self._request(
                    "GET", f"{self._base_url}/version", timeout=3.0
                )
            # Only transport/status failures mean "not ready yet"; anything
            # else is a programming error and should surface immediately.
            except httpx.HTTPError as exc:
                last_error = exc
                logger.info("Waiting for Mihomo... (%d/%d)", attempt, retries)
                await asyncio.sleep(delay)
            else:
                return
        raise RuntimeError("Mihomo did not become available") from last_error