Files
IPCam_OrangePi_Dashboard/api/app/scheduler.py
T
2026-04-26 21:27:00 +07:00

47 lines
1.1 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, time
from typing import Awaitable, Callable, Optional
from .models import Schedule
def _parse_hhmm(value: str) -> time:
hh, mm = value.split(":")
return time(hour=int(hh), minute=int(mm))
def should_record_now(now: datetime, schedule: Schedule) -> bool:
if not schedule.enabled:
return False
weekday = now.weekday()
if weekday in (5, 6) and schedule.weekend_all_day:
return True
start = _parse_hhmm(schedule.weekdays_from)
end = _parse_hhmm(schedule.weekdays_to)
now_t = now.time().replace(second=0, microsecond=0)
if start == end:
return True
if start < end:
return start <= now_t < end
return now_t >= start or now_t < end
@dataclass
class Scheduler:
apply: Callable[[bool], Awaitable[None]]
_last_state: Optional[bool] = None
async def tick(self, schedule: Schedule) -> None:
now = datetime.now()
state = should_record_now(now, schedule)
if self._last_state != state:
await self.apply(state)
self._last_state = state