Files
IPCam_OrangePi_Dashboard/api/app/main.py
T
2026-05-08 16:42:15 +07:00

438 lines
13 KiB
Python

from __future__ import annotations
import asyncio
import logging
import subprocess
from pathlib import Path
from typing import Optional
import httpx
import yaml
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from .config_store import default_store
from .mediamtx_client import MediaMTXClient
from .models import (
AppConfig,
AppConfigUpdate,
Camera,
MediaMTXAddCameraRequest,
MediaMTXCamera,
MediaMTXConfigView,
RecordDeleteAfterUpdate,
RecordingToggle,
ScheduleUpdate,
SchedulerEnabled,
)
from .recordings import list_recordings
from .scheduler import Scheduler
def _cors_origins() -> list[str]:
return ["http://localhost:5173"]
# Module-wide logger used by the scheduler and restart helpers below.
logger = logging.getLogger("ipcam_dashboard")
# Third ancestor of this file (parents[0] is the directory containing it).
PROJECT_ROOT = Path(__file__).resolve().parents[2]
# Fallback recordings directory used by the startup hook.
DEFAULT_LOCAL_RECORDINGS_DIR = PROJECT_ROOT / "mediamtx" / "recordings"
# MediaMTX configuration file that the YAML helpers in this module read and rewrite.
MEDIAMTX_YML_PATH = PROJECT_ROOT / "mediamtx" / "mediamtx.yml"
# Compose file handed to `docker compose -f ... restart mediamtx`.
MEDIAMTX_COMPOSE_PATH = PROJECT_ROOT / "mediamtx" / "docker-compose.yml"
app = FastAPI(title="IPCam Dashboard API")
# CORS: only the origins from _cors_origins() are accepted; every method and
# header is allowed for those origins, and credentials may be sent.
app.add_middleware(
CORSMiddleware,
allow_origins=_cors_origins(),
allow_credentials=True,
allow_methods=["*"] ,
allow_headers=["*"] ,
)
# Application config store; used throughout via `await store.load()` and
# `await store.save(cfg)`.
store = default_store()
def _clean_text(value: Optional[str]) -> Optional[str]:
if value is None:
return None
cleaned = str(value).strip().strip("`'\"")
return cleaned or None
def _sanitize_cfg_fields(cfg: AppConfig) -> bool:
    """Clean the text fields of *cfg* in place using ``_clean_text``.

    The API URL, WebRTC URL and recordings directory are only overwritten
    when cleaning yields a non-empty value; the API user and password are
    overwritten with whatever cleaning yields, including ``None``.

    Returns:
        ``True`` if at least one attribute was modified, else ``False``.
    """
    # (attribute name, whether a cleaned value of None may replace the current one)
    field_rules = (
        ("mediamtx_api_url", False),
        ("mediamtx_webrtc_url", False),
        ("mediamtx_api_user", True),
        ("mediamtx_api_pass", True),
        ("recordings_dir", False),
    )
    dirty = False
    for attr, may_clear in field_rules:
        current = getattr(cfg, attr)
        cleaned = _clean_text(current)
        if cleaned == current:
            continue
        if cleaned is None and not may_clear:
            continue
        setattr(cfg, attr, cleaned)
        dirty = True
    return dirty
def _load_mediamtx_yml() -> dict:
    """Read and parse the MediaMTX YAML config at ``MEDIAMTX_YML_PATH``.

    Returns:
        The parsed top-level mapping, or an empty dict when the document is
        empty.

    Raises:
        HTTPException: 500 ``mediamtx_yml_not_found`` when the file is
            missing; 500 ``invalid_mediamtx_yml`` when the file is not valid
            YAML or its top level is not a mapping.
    """
    # Read directly instead of exists()+read: avoids the window in which the
    # file can disappear between the check and the read.
    try:
        raw = MEDIAMTX_YML_PATH.read_text(encoding="utf-8")
    except FileNotFoundError as err:
        raise HTTPException(status_code=500, detail="mediamtx_yml_not_found") from err
    try:
        parsed = yaml.safe_load(raw)
    except yaml.YAMLError as err:
        raise HTTPException(status_code=500, detail="invalid_mediamtx_yml") from err
    if not parsed:
        return {}
    # Callers use dict methods (.get / .setdefault) on the result; reject any
    # other top-level type here rather than failing later with AttributeError.
    if not isinstance(parsed, dict):
        raise HTTPException(status_code=500, detail="invalid_mediamtx_yml")
    return parsed
def _save_mediamtx_yml(data: dict) -> None:
    """Serialize *data* as YAML and overwrite ``MEDIAMTX_YML_PATH`` with it.

    Keys are written in insertion order (``sort_keys=False``) and non-ASCII
    characters are escaped (``allow_unicode=False``).
    """
    rendered = yaml.safe_dump(data, sort_keys=False, allow_unicode=False)
    MEDIAMTX_YML_PATH.write_text(rendered, encoding="utf-8")
def _set_recording_in_mediamtx_yml(enabled: bool) -> bool:
    """Set ``pathDefaults.record`` in the MediaMTX YAML file.

    Args:
        enabled: Desired recording state.

    Returns:
        ``True`` when the file was rewritten, ``False`` when the stored value
        already matched and nothing was written.

    Raises:
        HTTPException: 400 ``invalid_mediamtx_path_defaults`` when
            ``pathDefaults`` exists but is not a mapping.
    """
    data = _load_mediamtx_yml()
    path_defaults = data.get("pathDefaults")
    # An empty `pathDefaults:` key parses as None. dict.setdefault would hand
    # that None back and trigger a spurious 400, so treat it as "no defaults".
    if path_defaults is None:
        path_defaults = {}
    if not isinstance(path_defaults, dict):
        raise HTTPException(status_code=400, detail="invalid_mediamtx_path_defaults")
    wanted = bool(enabled)
    if bool(path_defaults.get("record", False)) == wanted:
        return False
    path_defaults["record"] = wanted
    data["pathDefaults"] = path_defaults
    _save_mediamtx_yml(data)
    return True
def _parse_record_delete_after_days(value: object) -> Optional[int]:
if not isinstance(value, str):
return None
text = value.strip().lower()
if text == "24h":
return 1
if text == "72h":
return 3
if text == "168h":
return 7
return None
def _set_record_delete_after_days_in_mediamtx_yml(days: int) -> None:
    """Write ``pathDefaults.recordDeleteAfter`` as ``"<days*24>h"``.

    Args:
        days: Retention period; must be 1, 3 or 7.

    Raises:
        HTTPException: 400 ``invalid_record_delete_after_days`` for any other
            value of *days*; 400 ``invalid_mediamtx_path_defaults`` when
            ``pathDefaults`` exists but is not a mapping.
    """
    if days not in {1, 3, 7}:
        raise HTTPException(status_code=400, detail="invalid_record_delete_after_days")
    data = _load_mediamtx_yml()
    path_defaults = data.get("pathDefaults")
    # An empty `pathDefaults:` key parses as None. dict.setdefault would hand
    # that None back and trigger a spurious 400, so treat it as "no defaults".
    if path_defaults is None:
        path_defaults = {}
    if not isinstance(path_defaults, dict):
        raise HTTPException(status_code=400, detail="invalid_mediamtx_path_defaults")
    path_defaults["recordDeleteAfter"] = f"{days * 24}h"
    data["pathDefaults"] = path_defaults
    _save_mediamtx_yml(data)
def _build_mediamtx_view(data: dict, cfg: AppConfig) -> MediaMTXConfigView:
    """Combine the parsed MediaMTX YAML and the app config into one view.

    Args:
        data: Parsed contents of the MediaMTX YAML file.
        cfg: Application config supplying the API and WebRTC URLs.

    Returns:
        A view holding the URLs from *cfg*, the recording flag and retention
        from ``pathDefaults``, and one camera per ``paths`` entry that is a
        mapping with a non-blank string ``source``, sorted by name.
    """

    def _usable_source(candidate: object) -> bool:
        # A path counts as a camera only if its source is a non-blank string.
        return isinstance(candidate, str) and bool(candidate.strip())

    defaults = data.get("pathDefaults") or {}
    is_recording = bool(defaults.get("record", False))
    retention_days = _parse_record_delete_after_days(defaults.get("recordDeleteAfter"))

    raw_paths = data.get("paths") or {}
    found: list[MediaMTXCamera] = []
    if isinstance(raw_paths, dict):
        found = [
            MediaMTXCamera(name=str(path_name), rtsp_url=entry["source"])
            for path_name, entry in raw_paths.items()
            if isinstance(entry, dict) and _usable_source(entry.get("source"))
        ]

    return MediaMTXConfigView(
        api_url=cfg.mediamtx_api_url,
        webrtc_url=cfg.mediamtx_webrtc_url,
        record_enabled=is_recording,
        record_delete_after_days=retention_days,
        cameras=sorted(found, key=lambda cam: cam.name),
    )
async def _sync_app_config_from_mediamtx() -> AppConfig:
    """Refresh the stored app config from the MediaMTX YAML file.

    Loads the config, sanitizes its text fields, replaces its camera list
    with the cameras found in the YAML file, saves it and returns it. The
    config is saved on every call, whether or not anything changed.
    """
    cfg = await store.load()
    _sanitize_cfg_fields(cfg)
    view = _build_mediamtx_view(_load_mediamtx_yml(), cfg)
    cfg.cameras = [Camera(name=cam.name, rtsp_url=cam.rtsp_url) for cam in view.cameras]
    await store.save(cfg)
    return cfg
def _restart_mediamtx() -> dict:
    """Restart the ``mediamtx`` compose service.

    Tries ``docker compose`` first and ``docker-compose`` second, each with a
    40-second timeout, running from the compose file's directory.

    Returns:
        ``{"ok": True, "output": <stdout>}`` from the first command that
        exits with status 0.

    Raises:
        HTTPException: 500 ``mediamtx_restart_failed: <reason>`` when neither
            command succeeds; the reason comes from the last attempt.
    """
    workdir = str(MEDIAMTX_COMPOSE_PATH.parent)
    compose_args = ["-f", str(MEDIAMTX_COMPOSE_PATH), "restart", "mediamtx"]
    launchers = (["docker", "compose"], ["docker-compose"])

    failure = ""
    for launcher in launchers:
        try:
            result = subprocess.run(
                launcher + compose_args,
                cwd=workdir,
                capture_output=True,
                text=True,
                timeout=40,
                check=False,
            )
        except FileNotFoundError:
            failure = "docker_command_not_found"
            continue
        except subprocess.TimeoutExpired:
            failure = "docker_restart_timeout"
            continue
        if result.returncode == 0:
            return {"ok": True, "output": result.stdout.strip()}
        failure = (result.stderr or result.stdout or "").strip()

    raise HTTPException(status_code=500, detail=f"mediamtx_restart_failed: {failure}")
async def _apply_recording(enabled: bool) -> None:
    """Apply a recording state to MediaMTX, restarting it when needed.

    Used as the ``apply`` callback of the module-level ``Scheduler`` and by
    the ``/api/recording`` endpoint. The function is best-effort: failures
    while editing the YAML file or restarting MediaMTX are logged as warnings
    and not propagated.

    Args:
        enabled: Desired recording state.
    """
    cfg = await store.load()
    if _sanitize_cfg_fields(cfg):
        await store.save(cfg)
    try:
        changed = _set_recording_in_mediamtx_yml(enabled)
        if not changed:
            # Nothing was written, so there is no reason to restart.
            return
        try:
            # The restart shells out to docker and can block for up to 40 s
            # per attempt. Run it in a worker thread so the event loop keeps
            # serving requests meanwhile.
            await asyncio.to_thread(_restart_mediamtx)
            logger.info("scheduler_applied_recording_and_restarted: enabled=%s", enabled)
        except HTTPException as e:
            logger.warning("scheduler_restart_mediamtx_failed: %s", e.detail)
    except Exception as e:
        logger.warning("scheduler_apply_recording_failed: %s", type(e).__name__)
# Shared scheduler instance; it is given _apply_recording as its `apply`
# callback and is driven via `scheduler.tick(cfg.schedule)` elsewhere in this module.
scheduler = Scheduler(apply=_apply_recording)
async def _scheduler_loop() -> None:
    """Run the recording scheduler forever, ticking once every 60 seconds.

    Each iteration loads the current config and calls ``scheduler.tick`` with
    its schedule. Any ``Exception`` is logged and the loop carries on, so one
    failed tick never stops scheduling. Cancellation ends the loop
    immediately.
    """
    while True:
        try:
            cfg = await store.load()
            await scheduler.tick(cfg.schedule)
        except httpx.HTTPStatusError as e:
            code = e.response.status_code
            if code == 401:
                logger.warning(
                    "scheduler_tick_unauthorized: MediaMTX API rejected credentials. "
                    "Set mediamtx_api_user/mediamtx_api_pass in api/data/config.json"
                )
            else:
                logger.warning("scheduler_tick_http_status_%s", code)
        except httpx.HTTPError:
            logger.warning("scheduler_tick_http_error")
        except Exception:
            logger.exception("scheduler_tick_failed")
        # Sleep here, not in a `finally` clause. A sleep inside `finally`
        # also runs while a CancelledError is propagating, which kept the
        # task alive for up to another 60 s after cancellation. Every
        # Exception is handled above, so this line still runs after each tick.
        await asyncio.sleep(60)
def _raise_mediamtx_http_error(err: httpx.HTTPError) -> None:
    """Convert an httpx error from MediaMTX into a 502 ``HTTPException``.

    This function always raises and never returns normally.

    Raises:
        HTTPException: status 502 with detail ``mediamtx_unauthorized`` (401),
            ``mediamtx_forbidden`` (403), ``mediamtx_http_<code>`` (any other
            status error) or ``mediamtx_unreachable`` (non-status errors).
    """
    if not isinstance(err, httpx.HTTPStatusError):
        raise HTTPException(status_code=502, detail="mediamtx_unreachable")

    upstream_status = err.response.status_code
    named_details = {
        401: "mediamtx_unauthorized",
        403: "mediamtx_forbidden",
    }
    detail = named_details.get(upstream_status, f"mediamtx_http_{upstream_status}")
    raise HTTPException(status_code=502, detail=detail)
@app.on_event("startup")
async def _startup() -> None:
    """Prepare the recordings directory, mount it and start the scheduler.

    Steps:
      1. Sync the stored config from the MediaMTX YAML file.
      2. Migrate the legacy ``/recordings`` directory setting to the local
         default and save the change.
      3. Create the recordings directory, falling back to the local default
         when permission is denied.
      4. Serve the directory under ``/videos``.
      5. Start the background scheduler loop.
    """
    cfg = await _sync_app_config_from_mediamtx()
    recordings_dir = Path(cfg.recordings_dir)
    # Backward-compatible migration: old defaults used "/recordings".
    if str(recordings_dir) == "/recordings":
        recordings_dir = DEFAULT_LOCAL_RECORDINGS_DIR
        cfg.recordings_dir = str(recordings_dir)
        await store.save(cfg)
    try:
        recordings_dir.mkdir(parents=True, exist_ok=True)
    except PermissionError:
        # Final fallback for local dev/deploy on same machine.
        recordings_dir = DEFAULT_LOCAL_RECORDINGS_DIR
        recordings_dir.mkdir(parents=True, exist_ok=True)
        cfg.recordings_dir = str(recordings_dir)
        await store.save(cfg)
    app.mount("/videos", StaticFiles(directory=str(recordings_dir)), name="videos")
    # The event loop keeps only weak references to tasks, so a task whose
    # handle is dropped can be garbage-collected while still running. Keep
    # a strong reference for the lifetime of the application.
    app.state.scheduler_task = asyncio.create_task(_scheduler_loop())
@app.get("/api/health")
async def health() -> dict:
    """Liveness probe: always reports ``{"status": "ok"}``."""
    return dict(status="ok")
@app.get("/api/config")
async def get_config() -> AppConfig:
    """Return the app config after re-syncing it from the MediaMTX YAML file."""
    synced = await _sync_app_config_from_mediamtx()
    return synced
@app.get("/api/mediamtx/config")
async def get_mediamtx_config() -> MediaMTXConfigView:
    """Return the MediaMTX view and refresh the stored app config.

    The sync runs first so the view is built from the sanitized config that
    this request persists. Previously the view was built from the config as
    loaded before sanitizing, so the response could contain URL values that
    the same request then cleaned and saved.
    """
    cfg = await _sync_app_config_from_mediamtx()
    return _build_mediamtx_view(_load_mediamtx_yml(), cfg)
@app.post("/api/config/basic")
async def update_basic_config(payload: AppConfigUpdate) -> AppConfig:
    """Update the basic connection settings and return the re-synced config.

    The API URL, WebRTC URL and recordings directory keep their current value
    when the submitted one is empty after cleaning. The API user and password
    are replaced unconditionally (possibly with ``None``), and the API port
    is copied as given.
    """
    cfg = await store.load()

    new_api_url = _clean_text(payload.mediamtx_api_url)
    cfg.mediamtx_api_url = new_api_url if new_api_url is not None else cfg.mediamtx_api_url

    new_webrtc_url = _clean_text(payload.mediamtx_webrtc_url)
    cfg.mediamtx_webrtc_url = (
        new_webrtc_url if new_webrtc_url is not None else cfg.mediamtx_webrtc_url
    )

    cfg.mediamtx_api_user = _clean_text(payload.mediamtx_api_user)
    cfg.mediamtx_api_pass = _clean_text(payload.mediamtx_api_pass)

    new_recordings_dir = _clean_text(payload.recordings_dir)
    cfg.recordings_dir = (
        new_recordings_dir if new_recordings_dir is not None else cfg.recordings_dir
    )

    cfg.api_port = payload.api_port
    await store.save(cfg)
    return await _sync_app_config_from_mediamtx()
@app.post("/api/mediamtx/cameras")
async def add_mediamtx_camera(payload: MediaMTXAddCameraRequest) -> MediaMTXConfigView:
    """Add a camera to the MediaMTX YAML file under the first free ``camN`` name.

    Args:
        payload: Request carrying the camera's RTSP URL.

    Returns:
        The MediaMTX view including the new camera.

    Raises:
        HTTPException: 400 ``invalid_mediamtx_paths`` when ``paths`` exists
            but is not a mapping.
    """
    cfg = await store.load()
    data = _load_mediamtx_yml()
    paths = data.get("paths")
    # An empty `paths:` key parses as None. dict.setdefault would hand that
    # None back and make the first camera add fail with a 400, so start from
    # an empty mapping, as the delete endpoint already does.
    if paths is None:
        paths = {}
        data["paths"] = paths
    if not isinstance(paths, dict):
        raise HTTPException(status_code=400, detail="invalid_mediamtx_paths")
    used = {str(k) for k in paths}
    # Pick the lowest-numbered camN name that is not taken yet.
    idx = 1
    while f"cam{idx}" in used:
        idx += 1
    name = f"cam{idx}"
    paths[name] = {"source": payload.rtsp_url.strip()}
    _save_mediamtx_yml(data)
    await _sync_app_config_from_mediamtx()
    return _build_mediamtx_view(data, cfg)
@app.delete("/api/mediamtx/cameras/{name}")
async def delete_mediamtx_camera(name: str) -> MediaMTXConfigView:
    """Remove the path called *name* from the MediaMTX YAML file.

    Returns:
        The MediaMTX view after removal.

    Raises:
        HTTPException: 404 ``camera_not_found`` when ``paths`` is not a
            mapping or has no entry called *name*.
    """
    cfg = await store.load()
    document = _load_mediamtx_yml()
    entries = document.get("paths") or {}
    camera_exists = isinstance(entries, dict) and name in entries
    if not camera_exists:
        raise HTTPException(status_code=404, detail="camera_not_found")
    entries.pop(name)
    document["paths"] = entries
    _save_mediamtx_yml(document)
    await _sync_app_config_from_mediamtx()
    return _build_mediamtx_view(document, cfg)
@app.post("/api/mediamtx/recording")
async def set_mediamtx_recording(data: RecordingToggle) -> MediaMTXConfigView:
    """Write the recording flag to the MediaMTX YAML file and return the view.

    The file is only edited here; MediaMTX is not restarted by this endpoint.
    """
    cfg = await store.load()
    _set_recording_in_mediamtx_yml(data.enabled)
    # Re-read the file so the response reflects what is actually on disk.
    refreshed = _load_mediamtx_yml()
    view = _build_mediamtx_view(refreshed, cfg)
    return view
@app.post("/api/mediamtx/record-delete-after")
async def set_record_delete_after(data: RecordDeleteAfterUpdate) -> MediaMTXConfigView:
    """Write the recording retention period to the MediaMTX YAML file.

    Returns the MediaMTX view rebuilt from the file after the change.
    """
    cfg = await store.load()
    _set_record_delete_after_days_in_mediamtx_yml(data.days)
    # Re-read the file so the response reflects what is actually on disk.
    refreshed = _load_mediamtx_yml()
    view = _build_mediamtx_view(refreshed, cfg)
    return view
@app.post("/api/mediamtx/restart")
async def restart_mediamtx() -> dict:
    """Restart the MediaMTX compose service and return the command result.

    Raises:
        HTTPException: 500 ``mediamtx_restart_failed: <reason>`` when the
            restart does not succeed.
    """
    # The restart shells out to docker and can block for up to 40 s per
    # attempt. Run it in a worker thread so this handler does not stall the
    # event loop for every other request.
    return await asyncio.to_thread(_restart_mediamtx)
@app.get("/api/paths")
async def list_paths() -> dict:
    """Return the path status reported by the MediaMTX API.

    Raises:
        HTTPException: 502, via ``_raise_mediamtx_http_error``, when the
            MediaMTX request fails with an ``httpx.HTTPError``.
    """
    cfg = await store.load()
    mtx = MediaMTXClient(
        api_url=cfg.mediamtx_api_url,
        username=cfg.mediamtx_api_user,
        password=cfg.mediamtx_api_pass,
    )
    try:
        status = await mtx.list_paths_status()
    except httpx.HTTPError as err:
        _raise_mediamtx_http_error(err)
    else:
        return status
@app.post("/api/recording")
async def toggle_recording(data: RecordingToggle) -> dict:
    """Apply the requested recording state and echo it back.

    Returns:
        ``{"enabled": <requested state>}``.

    Raises:
        HTTPException: 502 when ``_apply_recording`` raises an
            ``httpx.HTTPError``.
    """
    requested = data.enabled
    try:
        await _apply_recording(requested)
    except httpx.HTTPError as err:
        _raise_mediamtx_http_error(err)
    return {"enabled": data.enabled}
@app.post("/api/scheduler/enabled")
async def set_scheduler_enabled(data: SchedulerEnabled) -> AppConfig:
    """Turn the scheduler on or off, tick it once, and return the synced config."""
    config = await store.load()
    needs_cleanup_save = _sanitize_cfg_fields(config)
    if needs_cleanup_save:
        await store.save(config)

    config.schedule.enabled = data.enabled
    await store.save(config)

    # Tick right away so the new setting is acted on without waiting for the
    # next pass of the background loop.
    await scheduler.tick(config.schedule)
    return await _sync_app_config_from_mediamtx()
@app.post("/api/scheduler/schedule")
async def update_schedule(data: ScheduleUpdate) -> AppConfig:
    """Store new schedule settings, tick the scheduler, and return the synced config."""
    config = await store.load()
    needs_cleanup_save = _sanitize_cfg_fields(config)
    if needs_cleanup_save:
        await store.save(config)

    # Copy the three schedule fields from the request onto the stored schedule.
    for field_name in ("weekdays_from", "weekdays_to", "weekend_all_day"):
        setattr(config.schedule, field_name, getattr(data, field_name))
    await store.save(config)

    # Tick right away so the new schedule is acted on without waiting for the
    # next pass of the background loop.
    await scheduler.tick(config.schedule)
    return await _sync_app_config_from_mediamtx()
@app.get("/api/recordings")
async def recordings(
    camera: str,
    date: Optional[str] = None,
    limit: int = 200,
    offset: int = 0,
) -> list[dict]:
    """List recordings for one camera.

    Args:
        camera: Camera name; must match a camera in the synced config.
        date: Optional date filter, passed through to ``list_recordings``.
        limit: Page size, 1 to 2000 inclusive.
        offset: Number of items to skip; must not be negative.

    Returns:
        One dict per recording with keys ``camera``, ``filename``,
        ``timestamp`` and ``url``.

    Raises:
        HTTPException: 404 ``camera_not_found``; 400 ``invalid_limit``;
            400 ``invalid_offset``. The camera check runs first.
    """
    cfg = await _sync_app_config_from_mediamtx()
    known_names = {cam.name for cam in cfg.cameras}
    if camera not in known_names:
        raise HTTPException(status_code=404, detail="camera_not_found")
    if not 1 <= limit <= 2000:
        raise HTTPException(status_code=400, detail="invalid_limit")
    if offset < 0:
        raise HTTPException(status_code=400, detail="invalid_offset")

    rows: list[dict] = []
    for entry in list_recordings(cfg.recordings_dir, camera, date, limit, offset):
        rows.append(
            dict(
                camera=entry.camera,
                filename=entry.filename,
                timestamp=entry.timestamp,
                url=entry.url,
            )
        )
    return rows