"""HTTP API for the IPCam Dashboard.

Exposes FastAPI endpoints that read and edit the local ``mediamtx.yml``,
talk to the MediaMTX control API, drive the recording scheduler and list
recorded files.
"""

from __future__ import annotations

import asyncio
import logging
import subprocess
from pathlib import Path
from typing import NoReturn, Optional

import httpx
import yaml
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles

from .config_store import default_store
from .mediamtx_client import MediaMTXClient
from .models import (
    AppConfig,
    AppConfigUpdate,
    Camera,
    MediaMTXAddCameraRequest,
    MediaMTXCamera,
    MediaMTXConfigView,
    RecordingToggle,
    ScheduleUpdate,
    SchedulerEnabled,
)
from .recordings import list_recordings
from .scheduler import Scheduler


def _cors_origins() -> list[str]:
    """Return the origins allowed to call this API from a browser."""
    return ["http://localhost:5173"]


logger = logging.getLogger("ipcam_dashboard")

PROJECT_ROOT = Path(__file__).resolve().parents[2]
DEFAULT_LOCAL_RECORDINGS_DIR = PROJECT_ROOT / "mediamtx" / "recordings"
MEDIAMTX_YML_PATH = PROJECT_ROOT / "mediamtx" / "mediamtx.yml"
MEDIAMTX_COMPOSE_PATH = PROJECT_ROOT / "mediamtx" / "docker-compose.yml"

# Seconds between two scheduler evaluations in the background loop.
_SCHEDULER_INTERVAL_SECONDS = 60

app = FastAPI(title="IPCam Dashboard API")
app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins(),
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

store = default_store()

# Strong reference to the background scheduler task. The event loop only keeps
# weak references to tasks, so an unreferenced task may be garbage-collected
# before it finishes.
_scheduler_task: Optional[asyncio.Task] = None


def _clean_text(value: Optional[str]) -> Optional[str]:
    """Strip whitespace and surrounding quote/backtick characters.

    Returns ``None`` when *value* is ``None`` or nothing remains after cleaning.
    """
    if value is None:
        return None
    cleaned = str(value).strip().strip("`'\"")
    return cleaned or None


def _sanitize_cfg_fields(cfg: AppConfig) -> bool:
    """Normalise the free-text fields of *cfg* in place.

    URL and directory fields are only replaced when the cleaned value is
    non-empty; the credential fields are always replaced, so a blank
    credential becomes ``None``.

    Returns:
        ``True`` if at least one field was modified.
    """
    changed = False

    clean_api = _clean_text(cfg.mediamtx_api_url)
    clean_webrtc = _clean_text(cfg.mediamtx_webrtc_url)
    clean_user = _clean_text(cfg.mediamtx_api_user)
    clean_pass = _clean_text(cfg.mediamtx_api_pass)
    clean_recordings = _clean_text(cfg.recordings_dir)

    if clean_api and clean_api != cfg.mediamtx_api_url:
        cfg.mediamtx_api_url = clean_api
        changed = True
    if clean_webrtc and clean_webrtc != cfg.mediamtx_webrtc_url:
        cfg.mediamtx_webrtc_url = clean_webrtc
        changed = True
    if clean_user != cfg.mediamtx_api_user:
        cfg.mediamtx_api_user = clean_user
        changed = True
    if clean_pass != cfg.mediamtx_api_pass:
        cfg.mediamtx_api_pass = clean_pass
        changed = True
    if clean_recordings and clean_recordings != cfg.recordings_dir:
        cfg.recordings_dir = clean_recordings
        changed = True

    return changed


def _load_mediamtx_yml() -> dict:
    """Parse ``mediamtx.yml`` and return its top-level mapping.

    An empty document yields ``{}``.

    Raises:
        HTTPException: 500 ``mediamtx_yml_not_found`` when the file is missing,
            500 ``mediamtx_yml_invalid`` when the document root is not a mapping.
    """
    try:
        raw = MEDIAMTX_YML_PATH.read_text(encoding="utf-8")
    except FileNotFoundError as err:
        raise HTTPException(status_code=500, detail="mediamtx_yml_not_found") from err

    data = yaml.safe_load(raw) or {}
    if not isinstance(data, dict):
        # Every caller uses ``.get``/item assignment on the result; fail with a
        # clear error instead of an AttributeError further down.
        raise HTTPException(status_code=500, detail="mediamtx_yml_invalid")
    return data


def _save_mediamtx_yml(data: dict) -> None:
    """Serialise *data* back to ``mediamtx.yml``, preserving key order."""
    MEDIAMTX_YML_PATH.write_text(
        yaml.safe_dump(data, sort_keys=False, allow_unicode=False),
        encoding="utf-8",
    )


def _mapping_section(data: dict, key: str, error_detail: str) -> dict:
    """Return ``data[key]`` as a dict, creating it when absent or null.

    A YAML key written without children (``paths:``) parses as ``None``;
    it is treated as an empty mapping and replaced in *data*.

    Raises:
        HTTPException: 400 with *error_detail* when the value exists but is
            not a mapping.
    """
    section = data.get(key)
    if section is None:
        section = {}
        data[key] = section
    if not isinstance(section, dict):
        raise HTTPException(status_code=400, detail=error_detail)
    return section


def _build_mediamtx_view(data: dict, cfg: AppConfig) -> MediaMTXConfigView:
    """Build the API view of the MediaMTX configuration.

    Cameras are the entries under ``paths`` that are mappings with a
    non-blank string ``source``; they are returned sorted by name.
    """
    path_defaults = data.get("pathDefaults") or {}
    record_enabled = (
        bool(path_defaults.get("record", False))
        if isinstance(path_defaults, dict)
        else False
    )

    cameras: list[MediaMTXCamera] = []
    paths = data.get("paths") or {}
    if isinstance(paths, dict):
        for name, path_cfg in paths.items():
            if not isinstance(path_cfg, dict):
                continue
            source = path_cfg.get("source")
            if isinstance(source, str) and source.strip():
                cameras.append(MediaMTXCamera(name=str(name), rtsp_url=source))
    cameras.sort(key=lambda cam: cam.name)

    return MediaMTXConfigView(
        api_url=cfg.mediamtx_api_url,
        webrtc_url=cfg.mediamtx_webrtc_url,
        record_enabled=record_enabled,
        cameras=cameras,
    )


async def _sync_app_config_from_mediamtx() -> AppConfig:
    """Refresh the stored app config from ``mediamtx.yml`` and return it.

    Loads the stored config, sanitises its text fields, replaces its camera
    list with the cameras found in ``mediamtx.yml`` and saves the result.
    """
    cfg = await store.load()
    _sanitize_cfg_fields(cfg)
    data = _load_mediamtx_yml()
    cfg.cameras = [
        Camera(name=cam.name, rtsp_url=cam.rtsp_url)
        for cam in _build_mediamtx_view(data, cfg).cameras
    ]
    await store.save(cfg)
    return cfg


def _restart_mediamtx() -> dict:
    """Restart the ``mediamtx`` compose service (blocking).

    Tries ``docker compose`` first and falls back to ``docker-compose``.

    Returns:
        ``{"ok": True, "output": <stdout>}`` from the first command that
        exits with status 0.

    Raises:
        HTTPException: 500 ``mediamtx_restart_failed: <reason>`` when every
            command fails; the reason comes from the last attempt.
    """
    cmds = [
        ["docker", "compose", "-f", str(MEDIAMTX_COMPOSE_PATH), "restart", "mediamtx"],
        ["docker-compose", "-f", str(MEDIAMTX_COMPOSE_PATH), "restart", "mediamtx"],
    ]
    last_err = ""
    for cmd in cmds:
        try:
            proc = subprocess.run(
                cmd,
                cwd=str(MEDIAMTX_COMPOSE_PATH.parent),
                capture_output=True,
                text=True,
                timeout=40,
                check=False,
            )
        except FileNotFoundError:
            last_err = "docker_command_not_found"
            continue
        except subprocess.TimeoutExpired:
            last_err = "docker_restart_timeout"
            continue

        if proc.returncode == 0:
            return {"ok": True, "output": proc.stdout.strip()}
        last_err = (proc.stderr or proc.stdout or "").strip()

    raise HTTPException(status_code=500, detail=f"mediamtx_restart_failed: {last_err}")


def _mediamtx_client(cfg: AppConfig) -> MediaMTXClient:
    """Create a MediaMTX API client from the connection settings in *cfg*."""
    return MediaMTXClient(
        api_url=cfg.mediamtx_api_url,
        username=cfg.mediamtx_api_user,
        password=cfg.mediamtx_api_pass,
    )


async def _apply_recording(enabled: bool, *, raise_on_error: bool = False) -> None:
    """Switch recording on or off for every path MediaMTX reports.

    Args:
        enabled: Desired recording state.
        raise_on_error: When ``False`` (the default, used by the scheduler)
            ``httpx.HTTPError`` is logged and swallowed so a temporarily
            unreachable MediaMTX does not break the caller. When ``True`` the
            error is re-raised so an HTTP handler can report it.

    Raises:
        httpx.HTTPError: Only when *raise_on_error* is ``True``.
    """
    cfg = await store.load()
    if _sanitize_cfg_fields(cfg):
        await store.save(cfg)

    client = _mediamtx_client(cfg)

    try:
        paths = await client.list_paths_status()
    except httpx.HTTPError as e:
        if raise_on_error:
            raise
        logger.warning(
            "scheduler_skip_apply_recording: mediamtx_unreachable (%s)",
            type(e).__name__,
        )
        return

    names = [
        it.get("name")
        for it in (paths.get("items") or [])
        if isinstance(it, dict) and it.get("name")
    ]
    if not names:
        return

    try:
        await client.set_recording_bulk(names, enabled)
    except httpx.HTTPError as e:
        if raise_on_error:
            raise
        logger.warning("scheduler_apply_recording_failed: %s", type(e).__name__)


scheduler = Scheduler(apply=_apply_recording)


async def _scheduler_loop() -> None:
    """Evaluate the recording schedule forever, once per interval.

    Errors raised by a tick are logged and never terminate the loop.
    """
    while True:
        try:
            cfg = await store.load()
            await scheduler.tick(cfg.schedule)
        except httpx.HTTPStatusError as e:
            code = e.response.status_code
            if code == 401:
                logger.warning(
                    "scheduler_tick_unauthorized: MediaMTX API rejected credentials. "
                    "Set mediamtx_api_user/mediamtx_api_pass in api/data/config.json"
                )
            else:
                logger.warning("scheduler_tick_http_status_%s", code)
        except httpx.HTTPError:
            logger.warning("scheduler_tick_http_error")
        except Exception:
            logger.exception("scheduler_tick_failed")

        # Deliberately not in a ``finally``: a cancellation raised during the
        # tick must propagate immediately instead of first sleeping a full
        # interval.
        await asyncio.sleep(_SCHEDULER_INTERVAL_SECONDS)


def _raise_mediamtx_http_error(err: httpx.HTTPError) -> NoReturn:
    """Translate an httpx error from MediaMTX into a 502 ``HTTPException``.

    Status errors map to ``mediamtx_unauthorized`` (401), ``mediamtx_forbidden``
    (403) or ``mediamtx_http_<code>``; any other error maps to
    ``mediamtx_unreachable``. This function always raises.
    """
    if isinstance(err, httpx.HTTPStatusError):
        code = err.response.status_code
        if code == 401:
            raise HTTPException(status_code=502, detail="mediamtx_unauthorized") from err
        if code == 403:
            raise HTTPException(status_code=502, detail="mediamtx_forbidden") from err
        raise HTTPException(status_code=502, detail=f"mediamtx_http_{code}") from err
    raise HTTPException(status_code=502, detail="mediamtx_unreachable") from err


@app.on_event("startup")
async def _startup() -> None:
    """Sync config, prepare the recordings directory and start the scheduler."""
    global _scheduler_task

    cfg = await _sync_app_config_from_mediamtx()
    recordings_dir = Path(cfg.recordings_dir)

    # Backward-compatible migration: old defaults used "/recordings".
    if str(recordings_dir) == "/recordings":
        recordings_dir = DEFAULT_LOCAL_RECORDINGS_DIR
        cfg.recordings_dir = str(recordings_dir)
        await store.save(cfg)

    try:
        recordings_dir.mkdir(parents=True, exist_ok=True)
    except PermissionError:
        # Final fallback for local dev/deploy on same machine.
        recordings_dir = DEFAULT_LOCAL_RECORDINGS_DIR
        recordings_dir.mkdir(parents=True, exist_ok=True)
        cfg.recordings_dir = str(recordings_dir)
        await store.save(cfg)

    app.mount("/videos", StaticFiles(directory=str(recordings_dir)), name="videos")

    # Keep a reference so the task cannot be garbage-collected mid-run.
    _scheduler_task = asyncio.create_task(_scheduler_loop())


@app.on_event("shutdown")
async def _shutdown() -> None:
    """Cancel the background scheduler task, if one was started."""
    global _scheduler_task

    task, _scheduler_task = _scheduler_task, None
    if task is None:
        return
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        # Expected outcome of the cancel() above.
        pass


@app.get("/api/health")
async def health() -> dict:
    """Liveness probe."""
    return {"status": "ok"}


@app.get("/api/config")
async def get_config() -> AppConfig:
    """Return the app config after re-syncing cameras from ``mediamtx.yml``."""
    return await _sync_app_config_from_mediamtx()


@app.get("/api/mediamtx/config")
async def get_mediamtx_config() -> MediaMTXConfigView:
    """Return the MediaMTX view and refresh the stored camera list."""
    # Sync first so the view is built from the sanitised connection settings.
    cfg = await _sync_app_config_from_mediamtx()
    data = _load_mediamtx_yml()
    return _build_mediamtx_view(data, cfg)


@app.post("/api/config/basic")
async def update_basic_config(payload: AppConfigUpdate) -> AppConfig:
    """Update connection settings, recordings directory and API port.

    URL and directory fields keep their current value when the submitted one
    is empty after cleaning. The credential fields and ``api_port`` are always
    overwritten with the submitted value.
    """
    cfg = await store.load()
    cfg.mediamtx_api_url = _clean_text(payload.mediamtx_api_url) or cfg.mediamtx_api_url
    cfg.mediamtx_webrtc_url = (
        _clean_text(payload.mediamtx_webrtc_url) or cfg.mediamtx_webrtc_url
    )
    cfg.mediamtx_api_user = _clean_text(payload.mediamtx_api_user)
    cfg.mediamtx_api_pass = _clean_text(payload.mediamtx_api_pass)
    cfg.recordings_dir = _clean_text(payload.recordings_dir) or cfg.recordings_dir
    cfg.api_port = payload.api_port
    await store.save(cfg)
    return await _sync_app_config_from_mediamtx()


@app.post("/api/mediamtx/cameras")
async def add_mediamtx_camera(payload: MediaMTXAddCameraRequest) -> MediaMTXConfigView:
    """Add a camera to ``mediamtx.yml`` under the first free ``cam<N>`` name.

    Raises:
        HTTPException: 400 ``invalid_rtsp_url`` when the URL is blank,
            400 ``invalid_mediamtx_paths`` when ``paths`` is not a mapping.
    """
    rtsp_url = payload.rtsp_url.strip()
    if not rtsp_url:
        # An empty source would be written to the file but never listed back.
        raise HTTPException(status_code=400, detail="invalid_rtsp_url")

    data = _load_mediamtx_yml()
    paths = _mapping_section(data, "paths", "invalid_mediamtx_paths")

    used = {str(key) for key in paths}
    idx = 1
    while f"cam{idx}" in used:
        idx += 1
    paths[f"cam{idx}"] = {"source": rtsp_url}

    _save_mediamtx_yml(data)
    cfg = await _sync_app_config_from_mediamtx()
    return _build_mediamtx_view(data, cfg)


@app.delete("/api/mediamtx/cameras/{name}")
async def delete_mediamtx_camera(name: str) -> MediaMTXConfigView:
    """Remove the path *name* from ``mediamtx.yml``.

    Raises:
        HTTPException: 404 ``camera_not_found`` when no such path exists.
    """
    data = _load_mediamtx_yml()
    paths = data.get("paths") or {}
    if not isinstance(paths, dict) or name not in paths:
        raise HTTPException(status_code=404, detail="camera_not_found")

    del paths[name]
    data["paths"] = paths
    _save_mediamtx_yml(data)
    cfg = await _sync_app_config_from_mediamtx()
    return _build_mediamtx_view(data, cfg)


@app.post("/api/mediamtx/recording")
async def set_mediamtx_recording(data: RecordingToggle) -> MediaMTXConfigView:
    """Persist the default ``record`` flag under ``pathDefaults``.

    Raises:
        HTTPException: 400 ``invalid_mediamtx_path_defaults`` when
            ``pathDefaults`` is not a mapping.
    """
    cfg = await store.load()
    payload = _load_mediamtx_yml()
    path_defaults = _mapping_section(
        payload, "pathDefaults", "invalid_mediamtx_path_defaults"
    )
    path_defaults["record"] = bool(data.enabled)
    _save_mediamtx_yml(payload)
    return _build_mediamtx_view(payload, cfg)


@app.post("/api/mediamtx/restart")
async def restart_mediamtx() -> dict:
    """Restart the MediaMTX container and return the command output."""
    # subprocess.run blocks for up to 40 s per attempt; run it in a worker
    # thread so the event loop keeps serving other requests.
    return await asyncio.to_thread(_restart_mediamtx)


@app.get("/api/paths")
async def list_paths() -> dict:
    """Proxy the path status list from the MediaMTX API.

    Raises:
        HTTPException: 502 when MediaMTX is unreachable or returns an error.
    """
    cfg = await store.load()
    client = _mediamtx_client(cfg)
    try:
        return await client.list_paths_status()
    except httpx.HTTPError as e:
        _raise_mediamtx_http_error(e)


@app.post("/api/recording")
async def toggle_recording(data: RecordingToggle) -> dict:
    """Enable or disable recording on all MediaMTX paths right now.

    Raises:
        HTTPException: 502 when MediaMTX is unreachable or returns an error.
    """
    try:
        # Ask for errors to be raised; the default best-effort mode would make
        # this endpoint report success even though nothing was applied.
        await _apply_recording(data.enabled, raise_on_error=True)
    except httpx.HTTPError as e:
        _raise_mediamtx_http_error(e)
    return {"enabled": data.enabled}


@app.post("/api/scheduler/enabled")
async def set_scheduler_enabled(data: SchedulerEnabled) -> AppConfig:
    """Turn the scheduler on or off and evaluate it immediately."""
    cfg = await store.load()
    cfg.schedule.enabled = data.enabled
    await store.save(cfg)
    await scheduler.tick(cfg.schedule)
    return cfg


@app.post("/api/scheduler/schedule")
async def update_schedule(data: ScheduleUpdate) -> AppConfig:
    """Store new schedule settings and evaluate the scheduler immediately."""
    cfg = await store.load()
    cfg.schedule.weekdays_from = data.weekdays_from
    cfg.schedule.weekdays_to = data.weekdays_to
    cfg.schedule.weekend_all_day = data.weekend_all_day
    await store.save(cfg)
    await scheduler.tick(cfg.schedule)
    return cfg


@app.get("/api/recordings")
async def recordings(
    camera: str,
    date: Optional[str] = None,
    limit: int = 200,
    offset: int = 0,
) -> list[dict]:
    """List recordings of *camera*, optionally filtered by *date*.

    Raises:
        HTTPException: 404 ``camera_not_found`` for an unknown camera,
            400 ``invalid_limit`` unless ``1 <= limit <= 2000``,
            400 ``invalid_offset`` for a negative offset.
    """
    cfg = await _sync_app_config_from_mediamtx()
    if not any(cam.name == camera for cam in cfg.cameras):
        raise HTTPException(status_code=404, detail="camera_not_found")
    if limit < 1 or limit > 2000:
        raise HTTPException(status_code=400, detail="invalid_limit")
    if offset < 0:
        raise HTTPException(status_code=400, detail="invalid_offset")

    items = list_recordings(cfg.recordings_dir, camera, date, limit, offset)
    return [
        {
            "camera": it.camera,
            "filename": it.filename,
            "timestamp": it.timestamp,
            "url": it.url,
        }
        for it in items
    ]