Files
IPCam_OrangePi_Dashboard/api/app/main.py
T
2026-04-28 10:18:44 +07:00

221 lines
5.7 KiB
Python

from __future__ import annotations
import asyncio
import os
import logging
from pathlib import Path
from typing import Optional
import httpx
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from .config_store import default_store
from .mediamtx_client import MediaMTXClient
from .models import (
AppConfig,
Camera,
RecordingToggle,
ScheduleUpdate,
SchedulerEnabled,
)
from .recordings import list_recordings
from .scheduler import Scheduler
# Best-effort loading of environment variables from a ".env" file located at
# ``parents[2]`` of this module's resolved path. Failure must never prevent
# the application from starting.
try:
    from dotenv import load_dotenv

    load_dotenv(Path(__file__).resolve().parents[2] / ".env")
except ImportError:
    # The dotenv package is not importable: rely on the process environment.
    pass
except Exception:
    # Still non-fatal, but leave a trace instead of hiding the failure.
    logging.getLogger("ipcam_dashboard").warning(
        "dotenv_load_failed", exc_info=True
    )
def _cors_origins() -> list[str]:
raw = os.getenv("CORS_ORIGINS", "http://localhost:5173")
return [x.strip() for x in raw.split(",") if x.strip()]
# Named logger used by this module (e.g. for scheduler tick failures).
logger = logging.getLogger("ipcam_dashboard")

# The FastAPI application. CORS is limited to the origins returned by
# `_cors_origins()`; all methods and headers are allowed, with credentials.
app = FastAPI(title="IPCam Dashboard API")
app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins(),
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Configuration store shared by the endpoints and the background scheduler.
store = default_store()
async def _apply_recording(enabled: bool) -> None:
    """Apply the given recording state to every configured camera.

    Loads the current configuration, collects the camera names and passes them
    to ``MediaMTXClient.set_recording_bulk`` together with ``enabled``. Returns
    without contacting MediaMTX when no cameras are configured. Exceptions
    raised by the store or the client propagate to the caller.
    """
    config = await store.load()
    camera_names = [cam.name for cam in config.cameras]
    if camera_names:
        # Credentials come from the environment, not from the stored config.
        mediamtx = MediaMTXClient(
            api_url=config.mediamtx_api_url,
            username=os.getenv("MEDIAMTX_API_USER"),
            password=os.getenv("MEDIAMTX_API_PASS"),
        )
        await mediamtx.set_recording_bulk(camera_names, enabled)
# Scheduler instance constructed with `_apply_recording` as its `apply`
# callback; it is ticked by `_scheduler_loop` and by the scheduler endpoints.
scheduler = Scheduler(apply=_apply_recording)
async def _scheduler_loop(interval_seconds: float = 60) -> None:
    """Tick the scheduler against the stored schedule, forever.

    Each iteration reloads the configuration, calls ``scheduler.tick`` with
    its schedule, then sleeps for ``interval_seconds``. Any ``Exception``
    raised by an iteration is logged and the loop continues.

    Args:
        interval_seconds: Pause between two ticks, in seconds.
    """
    while True:
        try:
            cfg = await store.load()
            await scheduler.tick(cfg.schedule)
        except Exception:
            # asyncio.CancelledError derives from BaseException, so task
            # cancellation is not swallowed by this handler.
            logger.exception("scheduler_tick_failed")
        # The sleep is deliberately NOT in a ``finally`` clause: there it
        # would also run while a cancellation is propagating and hold up
        # shutdown for a whole interval.
        await asyncio.sleep(interval_seconds)
def _raise_mediamtx_http_error(err: httpx.HTTPError) -> None:
    """Convert an httpx error from a MediaMTX call into an ``HTTPException``.

    This function never returns normally; it always raises a 502 whose
    ``detail`` identifies the failure:

    * ``mediamtx_unauthorized`` for an upstream 401,
    * ``mediamtx_forbidden`` for an upstream 403,
    * ``mediamtx_http_<code>`` for any other upstream status error,
    * ``mediamtx_unreachable`` for every other ``httpx.HTTPError``.
    """
    if not isinstance(err, httpx.HTTPStatusError):
        raise HTTPException(status_code=502, detail="mediamtx_unreachable")

    upstream_status = err.response.status_code
    named_details = {
        401: "mediamtx_unauthorized",
        403: "mediamtx_forbidden",
    }
    detail = named_details.get(upstream_status, f"mediamtx_http_{upstream_status}")
    raise HTTPException(status_code=502, detail=detail)
@app.on_event("startup")
async def _startup() -> None:
    """Prepare the recordings directory and start the scheduler loop.

    Creates the configured recordings directory if needed, serves it under
    ``/videos`` as static files, and launches `_scheduler_loop` as a
    background task.
    """
    cfg = await store.load()
    recordings_dir = cfg.recordings_dir
    Path(recordings_dir).mkdir(parents=True, exist_ok=True)
    app.mount("/videos", StaticFiles(directory=recordings_dir), name="videos")
    # The event loop only keeps weak references to tasks; hold a strong one
    # on the application state so the loop is not garbage-collected mid-run.
    app.state.scheduler_task = asyncio.create_task(_scheduler_loop())
@app.get("/api/health")
async def health() -> dict:
    """Health endpoint: always answers ``{"status": "ok"}``."""
    payload = {"status": "ok"}
    return payload
@app.get("/api/config")
async def get_config() -> AppConfig:
    """Return the configuration as loaded from the store."""
    current = await store.load()
    return current
@app.post("/api/cameras")
async def add_camera(camera: Camera) -> AppConfig:
    """Register a new camera and push its source to MediaMTX.

    Args:
        camera: The camera to add; its ``name`` must not already be in use.

    Returns:
        The updated configuration, including the new camera.

    Raises:
        HTTPException: 409 ``camera_already_exists`` when a camera with the
            same name is configured; 502 (see `_raise_mediamtx_http_error`)
            when the MediaMTX call fails.
    """
    cfg = await store.load()
    if any(existing.name == camera.name for existing in cfg.cameras):
        raise HTTPException(status_code=409, detail="camera_already_exists")

    client = MediaMTXClient(
        api_url=cfg.mediamtx_api_url,
        username=os.getenv("MEDIAMTX_API_USER"),
        password=os.getenv("MEDIAMTX_API_PASS"),
    )
    # Talk to MediaMTX BEFORE touching the stored config. If the call fails,
    # nothing has been persisted, so the client can simply retry instead of
    # hitting a 409 for a camera that was never actually set up upstream.
    try:
        await client.upsert_paths_sources_bulk({camera.name: camera.rtsp_url})
    except httpx.HTTPError as e:
        _raise_mediamtx_http_error(e)

    cfg.cameras.append(camera)
    await store.save(cfg)
    return cfg
@app.delete("/api/cameras/{name}")
async def delete_camera(name: str) -> AppConfig:
    """Remove the camera called ``name`` and delete its MediaMTX path.

    The configuration is saved first, then the path is deleted in MediaMTX.

    Raises:
        HTTPException: 404 ``camera_not_found`` when no camera has that name;
            502 (see `_raise_mediamtx_http_error`) when the MediaMTX call
            fails.
    """
    cfg = await store.load()
    count_before = len(cfg.cameras)
    cfg.cameras = [cam for cam in cfg.cameras if cam.name != name]
    nothing_removed = len(cfg.cameras) == count_before
    if nothing_removed:
        raise HTTPException(status_code=404, detail="camera_not_found")
    await store.save(cfg)

    mediamtx = MediaMTXClient(
        api_url=cfg.mediamtx_api_url,
        username=os.getenv("MEDIAMTX_API_USER"),
        password=os.getenv("MEDIAMTX_API_PASS"),
    )
    try:
        await mediamtx.delete_path(name)
    except httpx.HTTPError as exc:
        _raise_mediamtx_http_error(exc)
    return cfg
@app.get("/api/paths")
async def list_paths() -> dict:
    """Return the result of ``MediaMTXClient.list_paths_status``.

    Raises:
        HTTPException: 502 (see `_raise_mediamtx_http_error`) when the
            MediaMTX call fails.
    """
    cfg = await store.load()
    mediamtx = MediaMTXClient(
        api_url=cfg.mediamtx_api_url,
        username=os.getenv("MEDIAMTX_API_USER"),
        password=os.getenv("MEDIAMTX_API_PASS"),
    )
    try:
        status = await mediamtx.list_paths_status()
    except httpx.HTTPError as exc:
        _raise_mediamtx_http_error(exc)
    else:
        return status
@app.post("/api/recording")
async def toggle_recording(data: RecordingToggle) -> dict:
    """Apply the requested recording state to all cameras and echo it back.

    Raises:
        HTTPException: 502 (see `_raise_mediamtx_http_error`) when the
            MediaMTX call fails.
    """
    requested = data.enabled
    try:
        await _apply_recording(requested)
    except httpx.HTTPError as exc:
        _raise_mediamtx_http_error(exc)
    return {"enabled": requested}
@app.post("/api/scheduler/enabled")
async def set_scheduler_enabled(data: SchedulerEnabled) -> AppConfig:
    """Enable or disable the schedule, persist it, and tick the scheduler.

    Returns:
        The updated configuration.

    Raises:
        HTTPException: 502 (see `_raise_mediamtx_http_error`) when the
            immediate tick fails with an httpx error. The new setting has
            already been saved at that point.
    """
    cfg = await store.load()
    cfg.schedule.enabled = data.enabled
    await store.save(cfg)
    # The scheduler was built with `_apply_recording` as its callback, which
    # calls MediaMTX; report such failures as 502 like the other MediaMTX
    # endpoints instead of letting them surface as an unhandled 500.
    try:
        await scheduler.tick(cfg.schedule)
    except httpx.HTTPError as e:
        _raise_mediamtx_http_error(e)
    return cfg
@app.post("/api/scheduler/schedule")
async def update_schedule(data: ScheduleUpdate) -> AppConfig:
    """Replace the schedule settings, persist them, and tick the scheduler.

    Copies ``weekdays_from``, ``weekdays_to`` and ``weekend_all_day`` from the
    request onto the stored schedule.

    Returns:
        The updated configuration.

    Raises:
        HTTPException: 502 (see `_raise_mediamtx_http_error`) when the
            immediate tick fails with an httpx error. The new schedule has
            already been saved at that point.
    """
    cfg = await store.load()
    schedule = cfg.schedule
    schedule.weekdays_from = data.weekdays_from
    schedule.weekdays_to = data.weekdays_to
    schedule.weekend_all_day = data.weekend_all_day
    await store.save(cfg)
    # The scheduler was built with `_apply_recording` as its callback, which
    # calls MediaMTX; report such failures as 502 like the other MediaMTX
    # endpoints instead of letting them surface as an unhandled 500.
    try:
        await scheduler.tick(cfg.schedule)
    except httpx.HTTPError as e:
        _raise_mediamtx_http_error(e)
    return cfg
@app.get("/api/recordings")
async def recordings(
    camera: str,
    date: Optional[str] = None,
    limit: int = 200,
    offset: int = 0,
) -> list[dict]:
    """List recordings of one configured camera.

    Args:
        camera: Name of a configured camera.
        date: Optional filter passed unchanged to ``list_recordings``
            (expected format is defined there; not validated here).
        limit: Page size, between 1 and 2000 inclusive.
        offset: Non-negative page offset.

    Returns:
        One dict per recording with the keys ``camera``, ``filename``,
        ``timestamp`` and ``url``.

    Raises:
        HTTPException: 404 ``camera_not_found`` for an unknown camera; 400
            ``invalid_limit`` or ``invalid_offset`` for out-of-range paging.
    """
    cfg = await store.load()
    configured_names = [cam.name for cam in cfg.cameras]
    if camera not in configured_names:
        raise HTTPException(status_code=404, detail="camera_not_found")
    if not 1 <= limit <= 2000:
        raise HTTPException(status_code=400, detail="invalid_limit")
    if offset < 0:
        raise HTTPException(status_code=400, detail="invalid_offset")

    payload: list[dict] = []
    for item in list_recordings(cfg.recordings_dir, camera, date, limit, offset):
        payload.append(
            {
                "camera": item.camera,
                "filename": item.filename,
                "timestamp": item.timestamp,
                "url": item.url,
            }
        )
    return payload