from __future__ import annotations import asyncio import logging import subprocess from pathlib import Path from typing import Optional import httpx import yaml from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles from .config_store import default_store from .mediamtx_client import MediaMTXClient from .models import ( AppConfig, Camera, MediaMTXAddCameraRequest, MediaMTXCamera, MediaMTXConfigView, RecordingToggle, ScheduleUpdate, SchedulerEnabled, ) from .recordings import list_recordings from .scheduler import Scheduler def _cors_origins() -> list[str]: return ["http://localhost:5173"] logger = logging.getLogger("ipcam_dashboard") PROJECT_ROOT = Path(__file__).resolve().parents[2] DEFAULT_LOCAL_RECORDINGS_DIR = PROJECT_ROOT / "mediamtx" / "recordings" MEDIAMTX_YML_PATH = PROJECT_ROOT / "mediamtx" / "mediamtx.yml" MEDIAMTX_COMPOSE_PATH = PROJECT_ROOT / "mediamtx" / "docker-compose.yml" app = FastAPI(title="IPCam Dashboard API") app.add_middleware( CORSMiddleware, allow_origins=_cors_origins(), allow_credentials=True, allow_methods=["*"] , allow_headers=["*"] , ) store = default_store() def _extract_port(address: str, fallback: int) -> int: if not address: return fallback text = str(address).strip() if text.startswith(":"): text = text[1:] if ":" in text: text = text.rsplit(":", 1)[-1] try: value = int(text) return value if 1 <= value <= 65535 else fallback except ValueError: return fallback def _load_mediamtx_yml() -> dict: if not MEDIAMTX_YML_PATH.exists(): raise HTTPException(status_code=500, detail="mediamtx_yml_not_found") raw = MEDIAMTX_YML_PATH.read_text(encoding="utf-8") return yaml.safe_load(raw) or {} def _save_mediamtx_yml(data: dict) -> None: MEDIAMTX_YML_PATH.write_text( yaml.safe_dump(data, sort_keys=False, allow_unicode=False), encoding="utf-8", ) def _build_mediamtx_view(data: dict) -> MediaMTXConfigView: api_port = _extract_port(str(data.get("apiAddress", ":9997")), 9997) webrtc_port = _extract_port(str(data.get("webrtcAddress", ":8889")), 8889) hosts = data.get("webrtcAdditionalHosts") or [] host = hosts[0] if isinstance(hosts, list) and hosts else "127.0.0.1" path_defaults = data.get("pathDefaults") or {} record_enabled = bool(path_defaults.get("record", False)) cameras: list[MediaMTXCamera] = [] paths = data.get("paths") or {} if isinstance(paths, dict): for name, cfg in paths.items(): if not isinstance(cfg, dict): continue source = cfg.get("source") if isinstance(source, str) and source.strip(): cameras.append(MediaMTXCamera(name=str(name), rtsp_url=source)) cameras.sort(key=lambda x: x.name) return MediaMTXConfigView( api_url=f"http://127.0.0.1:{api_port}", webrtc_url=f"http://{host}:{webrtc_port}", record_enabled=record_enabled, cameras=cameras, ) async def _sync_app_config_from_mediamtx() -> AppConfig: cfg = await store.load() data = _load_mediamtx_yml() view = _build_mediamtx_view(data) cfg.mediamtx_api_url = view.api_url cfg.mediamtx_webrtc_url = view.webrtc_url cfg.cameras = [Camera(name=c.name, rtsp_url=c.rtsp_url) for c in view.cameras] await store.save(cfg) return cfg def _restart_mediamtx() -> dict: cmds = [ ["docker", "compose", "-f", str(MEDIAMTX_COMPOSE_PATH), "restart", "mediamtx"], ["docker-compose", "-f", str(MEDIAMTX_COMPOSE_PATH), "restart", "mediamtx"], ] last_err = "" for cmd in cmds: try: proc = subprocess.run( cmd, cwd=str(MEDIAMTX_COMPOSE_PATH.parent), capture_output=True, text=True, timeout=40, check=False, ) if proc.returncode == 0: return {"ok": True, "output": proc.stdout.strip()} last_err = (proc.stderr or proc.stdout or "").strip() except FileNotFoundError: last_err = "docker_command_not_found" except subprocess.TimeoutExpired: last_err = "docker_restart_timeout" raise HTTPException(status_code=500, detail=f"mediamtx_restart_failed: {last_err}") async def _apply_recording(enabled: bool) -> None: cfg = await store.load() try: paths = await MediaMTXClient( api_url=cfg.mediamtx_api_url, username=cfg.mediamtx_api_user, password=cfg.mediamtx_api_pass, ).list_paths_status() names = [it.get("name") for it in (paths.get("items") or []) if isinstance(it, dict) and it.get("name")] except Exception: names = [c.name for c in cfg.cameras] if not names: return client = MediaMTXClient( api_url=cfg.mediamtx_api_url, username=cfg.mediamtx_api_user, password=cfg.mediamtx_api_pass, ) await client.set_recording_bulk(names, enabled) scheduler = Scheduler(apply=_apply_recording) async def _scheduler_loop() -> None: while True: try: cfg = await store.load() await scheduler.tick(cfg.schedule) except Exception: logger.exception("scheduler_tick_failed") finally: await asyncio.sleep(60) def _raise_mediamtx_http_error(err: httpx.HTTPError) -> None: if isinstance(err, httpx.HTTPStatusError): code = err.response.status_code if code == 401: raise HTTPException(status_code=502, detail="mediamtx_unauthorized") if code == 403: raise HTTPException(status_code=502, detail="mediamtx_forbidden") raise HTTPException(status_code=502, detail=f"mediamtx_http_{code}") raise HTTPException(status_code=502, detail="mediamtx_unreachable") @app.on_event("startup") async def _startup() -> None: cfg = await _sync_app_config_from_mediamtx() recordings_dir = Path(cfg.recordings_dir) # Backward-compatible migration: old defaults used "/recordings". if str(recordings_dir) == "/recordings": recordings_dir = DEFAULT_LOCAL_RECORDINGS_DIR cfg.recordings_dir = str(recordings_dir) await store.save(cfg) try: recordings_dir.mkdir(parents=True, exist_ok=True) except PermissionError: # Final fallback for local dev/deploy on same machine. recordings_dir = DEFAULT_LOCAL_RECORDINGS_DIR recordings_dir.mkdir(parents=True, exist_ok=True) cfg.recordings_dir = str(recordings_dir) await store.save(cfg) app.mount("/videos", StaticFiles(directory=str(recordings_dir)), name="videos") asyncio.create_task(_scheduler_loop()) @app.get("/api/health") async def health() -> dict: return {"status": "ok"} @app.get("/api/config") async def get_config() -> AppConfig: return await _sync_app_config_from_mediamtx() @app.get("/api/mediamtx/config") async def get_mediamtx_config() -> MediaMTXConfigView: data = _load_mediamtx_yml() view = _build_mediamtx_view(data) await _sync_app_config_from_mediamtx() return view @app.post("/api/mediamtx/cameras") async def add_mediamtx_camera(payload: MediaMTXAddCameraRequest) -> MediaMTXConfigView: data = _load_mediamtx_yml() paths = data.setdefault("paths", {}) if not isinstance(paths, dict): raise HTTPException(status_code=400, detail="invalid_mediamtx_paths") used = {str(k) for k in paths.keys()} idx = 1 while f"cam{idx}" in used: idx += 1 name = f"cam{idx}" paths[name] = {"source": payload.rtsp_url.strip()} _save_mediamtx_yml(data) await _sync_app_config_from_mediamtx() return _build_mediamtx_view(data) @app.delete("/api/mediamtx/cameras/{name}") async def delete_mediamtx_camera(name: str) -> MediaMTXConfigView: data = _load_mediamtx_yml() paths = data.get("paths") or {} if not isinstance(paths, dict) or name not in paths: raise HTTPException(status_code=404, detail="camera_not_found") del paths[name] data["paths"] = paths _save_mediamtx_yml(data) await _sync_app_config_from_mediamtx() return _build_mediamtx_view(data) @app.post("/api/mediamtx/recording") async def set_mediamtx_recording(data: RecordingToggle) -> MediaMTXConfigView: payload = _load_mediamtx_yml() path_defaults = payload.setdefault("pathDefaults", {}) if not isinstance(path_defaults, dict): raise HTTPException(status_code=400, detail="invalid_mediamtx_path_defaults") path_defaults["record"] = bool(data.enabled) payload["pathDefaults"] = path_defaults _save_mediamtx_yml(payload) return _build_mediamtx_view(payload) @app.post("/api/mediamtx/restart") async def restart_mediamtx() -> dict: return _restart_mediamtx() @app.get("/api/paths") async def list_paths() -> dict: cfg = await store.load() client = MediaMTXClient( api_url=cfg.mediamtx_api_url, username=cfg.mediamtx_api_user, password=cfg.mediamtx_api_pass, ) try: return await client.list_paths_status() except httpx.HTTPError as e: _raise_mediamtx_http_error(e) @app.post("/api/recording") async def toggle_recording(data: RecordingToggle) -> dict: try: await _apply_recording(data.enabled) except httpx.HTTPError as e: _raise_mediamtx_http_error(e) return {"enabled": data.enabled} @app.post("/api/scheduler/enabled") async def set_scheduler_enabled(data: SchedulerEnabled) -> AppConfig: cfg = await store.load() cfg.schedule.enabled = data.enabled await store.save(cfg) await scheduler.tick(cfg.schedule) return cfg @app.post("/api/scheduler/schedule") async def update_schedule(data: ScheduleUpdate) -> AppConfig: cfg = await store.load() cfg.schedule.weekdays_from = data.weekdays_from cfg.schedule.weekdays_to = data.weekdays_to cfg.schedule.weekend_all_day = data.weekend_all_day await store.save(cfg) await scheduler.tick(cfg.schedule) return cfg @app.get("/api/recordings") async def recordings( camera: str, date: Optional[str] = None, limit: int = 200, offset: int = 0, ) -> list[dict]: cfg = await _sync_app_config_from_mediamtx() if not any(c.name == camera for c in cfg.cameras): raise HTTPException(status_code=404, detail="camera_not_found") if limit < 1 or limit > 2000: raise HTTPException(status_code=400, detail="invalid_limit") if offset < 0: raise HTTPException(status_code=400, detail="invalid_offset") items = list_recordings(cfg.recordings_dir, camera, date, limit, offset) return [ { "camera": it.camera, "filename": it.filename, "timestamp": it.timestamp, "url": it.url, } for it in items ]