update .gitignore
This commit is contained in:
@@ -0,0 +1,695 @@
|
||||
#!/usr/bin/env python3
|
||||
import argparse
|
||||
import datetime as dt
|
||||
import glob
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
import socket
|
||||
import sqlite3
|
||||
import sys
|
||||
import time
|
||||
import threading
|
||||
import urllib.error
|
||||
import urllib.request
|
||||
import webbrowser
|
||||
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
||||
from urllib.parse import parse_qs, urlparse
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
_quota_refresh_lock = threading.Lock()
|
||||
# Path to the companion HTML file (same directory as this script)
|
||||
HTML_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "usage-dashboard.html")
|
||||
JSON_DIR = os.path.expanduser("~/.cli-proxy-api")
|
||||
BASE_DIR = os.path.expanduser("~/cliproxyapi/usage-dashboard")
|
||||
AUTH_DIR = os.path.expanduser("~/cliproxyapi")
|
||||
DB_PATH = os.path.join(BASE_DIR, "usage.sqlite")
|
||||
CONFIG_PATH = os.path.join(BASE_DIR, "config.json")
|
||||
LOCAL_TZ = ZoneInfo("Asia/Ho_Chi_Minh")
|
||||
|
||||
|
||||
DEFAULT_CONFIG = {
|
||||
"cliproxy_host": "127.0.0.1",
|
||||
"cliproxy_port": 8317,
|
||||
"management_key": "123456",
|
||||
"poll_interval_seconds": 2,
|
||||
"quota_refresh_seconds": 300,
|
||||
"dashboard_host": "0.0.0.0",
|
||||
"dashboard_port": 8320,
|
||||
}
|
||||
|
||||
|
||||
def ensure_dirs():
|
||||
os.makedirs(BASE_DIR, exist_ok=True)
|
||||
|
||||
|
||||
def load_config():
|
||||
ensure_dirs()
|
||||
if not os.path.exists(CONFIG_PATH):
|
||||
with open(CONFIG_PATH, "w") as f:
|
||||
json.dump(DEFAULT_CONFIG, f, indent=2)
|
||||
os.chmod(CONFIG_PATH, 0o600)
|
||||
with open(CONFIG_PATH) as f:
|
||||
cfg = json.load(f)
|
||||
merged = dict(DEFAULT_CONFIG)
|
||||
merged.update(cfg)
|
||||
merged["management_key"] = os.environ.get("CLIPROXY_MANAGEMENT_KEY", merged["management_key"])
|
||||
return merged
|
||||
|
||||
|
||||
def db_connect():
|
||||
ensure_dirs()
|
||||
conn = sqlite3.connect(DB_PATH)
|
||||
conn.row_factory = sqlite3.Row
|
||||
conn.execute("PRAGMA journal_mode=WAL")
|
||||
conn.execute("PRAGMA busy_timeout=5000")
|
||||
return conn
|
||||
|
||||
|
||||
def init_db():
|
||||
with db_connect() as conn:
|
||||
conn.executescript(
|
||||
"""
|
||||
CREATE TABLE IF NOT EXISTS usage_events (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
event_key TEXT NOT NULL UNIQUE,
|
||||
timestamp TEXT NOT NULL,
|
||||
ts_epoch REAL NOT NULL,
|
||||
local_date TEXT NOT NULL,
|
||||
local_hour TEXT NOT NULL,
|
||||
request_id TEXT,
|
||||
auth_index TEXT,
|
||||
source TEXT,
|
||||
provider TEXT,
|
||||
model TEXT,
|
||||
endpoint TEXT,
|
||||
auth_type TEXT,
|
||||
api_key_hash TEXT,
|
||||
failed INTEGER NOT NULL DEFAULT 0,
|
||||
latency_ms INTEGER DEFAULT 0,
|
||||
input_tokens INTEGER DEFAULT 0,
|
||||
output_tokens INTEGER DEFAULT 0,
|
||||
reasoning_tokens INTEGER DEFAULT 0,
|
||||
cached_tokens INTEGER DEFAULT 0,
|
||||
total_tokens INTEGER DEFAULT 0,
|
||||
raw_json TEXT NOT NULL
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_usage_ts ON usage_events(ts_epoch);
|
||||
CREATE INDEX IF NOT EXISTS idx_usage_date ON usage_events(local_date);
|
||||
CREATE INDEX IF NOT EXISTS idx_usage_source ON usage_events(source);
|
||||
CREATE INDEX IF NOT EXISTS idx_usage_auth ON usage_events(auth_index);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS quota_snapshots (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
timestamp TEXT NOT NULL,
|
||||
ts_epoch REAL NOT NULL,
|
||||
email TEXT NOT NULL,
|
||||
plan TEXT,
|
||||
allowed INTEGER,
|
||||
limit_reached INTEGER,
|
||||
primary_used_percent INTEGER,
|
||||
primary_remaining_percent INTEGER,
|
||||
primary_reset_at TEXT,
|
||||
secondary_used_percent INTEGER,
|
||||
secondary_remaining_percent INTEGER,
|
||||
secondary_reset_at TEXT,
|
||||
credits_balance TEXT,
|
||||
raw_json TEXT NOT NULL
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_quota_email_ts ON quota_snapshots(email, ts_epoch);
|
||||
"""
|
||||
)
|
||||
|
||||
|
||||
def parse_rfc3339(value):
|
||||
if not value:
|
||||
return dt.datetime.now(dt.timezone.utc)
|
||||
text = value.replace("Z", "+00:00")
|
||||
try:
|
||||
parsed = dt.datetime.fromisoformat(text)
|
||||
except ValueError:
|
||||
return dt.datetime.now(dt.timezone.utc)
|
||||
if parsed.tzinfo is None:
|
||||
parsed = parsed.replace(tzinfo=dt.timezone.utc)
|
||||
return parsed.astimezone(dt.timezone.utc)
|
||||
|
||||
|
||||
def resp_command(*parts):
|
||||
data = [f"*{len(parts)}\r\n".encode()]
|
||||
for part in parts:
|
||||
b = str(part).encode()
|
||||
data.append(f"${len(b)}\r\n".encode())
|
||||
data.append(b + b"\r\n")
|
||||
return b"".join(data)
|
||||
|
||||
|
||||
class RespClient:
|
||||
def __init__(self, host, port, password, timeout=10):
|
||||
if not password:
|
||||
raise RuntimeError("management_key is required in config.json or CLIPROXY_MANAGEMENT_KEY")
|
||||
self.sock = socket.create_connection((host, port), timeout=timeout)
|
||||
self.file = self.sock.makefile("rb")
|
||||
self.send("AUTH", password)
|
||||
reply = self.read()
|
||||
if not (isinstance(reply, str) and reply.upper() == "OK"):
|
||||
raise RuntimeError(f"AUTH failed: {reply!r}")
|
||||
|
||||
def close(self):
|
||||
try:
|
||||
self.file.close()
|
||||
finally:
|
||||
self.sock.close()
|
||||
|
||||
def send(self, *parts):
|
||||
self.sock.sendall(resp_command(*parts))
|
||||
|
||||
def read_line(self):
|
||||
line = self.file.readline()
|
||||
if not line:
|
||||
raise EOFError("RESP connection closed")
|
||||
return line.rstrip(b"\r\n")
|
||||
|
||||
def read(self):
|
||||
line = self.read_line()
|
||||
prefix = line[:1]
|
||||
payload = line[1:]
|
||||
if prefix == b"+":
|
||||
return payload.decode()
|
||||
if prefix == b"-":
|
||||
raise RuntimeError(payload.decode())
|
||||
if prefix == b":":
|
||||
return int(payload)
|
||||
if prefix == b"$":
|
||||
length = int(payload)
|
||||
if length == -1:
|
||||
return None
|
||||
data = self.file.read(length)
|
||||
self.file.read(2)
|
||||
return data.decode("utf-8", "replace")
|
||||
if prefix == b"*":
|
||||
count = int(payload)
|
||||
if count == -1:
|
||||
return None
|
||||
return [self.read() for _ in range(count)]
|
||||
raise RuntimeError(f"Unknown RESP prefix: {line!r}")
|
||||
|
||||
def rpop(self, count=100):
|
||||
result = []
|
||||
for _ in range(count):
|
||||
self.send("RPOP", "queue")
|
||||
item = self.read()
|
||||
if item is None:
|
||||
break
|
||||
result.append(item)
|
||||
return result
|
||||
|
||||
|
||||
def event_key(payload, raw):
|
||||
rid = payload.get("request_id")
|
||||
if rid:
|
||||
return rid
|
||||
return hashlib.sha256(raw.encode()).hexdigest()
|
||||
|
||||
|
||||
def insert_usage(raw_items):
|
||||
inserted = 0
|
||||
with db_connect() as conn:
|
||||
for raw in raw_items:
|
||||
try:
|
||||
payload = json.loads(raw)
|
||||
except json.JSONDecodeError as e:
|
||||
print(f"insert_usage: JSON decode error: {e} — raw: {raw[:120]!r}", file=sys.stderr, flush=True)
|
||||
continue
|
||||
ts_utc = parse_rfc3339(payload.get("timestamp"))
|
||||
ts_local = ts_utc.astimezone(LOCAL_TZ)
|
||||
tokens = payload.get("tokens") or {}
|
||||
api_key = payload.get("api_key") or ""
|
||||
api_hash = hashlib.sha256(api_key.encode()).hexdigest()[:12] if api_key else ""
|
||||
values = {
|
||||
"event_key": event_key(payload, raw),
|
||||
"timestamp": ts_utc.isoformat(),
|
||||
"ts_epoch": ts_utc.timestamp(),
|
||||
"local_date": ts_local.strftime("%Y-%m-%d"),
|
||||
"local_hour": ts_local.strftime("%Y-%m-%d %H:00"),
|
||||
"request_id": payload.get("request_id"),
|
||||
"auth_index": payload.get("auth_index"),
|
||||
"source": payload.get("source"),
|
||||
"provider": payload.get("provider"),
|
||||
"model": payload.get("model"),
|
||||
"endpoint": payload.get("endpoint"),
|
||||
"auth_type": payload.get("auth_type"),
|
||||
"api_key_hash": api_hash,
|
||||
"failed": 1 if payload.get("failed") else 0,
|
||||
"latency_ms": int(payload.get("latency_ms") or 0),
|
||||
"input_tokens": int(tokens.get("input_tokens") or 0),
|
||||
"output_tokens": int(tokens.get("output_tokens") or 0),
|
||||
"reasoning_tokens": int(tokens.get("reasoning_tokens") or 0),
|
||||
"cached_tokens": int(tokens.get("cached_tokens") or 0),
|
||||
"total_tokens": int(tokens.get("total_tokens") or 0),
|
||||
"raw_json": raw,
|
||||
}
|
||||
try:
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT INTO usage_events (
|
||||
event_key,timestamp,ts_epoch,local_date,local_hour,request_id,auth_index,source,
|
||||
provider,model,endpoint,auth_type,api_key_hash,failed,latency_ms,input_tokens,
|
||||
output_tokens,reasoning_tokens,cached_tokens,total_tokens,raw_json
|
||||
) VALUES (
|
||||
:event_key,:timestamp,:ts_epoch,:local_date,:local_hour,:request_id,:auth_index,:source,
|
||||
:provider,:model,:endpoint,:auth_type,:api_key_hash,:failed,:latency_ms,:input_tokens,
|
||||
:output_tokens,:reasoning_tokens,:cached_tokens,:total_tokens,:raw_json
|
||||
)
|
||||
""",
|
||||
values,
|
||||
)
|
||||
inserted += 1
|
||||
except sqlite3.IntegrityError:
|
||||
pass # duplicate event_key, expected
|
||||
except Exception as e:
|
||||
print(f"insert_usage: unexpected error: {e} — payload keys: {list(payload.keys())}", file=sys.stderr, flush=True)
|
||||
return inserted
|
||||
|
||||
|
||||
def latest_quota_age():
|
||||
with db_connect() as conn:
|
||||
row = conn.execute("SELECT MAX(ts_epoch) AS ts FROM quota_snapshots").fetchone()
|
||||
return None if row["ts"] is None else time.time() - row["ts"]
|
||||
|
||||
|
||||
def auth_files():
|
||||
return sorted(glob.glob(os.path.join(JSON_DIR, "codex-*.json")))
|
||||
|
||||
def refresh_quota(force=False):
|
||||
# Dùng lock để tránh nhiều request đồng thời
|
||||
acquired = _quota_refresh_lock.acquire(blocking=False)
|
||||
if not acquired:
|
||||
print("refresh_quota: already running, skipping duplicate call", flush=True)
|
||||
return 0
|
||||
|
||||
try:
|
||||
cfg = load_config()
|
||||
age = latest_quota_age()
|
||||
if not force and age is not None and age < cfg["quota_refresh_seconds"]:
|
||||
return 0
|
||||
|
||||
files = auth_files()
|
||||
# ✅ LOG 1: Số lượng auth files tìm thấy
|
||||
print(f"refresh_quota: found {len(files)} auth files in {JSON_DIR}", flush=True)
|
||||
|
||||
now = dt.datetime.now(dt.timezone.utc)
|
||||
inserted = 0
|
||||
with db_connect() as conn:
|
||||
for path in files:
|
||||
try:
|
||||
auth = json.load(open(path))
|
||||
token = auth.get("access_token")
|
||||
email = auth.get("email") or os.path.basename(path)
|
||||
if not token:
|
||||
# ✅ LOG 2: Skip no token
|
||||
print(f"refresh_quota: skipping {email} (no access_token)", flush=True)
|
||||
continue
|
||||
# ✅ LOG 3: Đang fetch quota cho email nào
|
||||
print(f"refresh_quota: fetching quota for {email}...", flush=True)
|
||||
req = urllib.request.Request(
|
||||
"https://chatgpt.com/backend-api/wham/usage",
|
||||
headers={
|
||||
"Authorization": "Bearer " + token,
|
||||
"Accept": "application/json",
|
||||
"User-Agent": "codex-cli",
|
||||
},
|
||||
)
|
||||
with urllib.request.urlopen(req, timeout=20) as resp:
|
||||
data = json.load(resp)
|
||||
rl = data.get("rate_limit") or {}
|
||||
primary = rl.get("primary_window") or {}
|
||||
secondary = rl.get("secondary_window") or {}
|
||||
primary_used = int(primary.get("used_percent") or 0)
|
||||
secondary_used = int(secondary.get("used_percent") or 0)
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT INTO quota_snapshots (
|
||||
timestamp,ts_epoch,email,plan,allowed,limit_reached,
|
||||
primary_used_percent,primary_remaining_percent,primary_reset_at,
|
||||
secondary_used_percent,secondary_remaining_percent,secondary_reset_at,
|
||||
credits_balance,raw_json
|
||||
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)
|
||||
""",
|
||||
(
|
||||
now.isoformat(),
|
||||
now.timestamp(),
|
||||
email,
|
||||
data.get("plan_type"),
|
||||
1 if rl.get("allowed") else 0,
|
||||
1 if rl.get("limit_reached") else 0,
|
||||
primary_used,
|
||||
max(0, 100 - primary_used),
|
||||
epoch_to_local(primary.get("reset_at")),
|
||||
secondary_used,
|
||||
max(0, 100 - secondary_used),
|
||||
epoch_to_local(secondary.get("reset_at")),
|
||||
str((data.get("credits") or {}).get("balance", "")),
|
||||
json.dumps(data, ensure_ascii=False),
|
||||
),
|
||||
)
|
||||
inserted += 1
|
||||
# ✅ LOG 4: Lưu thành công, hiển thị plan type
|
||||
print(f"refresh_quota: saved quota for {email} (plan: {data.get('plan_type')})", flush=True)
|
||||
except (OSError, urllib.error.URLError, urllib.error.HTTPError, json.JSONDecodeError, KeyError) as exc:
|
||||
# ✅ LOG 5: Lỗi khi fetch (đã có sẵn, thêm flush=True)
|
||||
print(f"quota refresh failed for {path}: {exc}", file=sys.stderr, flush=True)
|
||||
# ✅ LOG 6: Tổng kết số quota snapshots đã insert
|
||||
print(f"refresh_quota: inserted {inserted} quota snapshots", flush=True)
|
||||
return inserted
|
||||
finally:
|
||||
_quota_refresh_lock.release()
|
||||
|
||||
def epoch_to_local(value):
|
||||
if not value:
|
||||
return ""
|
||||
return dt.datetime.fromtimestamp(int(value), LOCAL_TZ).strftime("%Y-%m-%d %H:%M:%S")
|
||||
|
||||
|
||||
def debug_collect():
|
||||
"""Test one poll cycle and print raw queue items — does NOT write to DB."""
|
||||
cfg = load_config()
|
||||
print(f"[debug] Config: {cfg}", flush=True)
|
||||
print(f"[debug] Connecting to {cfg['cliproxy_host']}:{cfg['cliproxy_port']} ...", flush=True)
|
||||
try:
|
||||
client = RespClient(cfg["cliproxy_host"], cfg["cliproxy_port"], cfg["management_key"])
|
||||
except Exception as e:
|
||||
print(f"[debug] Connection FAILED: {e}", file=sys.stderr, flush=True)
|
||||
return
|
||||
print("[debug] AUTH OK", flush=True)
|
||||
try:
|
||||
raw_items = client.rpop(10)
|
||||
print(f"[debug] rpop returned {len(raw_items)} item(s)", flush=True)
|
||||
for i, raw in enumerate(raw_items):
|
||||
print(f"[debug] item[{i}]: {raw[:300]}", flush=True)
|
||||
try:
|
||||
payload = json.loads(raw)
|
||||
print(f"[debug] parsed OK — keys: {list(payload.keys())}", flush=True)
|
||||
tokens = payload.get("tokens") or {}
|
||||
print(f"[debug] tokens: {tokens}", flush=True)
|
||||
print(f"[debug] model: {payload.get('model')}, source: {payload.get('source')}", flush=True)
|
||||
except Exception as e:
|
||||
print(f"[debug] parse error: {e}", file=sys.stderr, flush=True)
|
||||
finally:
|
||||
client.close()
|
||||
if not raw_items:
|
||||
print("[debug] Queue is EMPTY — no events to collect. Make sure CLIProxy is running and receiving requests.", flush=True)
|
||||
|
||||
|
||||
def collect_forever():
|
||||
init_db()
|
||||
cfg = load_config()
|
||||
last_quota = 0
|
||||
print(f"collector: connecting to {cfg['cliproxy_host']}:{cfg['cliproxy_port']}", flush=True)
|
||||
while True:
|
||||
try:
|
||||
client = RespClient(cfg["cliproxy_host"], cfg["cliproxy_port"], cfg["management_key"])
|
||||
print("collector: connected and authenticated OK", flush=True)
|
||||
try:
|
||||
while True:
|
||||
raw_items = client.rpop(100)
|
||||
if raw_items:
|
||||
print(f"collector: got {len(raw_items)} raw item(s) from queue", flush=True)
|
||||
inserted = insert_usage(raw_items)
|
||||
print(f"collector: inserted {inserted}/{len(raw_items)} events into DB", flush=True)
|
||||
now = time.time()
|
||||
# if now - last_quota >= cfg["quota_refresh_seconds"]:
|
||||
# refresh_quota(force=True)
|
||||
# last_quota = now
|
||||
time.sleep(cfg["poll_interval_seconds"])
|
||||
finally:
|
||||
client.close()
|
||||
print("collector: connection closed", flush=True)
|
||||
except Exception as exc:
|
||||
import traceback
|
||||
print(f"collector error: {exc}", file=sys.stderr, flush=True)
|
||||
traceback.print_exc(file=sys.stderr)
|
||||
print("collector: retrying in 5s...", file=sys.stderr, flush=True)
|
||||
time.sleep(5)
|
||||
|
||||
|
||||
def range_bounds(name):
|
||||
now = dt.datetime.now(LOCAL_TZ)
|
||||
if name == "5h":
|
||||
start = now - dt.timedelta(hours=5)
|
||||
elif name == "1h":
|
||||
start = now - dt.timedelta(hours=1)
|
||||
elif name == "24h":
|
||||
start = now - dt.timedelta(hours=24)
|
||||
elif name == "7d":
|
||||
start = now - dt.timedelta(days=7)
|
||||
else:
|
||||
start = now.replace(hour=0, minute=0, second=0, microsecond=0)
|
||||
return start.astimezone(dt.timezone.utc).timestamp(), now.astimezone(dt.timezone.utc).timestamp()
|
||||
|
||||
|
||||
def query_summary(range_name):
|
||||
start, end = range_bounds(range_name)
|
||||
with db_connect() as conn:
|
||||
total = conn.execute(
|
||||
"""
|
||||
SELECT COUNT(*) requests,
|
||||
COALESCE(SUM(total_tokens),0) total_tokens,
|
||||
COALESCE(SUM(input_tokens),0) input_tokens,
|
||||
COALESCE(SUM(output_tokens),0) output_tokens,
|
||||
COALESCE(SUM(reasoning_tokens),0) reasoning_tokens,
|
||||
COALESCE(SUM(cached_tokens),0) cached_tokens,
|
||||
COALESCE(SUM(failed),0) failed
|
||||
FROM usage_events WHERE ts_epoch BETWEEN ? AND ?
|
||||
""",
|
||||
(start, end),
|
||||
).fetchone()
|
||||
accounts = conn.execute(
|
||||
"""
|
||||
SELECT COALESCE(source, auth_index, 'unknown') account,
|
||||
COUNT(*) requests,
|
||||
COALESCE(SUM(total_tokens),0) total_tokens,
|
||||
COALESCE(SUM(input_tokens),0) input_tokens,
|
||||
COALESCE(SUM(output_tokens),0) output_tokens,
|
||||
COALESCE(SUM(reasoning_tokens),0) reasoning_tokens,
|
||||
COALESCE(SUM(failed),0) failed
|
||||
FROM usage_events WHERE ts_epoch BETWEEN ? AND ?
|
||||
GROUP BY account ORDER BY total_tokens DESC
|
||||
""",
|
||||
(start, end),
|
||||
).fetchall()
|
||||
models = conn.execute(
|
||||
"""
|
||||
SELECT COALESCE(model, 'unknown') model,
|
||||
COUNT(*) requests,
|
||||
COALESCE(SUM(total_tokens),0) total_tokens,
|
||||
COALESCE(SUM(failed),0) failed
|
||||
FROM usage_events WHERE ts_epoch BETWEEN ? AND ?
|
||||
GROUP BY model ORDER BY total_tokens DESC LIMIT 12
|
||||
""",
|
||||
(start, end),
|
||||
).fetchall()
|
||||
hours = conn.execute(
|
||||
"""
|
||||
SELECT local_hour hour,
|
||||
COUNT(*) requests,
|
||||
COALESCE(SUM(total_tokens),0) total_tokens,
|
||||
COALESCE(SUM(failed),0) failed
|
||||
FROM usage_events WHERE ts_epoch BETWEEN ? AND ?
|
||||
GROUP BY local_hour ORDER BY local_hour
|
||||
""",
|
||||
(start, end),
|
||||
).fetchall()
|
||||
return {
|
||||
"range": range_name,
|
||||
"summary": dict(total),
|
||||
"accounts": [dict(x) for x in accounts],
|
||||
"models": [dict(x) for x in models],
|
||||
"hours": [dict(x) for x in hours],
|
||||
}
|
||||
|
||||
|
||||
def latest_quotas(force=False):
|
||||
if force:
|
||||
refresh_quota(force=True)
|
||||
with db_connect() as conn:
|
||||
rows = conn.execute(
|
||||
"""
|
||||
SELECT q.* FROM quota_snapshots q
|
||||
JOIN (
|
||||
SELECT email, MAX(ts_epoch) ts FROM quota_snapshots GROUP BY email
|
||||
) latest ON latest.email = q.email AND latest.ts = q.ts_epoch
|
||||
ORDER BY email
|
||||
"""
|
||||
).fetchall()
|
||||
return [dict(row) for row in rows]
|
||||
|
||||
|
||||
def recent_requests(limit=100):
|
||||
with db_connect() as conn:
|
||||
rows = conn.execute(
|
||||
"""
|
||||
SELECT timestamp, source, auth_index, model, endpoint, failed, latency_ms,
|
||||
input_tokens, output_tokens, reasoning_tokens, cached_tokens, total_tokens, request_id
|
||||
FROM usage_events ORDER BY ts_epoch DESC LIMIT ?
|
||||
""",
|
||||
(limit,),
|
||||
).fetchall()
|
||||
result = []
|
||||
for row in rows:
|
||||
item = dict(row)
|
||||
item["local_time"] = parse_rfc3339(item["timestamp"]).astimezone(LOCAL_TZ).strftime("%Y-%m-%d %H:%M:%S")
|
||||
result.append(item)
|
||||
return result
|
||||
|
||||
|
||||
def json_response(handler, payload, status=200):
|
||||
body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
|
||||
handler.send_response(status)
|
||||
handler.send_header("Content-Type", "application/json; charset=utf-8")
|
||||
handler.send_header("Content-Length", str(len(body)))
|
||||
handler.send_header("Cache-Control", "no-store")
|
||||
handler.end_headers()
|
||||
handler.wfile.write(body)
|
||||
|
||||
|
||||
class DashboardHandler(BaseHTTPRequestHandler):
|
||||
def log_message(self, fmt, *args):
|
||||
return
|
||||
|
||||
def do_GET(self):
|
||||
parsed = urlparse(self.path)
|
||||
qs = parse_qs(parsed.query)
|
||||
try:
|
||||
if parsed.path == "/":
|
||||
self.serve_html()
|
||||
elif parsed.path == "/api/summary":
|
||||
json_response(self, query_summary(qs.get("range", ["today"])[0]))
|
||||
elif parsed.path == "/api/quota":
|
||||
json_response(self, {"quotas": latest_quotas(force=qs.get("force", ["0"])[0] == "1")})
|
||||
elif parsed.path == "/api/requests":
|
||||
limit = min(500, int(qs.get("limit", ["100"])[0]))
|
||||
json_response(self, {"requests": recent_requests(limit)})
|
||||
elif parsed.path == "/api/health":
|
||||
json_response(self, {"ok": True, "db": DB_PATH, "auth_files": len(auth_files())})
|
||||
else:
|
||||
json_response(self, {"error": "not found"}, 404)
|
||||
except Exception as exc:
|
||||
json_response(self, {"error": str(exc)}, 500)
|
||||
|
||||
def serve_html(self):
|
||||
try:
|
||||
with open(HTML_PATH, "rb") as f:
|
||||
body = f.read()
|
||||
except FileNotFoundError:
|
||||
body = b"<h1>Dashboard HTML not found</h1><p>Expected: " + HTML_PATH.encode() + b"</p>"
|
||||
self.send_response(500)
|
||||
self.send_header("Content-Type", "text/html; charset=utf-8")
|
||||
self.send_header("Content-Length", str(len(body)))
|
||||
self.end_headers()
|
||||
self.wfile.write(body)
|
||||
return
|
||||
self.send_response(200)
|
||||
self.send_header("Content-Type", "text/html; charset=utf-8")
|
||||
self.send_header("Content-Length", str(len(body)))
|
||||
self.send_header("Cache-Control", "no-store")
|
||||
self.end_headers()
|
||||
self.wfile.write(body)
|
||||
|
||||
|
||||
|
||||
|
||||
def serve():
|
||||
init_db()
|
||||
cfg = load_config()
|
||||
server = ThreadingHTTPServer((cfg["dashboard_host"], int(cfg["dashboard_port"])), DashboardHandler)
|
||||
print(f"dashboard listening on http://{cfg['dashboard_host']}:{cfg['dashboard_port']}", flush=True)
|
||||
server.serve_forever()
|
||||
|
||||
|
||||
def print_report(range_name):
|
||||
init_db()
|
||||
summary = query_summary(range_name)
|
||||
print(json.dumps(summary, ensure_ascii=False, indent=2))
|
||||
|
||||
|
||||
def start(open_browser=True):
|
||||
"""Run init + collect (background thread) + serve in a single command."""
|
||||
# 1. Init DB and config
|
||||
init_db()
|
||||
cfg = load_config()
|
||||
host = cfg["dashboard_host"]
|
||||
port = int(cfg["dashboard_port"])
|
||||
|
||||
# When binding to 0.0.0.0, show localhost for browser and also the LAN IP
|
||||
if host in ("0.0.0.0", ""):
|
||||
local_url = f"http://127.0.0.1:{port}"
|
||||
try:
|
||||
lan_ip = socket.gethostbyname(socket.gethostname())
|
||||
except OSError:
|
||||
lan_ip = None
|
||||
lan_url = f"http://{lan_ip}:{port}" if lan_ip and lan_ip != "127.0.0.1" else None
|
||||
else:
|
||||
local_url = f"http://{host}:{port}"
|
||||
lan_url = None
|
||||
|
||||
# 2. Start collector in a daemon thread (dies automatically when main process exits)
|
||||
collector_thread = threading.Thread(target=collect_forever, daemon=True, name="collector")
|
||||
collector_thread.start()
|
||||
print("collector started (background thread)", flush=True)
|
||||
|
||||
# 3. Open browser after a short delay so the server is ready
|
||||
if open_browser:
|
||||
def _open():
|
||||
time.sleep(1.5)
|
||||
webbrowser.open(local_url)
|
||||
threading.Thread(target=_open, daemon=True).start()
|
||||
|
||||
# 4. Start HTTP server (blocks — keeps the process alive)
|
||||
server = ThreadingHTTPServer((host, port), DashboardHandler)
|
||||
print(f"dashboard listening on {host}:{port}", flush=True)
|
||||
print(f" local → {local_url}", flush=True)
|
||||
if lan_url:
|
||||
print(f" LAN → {lan_url}", flush=True)
|
||||
print("Press Ctrl+C to stop.", flush=True)
|
||||
try:
|
||||
server.serve_forever()
|
||||
except KeyboardInterrupt:
|
||||
print("\nShutting down.", flush=True)
|
||||
server.shutdown()
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="CLIProxyAPI usage dashboard")
|
||||
sub = parser.add_subparsers(dest="cmd", required=True)
|
||||
sub.add_parser("init", help="Initialise database and config")
|
||||
sub.add_parser("collect", help="Run collector loop (foreground)")
|
||||
sub.add_parser("debug", help="Test one poll cycle, print raw queue items (no DB write)")
|
||||
sub.add_parser("serve", help="Run HTTP server only (no collector)")
|
||||
start_p = sub.add_parser("start", help="Init + collect + serve in one command (recommended)")
|
||||
start_p.add_argument("--no-browser", action="store_true", help="Do not open browser automatically")
|
||||
quota_p = sub.add_parser("quota", help="Show current quota snapshots")
|
||||
quota_p.add_argument("--force", action="store_true")
|
||||
report_p = sub.add_parser("report", help="Print usage summary as JSON")
|
||||
report_p.add_argument("range", choices=["today", "1h", "5h", "24h", "7d"])
|
||||
args = parser.parse_args()
|
||||
if args.cmd == "init":
|
||||
init_db()
|
||||
load_config()
|
||||
print(DB_PATH)
|
||||
elif args.cmd == "collect":
|
||||
collect_forever()
|
||||
elif args.cmd == "debug":
|
||||
debug_collect()
|
||||
elif args.cmd == "serve":
|
||||
serve()
|
||||
elif args.cmd == "start":
|
||||
start(open_browser=not args.no_browser)
|
||||
elif args.cmd == "quota":
|
||||
init_db()
|
||||
print(json.dumps({"quotas": latest_quotas(force=args.force)}, ensure_ascii=False, indent=2))
|
||||
elif args.cmd == "report":
|
||||
print_report(args.range)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user