import asyncio
import logging
from urllib.parse import quote

import httpx

logger = logging.getLogger(__name__)


class MihomoClient:
    """Minimal async client for a Mihomo controller's HTTP API.

    Every request opens a short-lived ``httpx.AsyncClient`` and, when a
    secret was supplied, carries an ``Authorization: Bearer <secret>`` header.
    """

    def __init__(self, base_url: str, secret: str):
        """Store the controller address and build the auth header.

        Args:
            base_url: Root URL of the controller; trailing slashes are removed.
            secret: API secret. An empty value disables the auth header.
        """
        self._base_url = base_url.rstrip("/")
        self._headers = {"Authorization": f"Bearer {secret}"} if secret else {}

    async def _request(
        self, method: str, path: str, *, timeout: float = 10.0, **kwargs
    ) -> httpx.Response:
        """Send one authenticated request and raise on a 4xx/5xx response."""
        async with httpx.AsyncClient() as client:
            resp = await client.request(
                method,
                f"{self._base_url}{path}",
                headers=self._headers,
                timeout=timeout,
                **kwargs,
            )
        resp.raise_for_status()
        return resp

    @staticmethod
    def _provider_path(name: str) -> str:
        """Return the API path for the proxy provider called ``name``."""
        # Escape every reserved character (including "/") so that a name such
        # as "a/b" or "x?y" stays a single path segment instead of changing
        # the route or leaking into the query string.
        return f"/providers/proxies/{quote(name, safe='')}"

    async def trigger_provider_refresh(self, name: str) -> None:
        """Issue ``PUT /providers/proxies/{name}`` to request a refresh.

        Args:
            name: Raw (not percent-encoded) provider name.

        Raises:
            httpx.HTTPStatusError: The controller answered with 4xx/5xx.
            httpx.HTTPError: The request could not be completed.
        """
        await self._request("PUT", self._provider_path(name))

    async def get_provider(self, name: str) -> dict:
        """Return the decoded JSON body of ``GET /providers/proxies/{name}``.

        Args:
            name: Raw (not percent-encoded) provider name.

        Raises:
            httpx.HTTPStatusError: The controller answered with 4xx/5xx.
            httpx.HTTPError: The request could not be completed.
        """
        resp = await self._request("GET", self._provider_path(name))
        return resp.json()

    async def refresh_and_collect(self, name: str, timeout: int = 30) -> list[dict]:
        """Trigger a provider refresh and return its proxies once it changed.

        The provider's ``updatedAt`` field is read before the refresh is
        triggered; the provider is then polled once per second until that
        field differs from the initial value.

        Args:
            name: Raw (not percent-encoded) provider name.
            timeout: Maximum number of seconds to keep polling.

        Returns:
            The ``proxies`` entry of the refreshed provider (``[]`` if absent).

        Raises:
            TimeoutError: ``updatedAt`` did not change within ``timeout``.
            httpx.HTTPError: Any of the underlying requests failed.
        """
        initial = await self.get_provider(name)
        initial_ts = initial.get("updatedAt", "")
        await self.trigger_provider_refresh(name)

        # We are inside a coroutine, so a loop is guaranteed to be running;
        # get_running_loop() is the non-deprecated way to reach its clock.
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        while loop.time() < deadline:
            await asyncio.sleep(1)
            data = await self.get_provider(name)
            if data.get("updatedAt", "") != initial_ts:
                return data.get("proxies", [])
        raise TimeoutError(f"Provider {name!r} did not refresh within {timeout}s")

    async def reload_config(self) -> None:
        """Issue ``PUT /configs`` with an empty JSON object as the body.

        Raises:
            httpx.HTTPStatusError: The controller answered with 4xx/5xx.
            httpx.HTTPError: The request could not be completed.
        """
        await self._request("PUT", "/configs", json={})

    async def wait_ready(self, retries: int = 60) -> None:
        """Poll ``GET /version`` until it succeeds.

        One attempt is made per second, up to ``retries`` attempts. The probe
        carries the same auth header as every other request, so a controller
        that enforces its secret on ``/version`` is still detected as ready.

        Args:
            retries: Maximum number of attempts.

        Raises:
            RuntimeError: No attempt succeeded; the last failure, if any, is
                attached as ``__cause__``.
        """
        last_error = None
        for attempt in range(1, retries + 1):
            try:
                await self._request("GET", "/version", timeout=3.0)
                return
            except Exception as exc:
                # Deliberately broad: any failure (connection refused, bad
                # status, timeout) simply means "not ready yet".
                last_error = exc
                logger.info("Waiting for Mihomo... (%d/%d)", attempt, retries)
            if attempt < retries:
                # No point in sleeping after the final failed attempt.
                await asyncio.sleep(1)
        raise RuntimeError("Mihomo did not become available") from last_error