695 lines
28 KiB
Python
695 lines
28 KiB
Python
#!/usr/bin/env python3
|
|
import argparse
|
|
import datetime as dt
|
|
import glob
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import socket
|
|
import sqlite3
|
|
import sys
|
|
import time
|
|
import threading
|
|
import urllib.error
|
|
import urllib.request
|
|
import webbrowser
|
|
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
|
from urllib.parse import parse_qs, urlparse
|
|
from zoneinfo import ZoneInfo
|
|
|
|
# Held by refresh_quota() so that only one refresh runs at a time.
_quota_refresh_lock = threading.Lock()

# Path to the companion HTML file (same directory as this script).
HTML_PATH = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    "usage-dashboard.html",
)

# Directory that auth_files() scans for codex-*.json files.
JSON_DIR = os.path.expanduser("~/.cli-proxy-api")

# Working directory for the SQLite database and config.json.
BASE_DIR = os.path.expanduser("/mnt/ssd/cliproxyapi/usage-dashboard")
AUTH_DIR = os.path.expanduser("/mnt/ssd/cliproxyapi")
DB_PATH = os.path.join(BASE_DIR, "usage.sqlite")
CONFIG_PATH = os.path.join(BASE_DIR, "config.json")

# Time zone used for local_date / local_hour bucketing and displayed times.
LOCAL_TZ = ZoneInfo("Asia/Ho_Chi_Minh")
|
|
|
|
|
|
# Defaults written to config.json on first run and merged under any values
# found in that file (see load_config).
DEFAULT_CONFIG = dict(
    cliproxy_host="127.0.0.1",
    cliproxy_port=8317,
    management_key="123456",
    poll_interval_seconds=2,
    quota_refresh_seconds=300,
    dashboard_host="0.0.0.0",
    dashboard_port=8320,
)
|
|
|
|
|
|
def ensure_dirs():
    """Create BASE_DIR, including missing parents; a no-op if it already exists."""
    os.makedirs(name=BASE_DIR, exist_ok=True)
|
|
|
|
|
|
def load_config():
    """Load the dashboard configuration, creating config.json on first use.

    The file at CONFIG_PATH is created with DEFAULT_CONFIG when it does not
    exist. Values from the file override DEFAULT_CONFIG, and the
    CLIPROXY_MANAGEMENT_KEY environment variable, when set, overrides
    ``management_key``.

    Returns:
        dict: the merged configuration.

    Raises:
        ValueError: if config.json is not valid JSON or is not a JSON object.
        OSError: if the file cannot be created or read.
    """
    ensure_dirs()
    try:
        # O_EXCL makes "create only if missing" a single atomic step, and
        # passing the mode to os.open means the file that holds the
        # management key is never created with wider permissions first.
        fd = os.open(CONFIG_PATH, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
    except FileExistsError:
        pass
    else:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(DEFAULT_CONFIG, f, indent=2)
        # The creation mode is filtered by the umask; pin it to exactly 0o600.
        os.chmod(CONFIG_PATH, 0o600)

    with open(CONFIG_PATH, encoding="utf-8") as f:
        cfg = json.load(f)
    if not isinstance(cfg, dict):
        raise ValueError(
            f"{CONFIG_PATH} must contain a JSON object, got {type(cfg).__name__}"
        )

    merged = {**DEFAULT_CONFIG, **cfg}
    merged["management_key"] = os.environ.get("CLIPROXY_MANAGEMENT_KEY", merged["management_key"])
    return merged
|
|
|
|
|
|
def db_connect():
    """Open a new SQLite connection to DB_PATH.

    Rows are returned as sqlite3.Row, the journal is switched to WAL and the
    busy timeout is set to 5000 ms. The caller owns the returned connection.
    """
    ensure_dirs()
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    for pragma in ("PRAGMA journal_mode=WAL", "PRAGMA busy_timeout=5000"):
        connection.execute(pragma)
    return connection
|
|
|
|
|
|
def init_db():
    """Create the usage_events and quota_snapshots tables and their indexes.

    Every statement uses IF NOT EXISTS, so calling this repeatedly is safe.
    """
    conn = db_connect()
    try:
        # `with conn` only commits or rolls back; the connection itself is
        # closed explicitly in the finally clause.
        with conn:
            conn.executescript(
                """
                CREATE TABLE IF NOT EXISTS usage_events (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    event_key TEXT NOT NULL UNIQUE,
                    timestamp TEXT NOT NULL,
                    ts_epoch REAL NOT NULL,
                    local_date TEXT NOT NULL,
                    local_hour TEXT NOT NULL,
                    request_id TEXT,
                    auth_index TEXT,
                    source TEXT,
                    provider TEXT,
                    model TEXT,
                    endpoint TEXT,
                    auth_type TEXT,
                    api_key_hash TEXT,
                    failed INTEGER NOT NULL DEFAULT 0,
                    latency_ms INTEGER DEFAULT 0,
                    input_tokens INTEGER DEFAULT 0,
                    output_tokens INTEGER DEFAULT 0,
                    reasoning_tokens INTEGER DEFAULT 0,
                    cached_tokens INTEGER DEFAULT 0,
                    total_tokens INTEGER DEFAULT 0,
                    raw_json TEXT NOT NULL
                );
                CREATE INDEX IF NOT EXISTS idx_usage_ts ON usage_events(ts_epoch);
                CREATE INDEX IF NOT EXISTS idx_usage_date ON usage_events(local_date);
                CREATE INDEX IF NOT EXISTS idx_usage_source ON usage_events(source);
                CREATE INDEX IF NOT EXISTS idx_usage_auth ON usage_events(auth_index);

                CREATE TABLE IF NOT EXISTS quota_snapshots (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT NOT NULL,
                    ts_epoch REAL NOT NULL,
                    email TEXT NOT NULL,
                    plan TEXT,
                    allowed INTEGER,
                    limit_reached INTEGER,
                    primary_used_percent INTEGER,
                    primary_remaining_percent INTEGER,
                    primary_reset_at TEXT,
                    secondary_used_percent INTEGER,
                    secondary_remaining_percent INTEGER,
                    secondary_reset_at TEXT,
                    credits_balance TEXT,
                    raw_json TEXT NOT NULL
                );
                CREATE INDEX IF NOT EXISTS idx_quota_email_ts ON quota_snapshots(email, ts_epoch);
                """
            )
    finally:
        conn.close()
|
|
|
|
|
|
def _six_digit_fraction(text):
    """Return *text* with its fractional-seconds field cut or padded to six digits."""
    dot = text.find(".")
    if dot == -1:
        return text
    end = dot + 1
    while end < len(text) and text[end] in "0123456789":
        end += 1
    digits = text[dot + 1:end]
    if not digits:
        return text
    return text[:dot + 1] + digits[:6].ljust(6, "0") + text[end:]


def parse_rfc3339(value):
    """Parse an RFC 3339 / ISO 8601 timestamp into an aware UTC datetime.

    Args:
        value: timestamp text such as ``2024-01-02T03:04:05.123456789Z``.
            A trailing ``Z`` is read as UTC and a value without an offset is
            assumed to be UTC.

    Returns:
        datetime: the parsed instant converted to UTC. The current UTC time
        is returned when *value* is empty, not a string, or unparseable.
    """
    if not value or not isinstance(value, str):
        return dt.datetime.now(dt.timezone.utc)

    text = value.strip()
    if text[-1:] in ("Z", "z"):
        text = text[:-1] + "+00:00"

    try:
        # Before Python 3.11, fromisoformat() only accepts 3 or 6 fractional
        # digits, so nanosecond-precision stamps are normalised first.
        parsed = dt.datetime.fromisoformat(_six_digit_fraction(text))
    except ValueError:
        return dt.datetime.now(dt.timezone.utc)

    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=dt.timezone.utc)
    return parsed.astimezone(dt.timezone.utc)
|
|
|
|
|
|
def resp_command(*parts):
    """Encode *parts* as a single RESP array of bulk strings.

    Each part is converted with str() and encoded before being framed as
    ``$<length>\\r\\n<bytes>\\r\\n``; the array header is ``*<count>\\r\\n``.
    Returns the whole command as bytes.
    """
    chunks = [b"*%d\r\n" % len(parts)]
    for part in parts:
        encoded = str(part).encode()
        chunks.append(b"$%d\r\n%b\r\n" % (len(encoded), encoded))
    return b"".join(chunks)
|
|
|
|
|
|
class RespClient:
    """Minimal blocking RESP client: connect, AUTH, then issue commands.

    Args:
        host: server host name or address.
        port: server TCP port.
        password: value sent with AUTH; must be non-empty.
        timeout: socket timeout in seconds for connect and I/O.

    Raises:
        RuntimeError: if *password* is empty or the server rejects AUTH.
        OSError: if the connection cannot be established.
    """

    def __init__(self, host, port, password, timeout=10):
        if not password:
            raise RuntimeError("management_key is required in config.json or CLIPROXY_MANAGEMENT_KEY")
        self.file = None
        self.sock = socket.create_connection((host, port), timeout=timeout)
        try:
            self.file = self.sock.makefile("rb")
            self.send("AUTH", password)
            reply = self.read()
            if not (isinstance(reply, str) and reply.upper() == "OK"):
                raise RuntimeError(f"AUTH failed: {reply!r}")
        except BaseException:
            # The caller never receives the instance when __init__ raises,
            # so the socket has to be released here or it leaks.
            self.close()
            raise

    def close(self):
        """Close the buffered reader (if any) and the underlying socket."""
        try:
            if self.file is not None:
                self.file.close()
        finally:
            self.sock.close()

    def send(self, *parts):
        """Send one command built from *parts* via resp_command()."""
        self.sock.sendall(resp_command(*parts))

    def read_line(self):
        """Read one line and return it without trailing CR/LF bytes.

        Raises:
            EOFError: if the peer closed the connection.
        """
        line = self.file.readline()
        if not line:
            raise EOFError("RESP connection closed")
        return line.rstrip(b"\r\n")

    def _read_exact(self, size):
        """Read exactly *size* bytes or raise EOFError on a short read."""
        data = self.file.read(size)
        if len(data) != size:
            raise EOFError("RESP connection closed")
        return data

    def read(self):
        """Read and decode one RESP reply.

        Returns:
            str for simple and bulk strings, int for integers, list for
            arrays (decoded recursively), None for null bulk/array replies.

        Raises:
            RuntimeError: for an error reply or an unknown type prefix.
            EOFError: if the connection ends before the reply is complete.
        """
        line = self.read_line()
        prefix, payload = line[:1], line[1:]
        if prefix == b"+":
            return payload.decode()
        if prefix == b"-":
            raise RuntimeError(payload.decode())
        if prefix == b":":
            return int(payload)
        if prefix == b"$":
            length = int(payload)
            if length == -1:
                return None
            data = self._read_exact(length)
            self._read_exact(2)  # CRLF that terminates the bulk payload
            return data.decode("utf-8", "replace")
        if prefix == b"*":
            count = int(payload)
            if count == -1:
                return None
            return [self.read() for _ in range(count)]
        raise RuntimeError(f"Unknown RESP prefix: {line!r}")

    def rpop(self, count=100):
        """Issue up to *count* ``RPOP queue`` commands.

        Stops at the first null reply and returns the items popped so far.
        """
        result = []
        for _ in range(count):
            self.send("RPOP", "queue")
            item = self.read()
            if item is None:
                break
            result.append(item)
        return result
|
|
|
|
|
|
def event_key(payload, raw):
    """Return the de-duplication key for one usage event.

    The payload's ``request_id`` is used when it is truthy; otherwise the key
    is the SHA-256 hex digest of the raw JSON text.
    """
    return payload.get("request_id") or hashlib.sha256(raw.encode()).hexdigest()
|
|
|
|
|
|
def _as_int(value):
    """Coerce *value* to int, treating missing or malformed numbers as 0."""
    try:
        return int(value or 0)
    except (TypeError, ValueError, OverflowError):
        return 0


def _usage_row(payload, raw):
    """Build the usage_events column dict for one decoded queue item.

    Raises:
        ValueError: if *payload* is not a JSON object.
    """
    if not isinstance(payload, dict):
        raise ValueError(f"expected a JSON object, got {type(payload).__name__}")
    ts_utc = parse_rfc3339(payload.get("timestamp"))
    ts_local = ts_utc.astimezone(LOCAL_TZ)
    tokens = payload.get("tokens")
    if not isinstance(tokens, dict):
        tokens = {}
    api_key = payload.get("api_key") or ""
    # Only a short digest of the API key is stored in its own column.
    api_hash = hashlib.sha256(str(api_key).encode()).hexdigest()[:12] if api_key else ""
    return {
        "event_key": event_key(payload, raw),
        "timestamp": ts_utc.isoformat(),
        "ts_epoch": ts_utc.timestamp(),
        "local_date": ts_local.strftime("%Y-%m-%d"),
        "local_hour": ts_local.strftime("%Y-%m-%d %H:00"),
        "request_id": payload.get("request_id"),
        "auth_index": payload.get("auth_index"),
        "source": payload.get("source"),
        "provider": payload.get("provider"),
        "model": payload.get("model"),
        "endpoint": payload.get("endpoint"),
        "auth_type": payload.get("auth_type"),
        "api_key_hash": api_hash,
        "failed": 1 if payload.get("failed") else 0,
        "latency_ms": _as_int(payload.get("latency_ms")),
        "input_tokens": _as_int(tokens.get("input_tokens")),
        "output_tokens": _as_int(tokens.get("output_tokens")),
        "reasoning_tokens": _as_int(tokens.get("reasoning_tokens")),
        "cached_tokens": _as_int(tokens.get("cached_tokens")),
        "total_tokens": _as_int(tokens.get("total_tokens")),
        "raw_json": raw,
    }


def insert_usage(raw_items):
    """Store raw JSON usage events in usage_events.

    Items that fail to decode or to convert are logged to stderr and skipped;
    rows whose event_key already exists are silently ignored. A single bad
    item never aborts the batch, because the batch has already been popped
    from the queue and could not be fetched again.

    Args:
        raw_items: iterable of JSON strings, one per event.

    Returns:
        int: number of rows actually inserted.
    """
    inserted = 0
    conn = db_connect()
    try:
        # `with conn` commits the whole batch (or rolls back on an uncaught
        # error); closing the connection is done separately below.
        with conn:
            for raw in raw_items:
                try:
                    payload = json.loads(raw)
                except (TypeError, json.JSONDecodeError) as e:
                    print(f"insert_usage: JSON decode error: {e} — raw: {str(raw)[:120]!r}", file=sys.stderr, flush=True)
                    continue
                try:
                    conn.execute(
                        """
                        INSERT INTO usage_events (
                            event_key,timestamp,ts_epoch,local_date,local_hour,request_id,auth_index,source,
                            provider,model,endpoint,auth_type,api_key_hash,failed,latency_ms,input_tokens,
                            output_tokens,reasoning_tokens,cached_tokens,total_tokens,raw_json
                        ) VALUES (
                            :event_key,:timestamp,:ts_epoch,:local_date,:local_hour,:request_id,:auth_index,:source,
                            :provider,:model,:endpoint,:auth_type,:api_key_hash,:failed,:latency_ms,:input_tokens,
                            :output_tokens,:reasoning_tokens,:cached_tokens,:total_tokens,:raw_json
                        )
                        """,
                        _usage_row(payload, raw),
                    )
                    inserted += 1
                except sqlite3.IntegrityError:
                    pass  # duplicate event_key, expected
                except Exception as e:
                    keys = list(payload) if isinstance(payload, dict) else type(payload).__name__
                    print(f"insert_usage: unexpected error: {e} — payload keys: {keys}", file=sys.stderr, flush=True)
    finally:
        conn.close()
    return inserted
|
|
|
|
|
|
def latest_quota_age():
    """Return the age in seconds of the newest quota snapshot.

    Returns None when quota_snapshots is empty.
    """
    conn = db_connect()
    try:
        row = conn.execute("SELECT MAX(ts_epoch) AS ts FROM quota_snapshots").fetchone()
    finally:
        # A connection used as a context manager is not closed on exit, so
        # release it explicitly.
        conn.close()
    newest = row["ts"]
    return None if newest is None else time.time() - newest
|
|
|
|
|
|
def auth_files():
    """Return the sorted paths of all ``codex-*.json`` files in JSON_DIR."""
    pattern = os.path.join(JSON_DIR, "codex-*.json")
    matches = glob.glob(pattern)
    matches.sort()
    return matches
|
|
|
|
def _fetch_codex_usage(token):
    """GET the usage endpoint with *token* as bearer and return the parsed JSON."""
    req = urllib.request.Request(
        "https://chatgpt.com/backend-api/wham/usage",
        headers={
            "Authorization": "Bearer " + token,
            "Accept": "application/json",
            "User-Agent": "codex-cli",
        },
    )
    with urllib.request.urlopen(req, timeout=20) as resp:
        return json.load(resp)


def _quota_row(now, email, data):
    """Build the quota_snapshots value tuple for one usage response."""
    rl = data.get("rate_limit") or {}
    primary = rl.get("primary_window") or {}
    secondary = rl.get("secondary_window") or {}
    primary_used = int(primary.get("used_percent") or 0)
    secondary_used = int(secondary.get("used_percent") or 0)
    return (
        now.isoformat(),
        now.timestamp(),
        email,
        data.get("plan_type"),
        1 if rl.get("allowed") else 0,
        1 if rl.get("limit_reached") else 0,
        primary_used,
        max(0, 100 - primary_used),
        epoch_to_local(primary.get("reset_at")),
        secondary_used,
        max(0, 100 - secondary_used),
        epoch_to_local(secondary.get("reset_at")),
        str((data.get("credits") or {}).get("balance", "")),
        json.dumps(data, ensure_ascii=False),
    )


def refresh_quota(force=False):
    """Fetch quota usage for every auth file and store one snapshot each.

    Args:
        force: when false, nothing is fetched if the newest snapshot is
            younger than the configured ``quota_refresh_seconds``.

    Returns:
        int: number of snapshots inserted. 0 is also returned when another
        refresh is already running or the existing data is fresh enough.
    """
    # Non-blocking acquire: a concurrent caller skips instead of queueing.
    if not _quota_refresh_lock.acquire(blocking=False):
        print("refresh_quota: already running, skipping duplicate call", flush=True)
        return 0

    try:
        cfg = load_config()
        age = latest_quota_age()
        if not force and age is not None and age < cfg["quota_refresh_seconds"]:
            return 0

        files = auth_files()
        print(f"refresh_quota: found {len(files)} auth files in {JSON_DIR}", flush=True)

        # One shared timestamp so all snapshots of this run line up.
        now = dt.datetime.now(dt.timezone.utc)
        inserted = 0
        conn = db_connect()
        try:
            for path in files:
                try:
                    with open(path, encoding="utf-8") as f:
                        auth = json.load(f)
                    token = auth.get("access_token")
                    email = auth.get("email") or os.path.basename(path)
                    if not token:
                        print(f"refresh_quota: skipping {email} (no access_token)", flush=True)
                        continue
                    print(f"refresh_quota: fetching quota for {email}...", flush=True)
                    data = _fetch_codex_usage(token)
                    row = _quota_row(now, email, data)
                    # Commit per snapshot so the write lock is not held while
                    # the next (slow) HTTP request is in flight.
                    with conn:
                        conn.execute(
                            """
                            INSERT INTO quota_snapshots (
                                timestamp,ts_epoch,email,plan,allowed,limit_reached,
                                primary_used_percent,primary_remaining_percent,primary_reset_at,
                                secondary_used_percent,secondary_remaining_percent,secondary_reset_at,
                                credits_balance,raw_json
                            ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)
                            """,
                            row,
                        )
                    inserted += 1
                    print(f"refresh_quota: saved quota for {email} (plan: {data.get('plan_type')})", flush=True)
                except (OSError, ValueError, TypeError, AttributeError, KeyError, OverflowError) as exc:
                    # OSError covers URLError/HTTPError and ValueError covers
                    # JSONDecodeError; the remaining types catch auth files or
                    # responses with an unexpected shape, so one bad account
                    # does not abort the rest.
                    print(f"quota refresh failed for {path}: {exc}", file=sys.stderr, flush=True)
        finally:
            conn.close()
        print(f"refresh_quota: inserted {inserted} quota snapshots", flush=True)
        return inserted
    finally:
        _quota_refresh_lock.release()
|
|
|
|
def epoch_to_local(value):
    """Format an epoch-seconds value as ``YYYY-MM-DD HH:MM:SS`` in LOCAL_TZ.

    Returns an empty string for a falsy *value*.
    """
    if value:
        moment = dt.datetime.fromtimestamp(int(value), LOCAL_TZ)
        return moment.strftime("%Y-%m-%d %H:%M:%S")
    return ""
|
|
|
|
|
|
def debug_collect():
    """Test one poll cycle and print raw queue items — does NOT write to DB.

    Up to 10 items are popped from the queue and printed; they are not put
    back. The management key is masked in the printed configuration.
    """
    cfg = load_config()
    # Never echo the secret itself to the terminal or to captured logs.
    shown = {k: ("***" if k == "management_key" and v else v) for k, v in cfg.items()}
    print(f"[debug] Config: {shown}", flush=True)
    print(f"[debug] Connecting to {cfg['cliproxy_host']}:{cfg['cliproxy_port']} ...", flush=True)
    try:
        client = RespClient(cfg["cliproxy_host"], cfg["cliproxy_port"], cfg["management_key"])
    except Exception as e:
        print(f"[debug] Connection FAILED: {e}", file=sys.stderr, flush=True)
        return
    print("[debug] AUTH OK", flush=True)
    try:
        raw_items = client.rpop(10)
        print(f"[debug] rpop returned {len(raw_items)} item(s)", flush=True)
        for i, raw in enumerate(raw_items):
            print(f"[debug] item[{i}]: {raw[:300]}", flush=True)
            try:
                payload = json.loads(raw)
                print(f"[debug] parsed OK — keys: {list(payload.keys())}", flush=True)
                tokens = payload.get("tokens") or {}
                print(f"[debug] tokens: {tokens}", flush=True)
                print(f"[debug] model: {payload.get('model')}, source: {payload.get('source')}", flush=True)
            except Exception as e:
                print(f"[debug] parse error: {e}", file=sys.stderr, flush=True)
    finally:
        client.close()
    if not raw_items:
        print("[debug] Queue is EMPTY — no events to collect. Make sure CLIProxy is running and receiving requests.", flush=True)
|
|
|
|
|
|
def collect_forever():
    """Poll the queue forever and store every popped event in the database.

    The configuration is read once at start-up. On any error the connection
    is closed, the traceback is written to stderr and a new connection is
    attempted after 5 seconds. This function never returns.
    """
    import traceback

    init_db()
    cfg = load_config()
    print(f"collector: connecting to {cfg['cliproxy_host']}:{cfg['cliproxy_port']}", flush=True)
    # NOTE: quota snapshots are not refreshed from this loop; in this file
    # refresh_quota() is only reached through latest_quotas(force=True).
    while True:
        try:
            client = RespClient(cfg["cliproxy_host"], cfg["cliproxy_port"], cfg["management_key"])
            print("collector: connected and authenticated OK", flush=True)
            try:
                while True:
                    raw_items = client.rpop(100)
                    if raw_items:
                        print(f"collector: got {len(raw_items)} raw item(s) from queue", flush=True)
                        inserted = insert_usage(raw_items)
                        print(f"collector: inserted {inserted}/{len(raw_items)} events into DB", flush=True)
                    time.sleep(cfg["poll_interval_seconds"])
            finally:
                client.close()
                print("collector: connection closed", flush=True)
        except Exception as exc:
            print(f"collector error: {exc}", file=sys.stderr, flush=True)
            traceback.print_exc(file=sys.stderr)
            print("collector: retrying in 5s...", file=sys.stderr, flush=True)
            time.sleep(5)
|
|
|
|
|
|
def range_bounds(name):
    """Return ``(start, end)`` epoch seconds for the named reporting range.

    ``"1h"``, ``"5h"``, ``"24h"`` and ``"7d"`` are rolling windows ending now.
    Any other name selects the period from local midnight (LOCAL_TZ) to now.
    """
    now = dt.datetime.now(LOCAL_TZ)
    rolling = (
        ("5h", dt.timedelta(hours=5)),
        ("1h", dt.timedelta(hours=1)),
        ("24h", dt.timedelta(hours=24)),
        ("7d", dt.timedelta(days=7)),
    )
    span = next((delta for label, delta in rolling if name == label), None)
    if span is None:
        start = now.replace(hour=0, minute=0, second=0, microsecond=0)
    else:
        start = now - span
    utc = dt.timezone.utc
    return start.astimezone(utc).timestamp(), now.astimezone(utc).timestamp()
|
|
|
|
|
|
def query_summary(range_name):
    """Aggregate usage_events over the named range.

    Args:
        range_name: a range understood by range_bounds().

    Returns:
        dict with keys ``range`` (the name as given), ``summary`` (overall
        totals), ``accounts`` (per source/auth_index, by tokens descending),
        ``models`` (top 12 models by tokens) and ``hours`` (per local hour,
        ascending). All rows are plain dicts.
    """
    window = range_bounds(range_name)
    conn = db_connect()
    try:
        total = conn.execute(
            """
            SELECT COUNT(*) requests,
                   COALESCE(SUM(total_tokens),0) total_tokens,
                   COALESCE(SUM(input_tokens),0) input_tokens,
                   COALESCE(SUM(output_tokens),0) output_tokens,
                   COALESCE(SUM(reasoning_tokens),0) reasoning_tokens,
                   COALESCE(SUM(cached_tokens),0) cached_tokens,
                   COALESCE(SUM(failed),0) failed
            FROM usage_events WHERE ts_epoch BETWEEN ? AND ?
            """,
            window,
        ).fetchone()
        accounts = conn.execute(
            """
            SELECT COALESCE(source, auth_index, 'unknown') account,
                   COUNT(*) requests,
                   COALESCE(SUM(total_tokens),0) total_tokens,
                   COALESCE(SUM(input_tokens),0) input_tokens,
                   COALESCE(SUM(output_tokens),0) output_tokens,
                   COALESCE(SUM(reasoning_tokens),0) reasoning_tokens,
                   COALESCE(SUM(failed),0) failed
            FROM usage_events WHERE ts_epoch BETWEEN ? AND ?
            GROUP BY account ORDER BY total_tokens DESC
            """,
            window,
        ).fetchall()
        models = conn.execute(
            """
            SELECT COALESCE(model, 'unknown') model,
                   COUNT(*) requests,
                   COALESCE(SUM(total_tokens),0) total_tokens,
                   COALESCE(SUM(failed),0) failed
            FROM usage_events WHERE ts_epoch BETWEEN ? AND ?
            GROUP BY model ORDER BY total_tokens DESC LIMIT 12
            """,
            window,
        ).fetchall()
        hours = conn.execute(
            """
            SELECT local_hour hour,
                   COUNT(*) requests,
                   COALESCE(SUM(total_tokens),0) total_tokens,
                   COALESCE(SUM(failed),0) failed
            FROM usage_events WHERE ts_epoch BETWEEN ? AND ?
            GROUP BY local_hour ORDER BY local_hour
            """,
            window,
        ).fetchall()
    finally:
        # Read-only work: nothing to commit, but the connection must still be
        # closed explicitly.
        conn.close()
    return {
        "range": range_name,
        "summary": dict(total),
        "accounts": [dict(x) for x in accounts],
        "models": [dict(x) for x in models],
        "hours": [dict(x) for x in hours],
    }
|
|
|
|
|
|
def latest_quotas(force=False):
    """Return the newest quota snapshot for each email, ordered by email.

    Args:
        force: when true, refresh_quota(force=True) runs before the query.

    Returns:
        list[dict]: one dict per quota_snapshots row selected.
    """
    if force:
        refresh_quota(force=True)
    conn = db_connect()
    try:
        rows = conn.execute(
            """
            SELECT q.* FROM quota_snapshots q
            JOIN (
                SELECT email, MAX(ts_epoch) ts FROM quota_snapshots GROUP BY email
            ) latest ON latest.email = q.email AND latest.ts = q.ts_epoch
            ORDER BY email
            """
        ).fetchall()
    finally:
        # The sqlite3 context manager does not close the connection, so it is
        # closed here.
        conn.close()
    return [dict(row) for row in rows]
|
|
|
|
|
|
def recent_requests(limit=100):
    """Return the most recent usage events, newest first.

    Args:
        limit: maximum number of rows, passed to SQL ``LIMIT``.

    Returns:
        list[dict]: the selected columns plus ``local_time``, the event
        timestamp rendered in LOCAL_TZ as ``YYYY-MM-DD HH:MM:SS``.
    """
    conn = db_connect()
    try:
        rows = conn.execute(
            """
            SELECT timestamp, source, auth_index, model, endpoint, failed, latency_ms,
                   input_tokens, output_tokens, reasoning_tokens, cached_tokens, total_tokens, request_id
            FROM usage_events ORDER BY ts_epoch DESC LIMIT ?
            """,
            (limit,),
        ).fetchall()
    finally:
        # Close explicitly; `with connection` would only end the transaction.
        conn.close()
    result = []
    for row in rows:
        item = dict(row)
        item["local_time"] = parse_rfc3339(item["timestamp"]).astimezone(LOCAL_TZ).strftime("%Y-%m-%d %H:%M:%S")
        result.append(item)
    return result
|
|
|
|
|
|
def json_response(handler, payload, status=200):
    """Send *payload* as a UTF-8 JSON response through *handler*.

    Writes the status line, Content-Type, Content-Length and
    ``Cache-Control: no-store`` headers, then the encoded body.
    """
    body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    handler.send_response(status)
    for header, value in (
        ("Content-Type", "application/json; charset=utf-8"),
        ("Content-Length", str(len(body))),
        ("Cache-Control", "no-store"),
    ):
        handler.send_header(header, value)
    handler.end_headers()
    handler.wfile.write(body)
|
|
|
|
|
|
class DashboardHandler(BaseHTTPRequestHandler):
    """HTTP handler for the dashboard page and its JSON API.

    Routes: ``/`` (HTML page), ``/api/summary``, ``/api/quota``,
    ``/api/requests`` and ``/api/health``. Anything else gets a JSON 404.
    """

    def log_message(self, fmt, *args):
        """Suppress the default per-request access log."""
        return

    @staticmethod
    def _parse_limit(qs, default=100, maximum=500):
        """Return the ``limit`` query value clamped to ``0..maximum``.

        A negative value must not reach SQLite, where ``LIMIT -1`` means
        "no limit" and would bypass the cap.

        Raises:
            ValueError: if the value is not an integer.
        """
        raw = qs.get("limit", [str(default)])[0]
        return max(0, min(maximum, int(raw)))

    def do_GET(self):
        """Dispatch a GET request; unexpected errors become a JSON 500."""
        parsed = urlparse(self.path)
        qs = parse_qs(parsed.query)
        try:
            if parsed.path == "/":
                self.serve_html()
            elif parsed.path == "/api/summary":
                json_response(self, query_summary(qs.get("range", ["today"])[0]))
            elif parsed.path == "/api/quota":
                json_response(self, {"quotas": latest_quotas(force=qs.get("force", ["0"])[0] == "1")})
            elif parsed.path == "/api/requests":
                try:
                    limit = self._parse_limit(qs)
                except ValueError:
                    # A malformed parameter is the client's error, not ours.
                    json_response(self, {"error": "limit must be an integer"}, 400)
                    return
                json_response(self, {"requests": recent_requests(limit)})
            elif parsed.path == "/api/health":
                json_response(self, {"ok": True, "db": DB_PATH, "auth_files": len(auth_files())})
            else:
                json_response(self, {"error": "not found"}, 404)
        except Exception as exc:
            json_response(self, {"error": str(exc)}, 500)

    def serve_html(self):
        """Send the file at HTML_PATH, or a 500 page if it is missing."""
        try:
            with open(HTML_PATH, "rb") as f:
                body = f.read()
        except FileNotFoundError:
            body = b"<h1>Dashboard HTML not found</h1><p>Expected: " + HTML_PATH.encode() + b"</p>"
            self.send_response(500)
            self.send_header("Content-Type", "text/html; charset=utf-8")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)
            return
        self.send_response(200)
        self.send_header("Content-Type", "text/html; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.send_header("Cache-Control", "no-store")
        self.end_headers()
        self.wfile.write(body)
|
|
|
|
|
|
|
|
|
|
def serve():
    """Run only the HTTP dashboard (no collector); blocks until interrupted."""
    init_db()
    cfg = load_config()
    bind_address = (cfg["dashboard_host"], int(cfg["dashboard_port"]))
    httpd = ThreadingHTTPServer(bind_address, DashboardHandler)
    print(f"dashboard listening on http://{cfg['dashboard_host']}:{cfg['dashboard_port']}", flush=True)
    httpd.serve_forever()
|
|
|
|
|
|
def print_report(range_name):
    """Print query_summary(range_name) to stdout as indented JSON."""
    init_db()
    print(json.dumps(query_summary(range_name), ensure_ascii=False, indent=2))
|
|
|
|
|
|
def start(open_browser=True):
    """Run init + collect (background thread) + serve in a single command.

    Args:
        open_browser: when true, open the local dashboard URL in the default
            browser about 1.5 seconds after start-up.

    Blocks in the HTTP server loop until Ctrl+C. The listening socket is
    always released on the way out.
    """
    # 1. Init DB and config
    init_db()
    cfg = load_config()
    host = cfg["dashboard_host"]
    port = int(cfg["dashboard_port"])

    # When binding to 0.0.0.0, show localhost for browser and also the LAN IP
    if host in ("0.0.0.0", ""):
        local_url = f"http://127.0.0.1:{port}"
        try:
            lan_ip = socket.gethostbyname(socket.gethostname())
        except OSError:
            lan_ip = None
        lan_url = f"http://{lan_ip}:{port}" if lan_ip and lan_ip != "127.0.0.1" else None
    else:
        local_url = f"http://{host}:{port}"
        lan_url = None

    # 2. Bind first: if the port is busy we fail before any thread is started
    #    and before a browser is pointed at a server that does not exist.
    server = ThreadingHTTPServer((host, port), DashboardHandler)
    try:
        # 3. Start collector in a daemon thread (dies with the main process)
        collector_thread = threading.Thread(target=collect_forever, daemon=True, name="collector")
        collector_thread.start()
        print("collector started (background thread)", flush=True)

        # 4. Open browser after a short delay so the server is ready
        if open_browser:
            def _open():
                time.sleep(1.5)
                webbrowser.open(local_url)

            threading.Thread(target=_open, daemon=True).start()

        # 5. Serve (blocks — keeps the process alive)
        print(f"dashboard listening on {host}:{port}", flush=True)
        print(f" local → {local_url}", flush=True)
        if lan_url:
            print(f" LAN → {lan_url}", flush=True)
        print("Press Ctrl+C to stop.", flush=True)
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            print("\nShutting down.", flush=True)
    finally:
        # shutdown() is for stopping a running serve_forever() from another
        # thread; once the loop has exited, what remains is releasing the
        # listening socket.
        server.server_close()
|
|
|
|
|
|
def main():
    """Parse the command line and run the selected sub-command."""
    parser = argparse.ArgumentParser(description="CLIProxyAPI usage dashboard")
    commands = parser.add_subparsers(dest="cmd", required=True)

    # Sub-commands that take no arguments of their own.
    for name, help_text in (
        ("init", "Initialise database and config"),
        ("collect", "Run collector loop (foreground)"),
        ("debug", "Test one poll cycle, print raw queue items (no DB write)"),
        ("serve", "Run HTTP server only (no collector)"),
    ):
        commands.add_parser(name, help=help_text)

    start_parser = commands.add_parser("start", help="Init + collect + serve in one command (recommended)")
    start_parser.add_argument("--no-browser", action="store_true", help="Do not open browser automatically")
    quota_parser = commands.add_parser("quota", help="Show current quota snapshots")
    quota_parser.add_argument("--force", action="store_true")
    report_parser = commands.add_parser("report", help="Print usage summary as JSON")
    report_parser.add_argument("range", choices=["today", "1h", "5h", "24h", "7d"])

    args = parser.parse_args()

    def run_init():
        init_db()
        load_config()
        print(DB_PATH)

    def run_quota():
        init_db()
        print(json.dumps({"quotas": latest_quotas(force=args.force)}, ensure_ascii=False, indent=2))

    actions = {
        "init": run_init,
        "collect": collect_forever,
        "debug": debug_collect,
        "serve": serve,
        "start": lambda: start(open_browser=not args.no_browser),
        "quota": run_quota,
        "report": lambda: print_report(args.range),
    }
    # required=True on the sub-parsers guarantees args.cmd is one of the keys.
    actions[args.cmd]()


if __name__ == "__main__":
    main()