from __future__ import annotations from dataclasses import dataclass from typing import Any, Optional import httpx @dataclass(frozen=True) class MediaMTXClient: api_url: str username: Optional[str] = None password: Optional[str] = None def _auth(self) -> Optional[tuple[str, str]]: if self.username and self.password: return (self.username, self.password) return None async def list_paths_status(self) -> dict[str, Any]: async with httpx.AsyncClient(timeout=5) as client: r = await client.get(f"{self.api_url}/v3/paths/list", auth=self._auth()) r.raise_for_status() return r.json() async def set_recording_bulk(self, names: list[str], enabled: bool) -> dict[str, Any]: payload = {"paths": {name: {"record": enabled} for name in names}} async with httpx.AsyncClient(timeout=8) as client: r = await client.post( f"{self.api_url}/v3/config/paths/patch", json=payload, auth=self._auth(), ) if r.status_code != 404: r.raise_for_status() return r.json() # Backward-compatible fallback for MediaMTX versions that don't support bulk patch. results: dict[str, Any] = {"fallback": []} async with httpx.AsyncClient(timeout=10) as client: for name in names: rr = await client.patch( f"{self.api_url}/v3/config/paths/patch/{name}", json={"record": enabled}, auth=self._auth(), ) if rr.status_code == 404: rr = await client.post( f"{self.api_url}/v3/config/paths/add/{name}", json={"record": enabled}, auth=self._auth(), ) rr.raise_for_status() results["fallback"].append({"name": name, "status": rr.status_code}) return results async def upsert_paths_sources_bulk(self, sources: dict[str, str]) -> dict[str, Any]: payload = {"paths": {name: {"source": url} for name, url in sources.items()}} async with httpx.AsyncClient(timeout=10) as client: r = await client.post( f"{self.api_url}/v3/config/paths/patch", json=payload, auth=self._auth(), ) if r.status_code != 404: r.raise_for_status() return r.json() results: dict[str, Any] = {"fallback": []} async with httpx.AsyncClient(timeout=10) as client: for name, url in sources.items(): rr = await client.post( f"{self.api_url}/v3/config/paths/add/{name}", json={"source": url}, auth=self._auth(), ) if rr.status_code == 409: rr = await client.patch( f"{self.api_url}/v3/config/paths/patch/{name}", json={"source": url}, auth=self._auth(), ) rr.raise_for_status() results["fallback"].append({"name": name, "status": rr.status_code}) return results async def delete_path(self, name: str) -> None: async with httpx.AsyncClient(timeout=8) as client: r = await client.delete( f"{self.api_url}/v3/config/paths/delete/{name}", auth=self._auth(), ) if r.status_code in (404, 410): return if r.status_code == 405: rr = await client.post( f"{self.api_url}/v3/config/paths/patch", json={"paths": {name: {"source": ""}}}, auth=self._auth(), ) rr.raise_for_status() return r.raise_for_status()