Files
IPCam_OrangePi_Dashboard/api/app/main.py
T
2026-04-28 17:24:26 +07:00

400 lines
12 KiB
Python

from __future__ import annotations
import asyncio
import logging
import subprocess
from pathlib import Path
from typing import Optional
from urllib.parse import urlparse
import httpx
import yaml
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from .config_store import default_store
from .mediamtx_client import MediaMTXClient
from .models import (
AppConfig,
Camera,
MediaMTXAddCameraRequest,
MediaMTXCamera,
MediaMTXConfigView,
RecordingToggle,
ScheduleUpdate,
SchedulerEnabled,
)
from .recordings import list_recordings
from .scheduler import Scheduler
def _cors_origins() -> list[str]:
return ["http://localhost:5173"]
# Module-wide logger; the scheduler loop reports tick failures through it.
logger = logging.getLogger("ipcam_dashboard")
# Third ancestor directory of this file's resolved path.
PROJECT_ROOT = Path(__file__).resolve().parents[2]
# Recordings directory used when the configured one is the legacy
# "/recordings" default or cannot be created (see _startup).
DEFAULT_LOCAL_RECORDINGS_DIR = PROJECT_ROOT / "mediamtx" / "recordings"
# MediaMTX configuration file read and rewritten by the /api/mediamtx/* handlers.
MEDIAMTX_YML_PATH = PROJECT_ROOT / "mediamtx" / "mediamtx.yml"
# Compose file passed to `docker compose ... restart mediamtx`.
MEDIAMTX_COMPOSE_PATH = PROJECT_ROOT / "mediamtx" / "docker-compose.yml"
app = FastAPI(title="IPCam Dashboard API")
# CORS: origins come from _cors_origins(); methods and headers are unrestricted.
app.add_middleware(
CORSMiddleware,
allow_origins=_cors_origins(),
allow_credentials=True,
allow_methods=["*"] ,
allow_headers=["*"] ,
)
# Shared application-config store; handlers await store.load() / store.save().
store = default_store()
def _extract_port(address: str, fallback: int) -> int:
if not address:
return fallback
text = str(address).strip()
if text.startswith(":"):
text = text[1:]
if ":" in text:
text = text.rsplit(":", 1)[-1]
try:
value = int(text)
return value if 1 <= value <= 65535 else fallback
except ValueError:
return fallback
def _host_from_url(url: str, fallback: str) -> str:
try:
parsed = urlparse(url)
host = (parsed.hostname or "").strip()
if host and host not in {"0.0.0.0", "::"}:
return host
except ValueError:
pass
return fallback
def _extract_host(address: str, fallback: str) -> str:
text = str(address or "").strip()
if not text:
return fallback
if text.startswith(":"):
return fallback
if "://" in text:
return _host_from_url(text, fallback)
if text.startswith("[") and "]" in text:
host = text[1 : text.index("]")]
return host or fallback
if ":" in text:
host = text.rsplit(":", 1)[0].strip()
if host and host not in {"0.0.0.0", "::"}:
return host
return fallback
if text in {"0.0.0.0", "::"}:
return fallback
return text
def _load_mediamtx_yml() -> dict:
    """Read and parse the MediaMTX YAML configuration file.

    Returns:
        The parsed document, or an empty dict when the document is empty
        (or otherwise falsy).

    Raises:
        HTTPException: 500 ``mediamtx_yml_not_found`` when the file does not
            exist.
    """
    if MEDIAMTX_YML_PATH.exists():
        parsed = yaml.safe_load(MEDIAMTX_YML_PATH.read_text(encoding="utf-8"))
        return parsed if parsed else {}
    raise HTTPException(status_code=500, detail="mediamtx_yml_not_found")
def _save_mediamtx_yml(data: dict) -> None:
    """Serialise ``data`` as YAML and overwrite the MediaMTX configuration file.

    Key order is kept as given (``sort_keys=False``) and non-ASCII characters
    are escaped (``allow_unicode=False``).
    """
    rendered = yaml.safe_dump(data, sort_keys=False, allow_unicode=False)
    MEDIAMTX_YML_PATH.write_text(rendered, encoding="utf-8")
def _build_mediamtx_view(data: dict, current_cfg: Optional[AppConfig] = None) -> MediaMTXConfigView:
    """Derive the dashboard's view of MediaMTX from the parsed ``mediamtx.yml``.

    Args:
        data: Parsed YAML mapping.
        current_cfg: Current app config. The hosts of its
            ``mediamtx_api_url`` / ``mediamtx_webrtc_url`` are used when the
            YAML address does not name a concrete host; without it
            ``127.0.0.1`` is used.

    Returns:
        A ``MediaMTXConfigView`` holding the API and WebRTC base URLs, the
        ``pathDefaults.record`` flag, and one camera per path whose
        ``source`` is a non-blank string, sorted by name.
    """

    def _url_host(value: object) -> str:
        # IPv6 literals must be bracketed inside a URL authority.
        text = str(value)
        if ":" in text and not text.startswith("["):
            return f"[{text}]"
        return text

    # `or` (rather than a .get default) also covers keys that are present with
    # a null value; str(None) would otherwise be used as the host name.
    api_address = str(data.get("apiAddress") or ":9997")
    webrtc_address = str(data.get("webrtcAddress") or ":8889")
    api_port = _extract_port(api_address, 9997)
    webrtc_port = _extract_port(webrtc_address, 8889)

    current_api_host = _host_from_url(current_cfg.mediamtx_api_url if current_cfg else "", "127.0.0.1")
    current_webrtc_host = _host_from_url(current_cfg.mediamtx_webrtc_url if current_cfg else "", "127.0.0.1")
    api_host = _extract_host(api_address, current_api_host)

    # The first additional WebRTC host, when configured, takes precedence
    # over the host derived from webrtcAddress.
    hosts = data.get("webrtcAdditionalHosts") or []
    if isinstance(hosts, list) and hosts:
        host = hosts[0]
    else:
        host = _extract_host(webrtc_address, current_webrtc_host)

    # A non-mapping pathDefaults is treated as "recording off" instead of
    # raising AttributeError.
    path_defaults = data.get("pathDefaults")
    if isinstance(path_defaults, dict):
        record_enabled = bool(path_defaults.get("record", False))
    else:
        record_enabled = False

    cameras: list[MediaMTXCamera] = []
    paths = data.get("paths") or {}
    if isinstance(paths, dict):
        for name, cfg in paths.items():
            if not isinstance(cfg, dict):
                continue
            source = cfg.get("source")
            if isinstance(source, str) and source.strip():
                cameras.append(MediaMTXCamera(name=str(name), rtsp_url=source))
    cameras.sort(key=lambda cam: cam.name)

    return MediaMTXConfigView(
        api_url=f"http://{_url_host(api_host)}:{api_port}",
        webrtc_url=f"http://{_url_host(host)}:{webrtc_port}",
        record_enabled=record_enabled,
        cameras=cameras,
    )
async def _sync_app_config_from_mediamtx() -> AppConfig:
    """Copy the URLs and camera list from ``mediamtx.yml`` into the stored app config.

    Loads the stored config, rebuilds the MediaMTX view from the YAML file,
    overwrites ``mediamtx_api_url``, ``mediamtx_webrtc_url`` and ``cameras``,
    saves the config and returns it.
    """
    app_cfg = await store.load()
    view = _build_mediamtx_view(_load_mediamtx_yml(), app_cfg)

    app_cfg.mediamtx_api_url = view.api_url
    app_cfg.mediamtx_webrtc_url = view.webrtc_url

    synced = []
    for cam in view.cameras:
        synced.append(Camera(name=cam.name, rtsp_url=cam.rtsp_url))
    app_cfg.cameras = synced

    await store.save(app_cfg)
    return app_cfg
def _restart_mediamtx() -> dict:
    """Restart the ``mediamtx`` compose service.

    Tries ``docker compose`` first and ``docker-compose`` second, each with a
    40-second timeout, running from the compose file's directory.

    Returns:
        ``{"ok": True, "output": <stdout>}`` from the first command that exits
        with status 0.

    Raises:
        HTTPException: 500 ``mediamtx_restart_failed: <reason>`` when neither
            command succeeds; the reason comes from the last attempt.
    """
    compose_file = str(MEDIAMTX_COMPOSE_PATH)
    workdir = str(MEDIAMTX_COMPOSE_PATH.parent)
    restart_args = ["-f", compose_file, "restart", "mediamtx"]

    failure = ""
    for launcher in (["docker", "compose"], ["docker-compose"]):
        try:
            result = subprocess.run(
                launcher + restart_args,
                cwd=workdir,
                capture_output=True,
                text=True,
                timeout=40,
                check=False,
            )
        except FileNotFoundError:
            failure = "docker_command_not_found"
            continue
        except subprocess.TimeoutExpired:
            failure = "docker_restart_timeout"
            continue
        if result.returncode == 0:
            return {"ok": True, "output": result.stdout.strip()}
        failure = (result.stderr or result.stdout or "").strip()

    raise HTTPException(status_code=500, detail=f"mediamtx_restart_failed: {failure}")
async def _apply_recording(enabled: bool) -> None:
    """Turn recording on or off for every known path.

    The path names are taken from the MediaMTX API's path listing. If that
    listing fails for any reason, the camera names from the stored config are
    used instead. Nothing is sent when no names are found.

    Args:
        enabled: Desired recording state.

    Raises:
        Whatever ``MediaMTXClient.set_recording_bulk`` raises; callers in this
        module handle ``httpx.HTTPError``.
    """
    cfg = await store.load()
    # One client serves both the listing and the bulk update.
    client = MediaMTXClient(
        api_url=cfg.mediamtx_api_url,
        username=cfg.mediamtx_api_user,
        password=cfg.mediamtx_api_pass,
    )
    try:
        paths = await client.list_paths_status()
        names = [
            it.get("name")
            for it in (paths.get("items") or [])
            if isinstance(it, dict) and it.get("name")
        ]
    except Exception:
        # Deliberate best effort: fall back to the configured cameras, but
        # leave a trace so a failing listing is not invisible.
        logger.warning(
            "apply_recording_list_paths_failed: falling back to configured cameras",
            exc_info=True,
        )
        names = [c.name for c in cfg.cameras]
    if not names:
        return
    await client.set_recording_bulk(names, enabled)
# Module-wide scheduler; _apply_recording is passed as its `apply` callback.
scheduler = Scheduler(apply=_apply_recording)
async def _scheduler_loop() -> None:
    """Run ``scheduler.tick`` against the stored schedule once a minute, forever.

    Failures of a single tick are logged and never stop the loop. A 401 from
    MediaMTX is logged as a warning with a configuration hint; other errors
    are logged with a traceback.
    """
    while True:
        try:
            cfg = await store.load()
            await scheduler.tick(cfg.schedule)
        except asyncio.CancelledError:
            # Never swallow cancellation, whatever its base class is on the
            # running Python version.
            raise
        except httpx.HTTPStatusError as e:
            code = e.response.status_code
            if code == 401:
                logger.warning(
                    "scheduler_tick_unauthorized: MediaMTX API rejected credentials. "
                    "Set mediamtx_api_user/mediamtx_api_pass in api/data/config.json"
                )
            else:
                logger.exception("scheduler_tick_http_status_%s", code)
        except httpx.HTTPError:
            logger.exception("scheduler_tick_http_error")
        except Exception:
            logger.exception("scheduler_tick_failed")
        # The sleep is outside the try statement on purpose: inside a
        # `finally` it would also run after a cancellation that arrived
        # mid-tick, holding up shutdown for up to a minute.
        await asyncio.sleep(60)
def _raise_mediamtx_http_error(err: httpx.HTTPError) -> None:
    """Translate an httpx error from MediaMTX into a 502 ``HTTPException``.

    This function always raises. The detail is ``mediamtx_unauthorized`` for
    401, ``mediamtx_forbidden`` for 403, ``mediamtx_http_<code>`` for any other
    status error, and ``mediamtx_unreachable`` for errors without a response
    status.
    """
    if not isinstance(err, httpx.HTTPStatusError):
        raise HTTPException(status_code=502, detail="mediamtx_unreachable")

    status = err.response.status_code
    named = {401: "mediamtx_unauthorized", 403: "mediamtx_forbidden"}
    detail = named.get(status, f"mediamtx_http_{status}")
    raise HTTPException(status_code=502, detail=detail)
@app.on_event("startup")
async def _startup() -> None:
    """Prepare the application on startup.

    Syncs the stored config from ``mediamtx.yml``, resolves and creates the
    recordings directory, mounts it under ``/videos`` and starts the
    scheduler loop as a background task.
    """
    cfg = await _sync_app_config_from_mediamtx()
    recordings_dir = Path(cfg.recordings_dir)
    # Backward-compatible migration: old defaults used "/recordings".
    if str(recordings_dir) == "/recordings":
        recordings_dir = DEFAULT_LOCAL_RECORDINGS_DIR
        cfg.recordings_dir = str(recordings_dir)
        await store.save(cfg)
    try:
        recordings_dir.mkdir(parents=True, exist_ok=True)
    except PermissionError:
        # Final fallback for local dev/deploy on same machine.
        recordings_dir = DEFAULT_LOCAL_RECORDINGS_DIR
        recordings_dir.mkdir(parents=True, exist_ok=True)
        cfg.recordings_dir = str(recordings_dir)
        await store.save(cfg)
    app.mount("/videos", StaticFiles(directory=str(recordings_dir)), name="videos")
    # The event loop keeps only a weak reference to tasks, so an unreferenced
    # task may be garbage-collected mid-run. Keep a strong reference on the app.
    app.state.scheduler_task = asyncio.create_task(_scheduler_loop())
@app.get("/api/health")
async def health() -> dict:
    """Liveness endpoint; always answers ``{"status": "ok"}``."""
    payload = {"status": "ok"}
    return payload
@app.get("/api/config")
async def get_config() -> AppConfig:
    """Return the app config after re-syncing it from ``mediamtx.yml``."""
    refreshed = await _sync_app_config_from_mediamtx()
    return refreshed
@app.get("/api/mediamtx/config")
async def get_mediamtx_config() -> MediaMTXConfigView:
    """Return the MediaMTX view built from ``mediamtx.yml``.

    The view is built against the stored config as it was before the sync
    that this handler also performs.
    """
    yml = _load_mediamtx_yml()
    stored_cfg = await store.load()
    view = _build_mediamtx_view(yml, stored_cfg)
    await _sync_app_config_from_mediamtx()
    return view
@app.post("/api/mediamtx/cameras")
async def add_mediamtx_camera(payload: MediaMTXAddCameraRequest) -> MediaMTXConfigView:
    """Add a camera path to ``mediamtx.yml`` under the first free ``cam<N>`` name.

    Args:
        payload: Request body; its ``rtsp_url`` (stripped) becomes the path's
            ``source``.

    Returns:
        The MediaMTX view after the file was saved and the app config synced.

    Raises:
        HTTPException: 400 ``invalid_mediamtx_paths`` when ``paths`` exists
            but is not a mapping.
    """
    data = _load_mediamtx_yml()
    paths = data.get("paths")
    if paths is None:
        # A bare `paths:` key loads as None. Treat it as "no paths yet", the
        # same way the delete handler and the view builder do.
        paths = {}
        data["paths"] = paths
    if not isinstance(paths, dict):
        raise HTTPException(status_code=400, detail="invalid_mediamtx_paths")
    used = {str(k) for k in paths}
    idx = 1
    while f"cam{idx}" in used:
        idx += 1
    name = f"cam{idx}"
    paths[name] = {"source": payload.rtsp_url.strip()}
    _save_mediamtx_yml(data)
    await _sync_app_config_from_mediamtx()
    return _build_mediamtx_view(data, await store.load())
@app.delete("/api/mediamtx/cameras/{name}")
async def delete_mediamtx_camera(name: str) -> MediaMTXConfigView:
    """Remove the path ``name`` from ``mediamtx.yml``.

    Returns:
        The MediaMTX view after the file was saved and the app config synced.

    Raises:
        HTTPException: 404 ``camera_not_found`` when ``paths`` is not a
            mapping or has no entry called ``name``.
    """
    data = _load_mediamtx_yml()
    paths = data.get("paths") or {}
    found = isinstance(paths, dict) and name in paths
    if not found:
        raise HTTPException(status_code=404, detail="camera_not_found")
    paths.pop(name)
    data["paths"] = paths
    _save_mediamtx_yml(data)
    await _sync_app_config_from_mediamtx()
    stored_cfg = await store.load()
    return _build_mediamtx_view(data, stored_cfg)
@app.post("/api/mediamtx/recording")
async def set_mediamtx_recording(data: RecordingToggle) -> MediaMTXConfigView:
    """Persist the default recording flag (``pathDefaults.record``) in ``mediamtx.yml``.

    Args:
        data: Request body carrying the desired ``enabled`` state.

    Returns:
        The MediaMTX view built from the updated file contents.

    Raises:
        HTTPException: 400 ``invalid_mediamtx_path_defaults`` when
            ``pathDefaults`` exists but is not a mapping.
    """
    payload = _load_mediamtx_yml()
    path_defaults = payload.get("pathDefaults")
    if path_defaults is None:
        # A bare `pathDefaults:` key loads as None. Treat it as an empty
        # mapping, as the view builder does, rather than rejecting the request.
        path_defaults = {}
    if not isinstance(path_defaults, dict):
        raise HTTPException(status_code=400, detail="invalid_mediamtx_path_defaults")
    path_defaults["record"] = bool(data.enabled)
    payload["pathDefaults"] = path_defaults
    _save_mediamtx_yml(payload)
    return _build_mediamtx_view(payload, await store.load())
@app.post("/api/mediamtx/restart")
async def restart_mediamtx() -> dict:
    """Restart the MediaMTX compose service and return the command result.

    ``_restart_mediamtx`` blocks on ``subprocess.run`` (up to 40 seconds per
    attempt), so it is run in the default executor to keep the event loop
    responsive. An ``HTTPException`` raised by it propagates unchanged.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, _restart_mediamtx)
@app.get("/api/paths")
async def list_paths() -> dict:
    """Proxy the MediaMTX path status listing.

    Raises:
        HTTPException: 502 (via ``_raise_mediamtx_http_error``) when the
            MediaMTX API call fails with an ``httpx.HTTPError``.
    """
    cfg = await store.load()
    mtx = MediaMTXClient(
        api_url=cfg.mediamtx_api_url,
        username=cfg.mediamtx_api_user,
        password=cfg.mediamtx_api_pass,
    )
    try:
        status = await mtx.list_paths_status()
    except httpx.HTTPError as exc:
        _raise_mediamtx_http_error(exc)
    else:
        return status
@app.post("/api/recording")
async def toggle_recording(data: RecordingToggle) -> dict:
    """Apply the requested recording state and echo it back.

    Raises:
        HTTPException: 502 (via ``_raise_mediamtx_http_error``) when applying
            the state fails with an ``httpx.HTTPError``.
    """
    try:
        await _apply_recording(data.enabled)
    except httpx.HTTPError as exc:
        _raise_mediamtx_http_error(exc)
    result = {"enabled": data.enabled}
    return result
@app.post("/api/scheduler/enabled")
async def set_scheduler_enabled(data: SchedulerEnabled) -> AppConfig:
    """Enable or disable the schedule, persist it, and run one tick immediately.

    Args:
        data: Request body carrying the desired ``enabled`` state.

    Returns:
        The updated app config.

    Raises:
        HTTPException: 502 (via ``_raise_mediamtx_http_error``) when the
            immediate tick fails with an ``httpx.HTTPError``. The new setting
            has already been saved at that point.
    """
    cfg = await store.load()
    cfg.schedule.enabled = data.enabled
    await store.save(cfg)
    try:
        await scheduler.tick(cfg.schedule)
    except httpx.HTTPError as e:
        # Report MediaMTX failures the same way /api/recording does.
        _raise_mediamtx_http_error(e)
    return cfg
@app.post("/api/scheduler/schedule")
async def update_schedule(data: ScheduleUpdate) -> AppConfig:
    """Replace the schedule's time window, persist it, and run one tick immediately.

    Args:
        data: Request body with ``weekdays_from``, ``weekdays_to`` and
            ``weekend_all_day``.

    Returns:
        The updated app config.

    Raises:
        HTTPException: 502 (via ``_raise_mediamtx_http_error``) when the
            immediate tick fails with an ``httpx.HTTPError``. The new schedule
            has already been saved at that point.
    """
    cfg = await store.load()
    cfg.schedule.weekdays_from = data.weekdays_from
    cfg.schedule.weekdays_to = data.weekdays_to
    cfg.schedule.weekend_all_day = data.weekend_all_day
    await store.save(cfg)
    try:
        await scheduler.tick(cfg.schedule)
    except httpx.HTTPError as e:
        # Report MediaMTX failures the same way /api/recording does.
        _raise_mediamtx_http_error(e)
    return cfg
@app.get("/api/recordings")
async def recordings(
    camera: str,
    date: Optional[str] = None,
    limit: int = 200,
    offset: int = 0,
) -> list[dict]:
    """List recordings of one camera.

    Args:
        camera: Camera name; must be one of the cameras in the synced config.
        date: Optional filter passed through to ``list_recordings``.
        limit: Page size, 1-2000.
        offset: Number of entries to skip; must not be negative.

    Returns:
        One dict per recording with ``camera``, ``filename``, ``timestamp``
        and ``url``.

    Raises:
        HTTPException: 404 ``camera_not_found``, 400 ``invalid_limit`` or
            400 ``invalid_offset``, checked in that order.
    """
    cfg = await _sync_app_config_from_mediamtx()
    if all(c.name != camera for c in cfg.cameras):
        raise HTTPException(status_code=404, detail="camera_not_found")
    if not 1 <= limit <= 2000:
        raise HTTPException(status_code=400, detail="invalid_limit")
    if offset < 0:
        raise HTTPException(status_code=400, detail="invalid_offset")

    rows = []
    for entry in list_recordings(cfg.recordings_dir, camera, date, limit, offset):
        rows.append(
            {
                "camera": entry.camera,
                "filename": entry.filename,
                "timestamp": entry.timestamp,
                "url": entry.url,
            }
        )
    return rows