#!/usr/bin/env python3
"""CLIProxyAPI usage dashboard.

Pops usage events from the CLIProxy management queue over a RESP connection,
stores them in a local SQLite database, snapshots per-account quota, and
serves the aggregated data as JSON next to a static HTML dashboard.

Sub-commands (see ``main``): init, collect, debug, serve, start, quota, report.
"""
import argparse
import contextlib
import datetime as dt
import glob
import hashlib
import html
import json
import os
import re
import socket
import sqlite3
import sys
import threading
import time
import traceback
import urllib.error
import urllib.request
import webbrowser
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from urllib.parse import parse_qs, urlparse
from zoneinfo import ZoneInfo

# Guards refresh_quota() so concurrent callers never refresh at the same time.
_quota_refresh_lock = threading.Lock()

# Path to the companion HTML file (same directory as this script)
HTML_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "usage-dashboard.html")
JSON_DIR = os.path.expanduser("~/.cli-proxy-api")
BASE_DIR = os.path.expanduser("~/cliproxyapi/usage-dashboard")
AUTH_DIR = os.path.expanduser("~/cliproxyapi")
DB_PATH = os.path.join(BASE_DIR, "usage.sqlite")
CONFIG_PATH = os.path.join(BASE_DIR, "config.json")
LOCAL_TZ = ZoneInfo("Asia/Ho_Chi_Minh")

# NOTE(review): "management_key" is a placeholder default and "dashboard_host"
# binds every interface while the HTTP API has no authentication. Both are
# kept for backward compatibility; override them in config.json (or via
# CLIPROXY_MANAGEMENT_KEY) on any shared network.
DEFAULT_CONFIG = {
    "cliproxy_host": "127.0.0.1",
    "cliproxy_port": 8317,
    "management_key": "123456",
    "poll_interval_seconds": 2,
    "quota_refresh_seconds": 300,
    "dashboard_host": "0.0.0.0",
    "dashboard_port": 8320,
}

_QUOTA_URL = "https://chatgpt.com/backend-api/wham/usage"

# Matches the fractional-seconds part of an ISO-8601 timestamp.
_FRACTION_RE = re.compile(r"\.\d+")

_INSERT_USAGE_SQL = """
    INSERT INTO usage_events (
        event_key,timestamp,ts_epoch,local_date,local_hour,request_id,auth_index,source,
        provider,model,endpoint,auth_type,api_key_hash,failed,latency_ms,input_tokens,
        output_tokens,reasoning_tokens,cached_tokens,total_tokens,raw_json
    ) VALUES (
        :event_key,:timestamp,:ts_epoch,:local_date,:local_hour,:request_id,:auth_index,:source,
        :provider,:model,:endpoint,:auth_type,:api_key_hash,:failed,:latency_ms,:input_tokens,
        :output_tokens,:reasoning_tokens,:cached_tokens,:total_tokens,:raw_json
    )
"""

_INSERT_QUOTA_SQL = """
    INSERT INTO quota_snapshots (
        timestamp,ts_epoch,email,plan,allowed,limit_reached,
        primary_used_percent,primary_remaining_percent,primary_reset_at,
        secondary_used_percent,secondary_remaining_percent,secondary_reset_at,
        credits_balance,raw_json
    ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)
"""


def ensure_dirs():
    """Create the dashboard data directory if it does not exist yet."""
    os.makedirs(BASE_DIR, exist_ok=True)


def load_config():
    """Return the effective configuration as a dict.

    Writes DEFAULT_CONFIG to CONFIG_PATH on first use, then overlays the
    file's values on the defaults. The CLIPROXY_MANAGEMENT_KEY environment
    variable, when set, overrides "management_key".
    """
    ensure_dirs()
    try:
        # Create with 0o600 from the start: the file holds the management key
        # and must never be readable by other users, not even briefly.
        fd = os.open(CONFIG_PATH, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
    except FileExistsError:
        pass
    else:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(DEFAULT_CONFIG, f, indent=2)
    with open(CONFIG_PATH, encoding="utf-8") as f:
        cfg = json.load(f)
    merged = dict(DEFAULT_CONFIG)
    merged.update(cfg)
    merged["management_key"] = os.environ.get("CLIPROXY_MANAGEMENT_KEY", merged["management_key"])
    return merged


def db_connect():
    """Open a new SQLite connection to DB_PATH (WAL mode, 5 s busy timeout).

    The caller owns the connection and must close it; prefer ``_db_session``.
    """
    ensure_dirs()
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA busy_timeout=5000")
    return conn


@contextlib.contextmanager
def _db_session():
    """Yield a connection that is committed/rolled back AND closed on exit.

    ``with sqlite3.Connection`` only manages the transaction; it never closes
    the connection, so using it alone leaks one handle per call.
    """
    conn = db_connect()
    try:
        with conn:
            yield conn
    finally:
        conn.close()


def init_db():
    """Create the usage_events and quota_snapshots tables and their indexes."""
    with _db_session() as conn:
        conn.executescript(
            """
            CREATE TABLE IF NOT EXISTS usage_events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                event_key TEXT NOT NULL UNIQUE,
                timestamp TEXT NOT NULL,
                ts_epoch REAL NOT NULL,
                local_date TEXT NOT NULL,
                local_hour TEXT NOT NULL,
                request_id TEXT,
                auth_index TEXT,
                source TEXT,
                provider TEXT,
                model TEXT,
                endpoint TEXT,
                auth_type TEXT,
                api_key_hash TEXT,
                failed INTEGER NOT NULL DEFAULT 0,
                latency_ms INTEGER DEFAULT 0,
                input_tokens INTEGER DEFAULT 0,
                output_tokens INTEGER DEFAULT 0,
                reasoning_tokens INTEGER DEFAULT 0,
                cached_tokens INTEGER DEFAULT 0,
                total_tokens INTEGER DEFAULT 0,
                raw_json TEXT NOT NULL
            );
            CREATE INDEX IF NOT EXISTS idx_usage_ts ON usage_events(ts_epoch);
            CREATE INDEX IF NOT EXISTS idx_usage_date ON usage_events(local_date);
            CREATE INDEX IF NOT EXISTS idx_usage_source ON usage_events(source);
            CREATE INDEX IF NOT EXISTS idx_usage_auth ON usage_events(auth_index);
            CREATE TABLE IF NOT EXISTS quota_snapshots (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT NOT NULL,
                ts_epoch REAL NOT NULL,
                email TEXT NOT NULL,
                plan TEXT,
                allowed INTEGER,
                limit_reached INTEGER,
                primary_used_percent INTEGER,
                primary_remaining_percent INTEGER,
                primary_reset_at TEXT,
                secondary_used_percent INTEGER,
                secondary_remaining_percent INTEGER,
                secondary_reset_at TEXT,
                credits_balance TEXT,
                raw_json TEXT NOT NULL
            );
            CREATE INDEX IF NOT EXISTS idx_quota_email_ts ON quota_snapshots(email, ts_epoch);
            """
        )


def parse_rfc3339(value):
    """Parse an RFC 3339 / ISO-8601 timestamp into an aware UTC datetime.

    A missing, non-string or unparseable value yields the current UTC time.
    Naive timestamps are taken to be UTC.
    """
    now_utc = dt.datetime.now(dt.timezone.utc)
    if not value or not isinstance(value, str):
        return now_utc
    text = value.strip()
    if text[-1:] in ("Z", "z"):
        text = text[:-1] + "+00:00"
    # datetime.fromisoformat() before Python 3.11 only accepts exactly 3 or 6
    # fractional digits; normalise to 6 so e.g. nanosecond precision parses.
    text = _FRACTION_RE.sub(lambda m: (m.group(0) + "000000")[:7], text, count=1)
    try:
        parsed = dt.datetime.fromisoformat(text)
    except ValueError:
        return now_utc
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=dt.timezone.utc)
    return parsed.astimezone(dt.timezone.utc)


def resp_command(*parts):
    """Encode *parts* as one RESP array of bulk strings and return the bytes."""
    chunks = [f"*{len(parts)}\r\n".encode()]
    for part in parts:
        encoded = str(part).encode()
        chunks.append(f"${len(encoded)}\r\n".encode() + encoded + b"\r\n")
    return b"".join(chunks)


class RespClient:
    """Minimal blocking RESP client used to drain the CLIProxy usage queue."""

    def __init__(self, host, port, password, timeout=10):
        """Connect to (host, port) and authenticate with *password*.

        Raises RuntimeError when the password is empty or AUTH is rejected,
        and OSError when the connection cannot be established.
        """
        if not password:
            raise RuntimeError("management_key is required in config.json or CLIPROXY_MANAGEMENT_KEY")
        self.sock = socket.create_connection((host, port), timeout=timeout)
        self.file = self.sock.makefile("rb")
        try:
            self.send("AUTH", password)
            reply = self.read()
            if not (isinstance(reply, str) and reply.upper() == "OK"):
                raise RuntimeError(f"AUTH failed: {reply!r}")
        except BaseException:
            # Do not leak the socket when authentication fails.
            self.close()
            raise

    def close(self):
        """Close the buffered reader and the underlying socket."""
        try:
            self.file.close()
        finally:
            self.sock.close()

    def send(self, *parts):
        """Send one command made of *parts*."""
        self.sock.sendall(resp_command(*parts))

    def read_line(self):
        """Read one protocol line without its line terminator.

        Raises EOFError when the peer has closed the connection.
        """
        line = self.file.readline()
        if not line:
            raise EOFError("RESP connection closed")
        return line.rstrip(b"\r\n")

    def read(self):
        """Read and decode one RESP reply.

        Returns str for simple/bulk strings, int for integers, a list for
        arrays and None for null bulk strings/arrays. Error replies and
        unknown prefixes raise RuntimeError; a truncated reply raises EOFError.
        """
        line = self.read_line()
        prefix = line[:1]
        payload = line[1:]
        if prefix == b"+":
            return payload.decode()
        if prefix == b"-":
            raise RuntimeError(payload.decode())
        if prefix == b":":
            return int(payload)
        if prefix == b"$":
            length = int(payload)
            if length == -1:
                return None
            # Bulk payload is followed by a CRLF terminator.
            data = self.file.read(length + 2)
            if len(data) != length + 2:
                raise EOFError("RESP connection closed")
            return data[:length].decode("utf-8", "replace")
        if prefix == b"*":
            count = int(payload)
            if count == -1:
                return None
            return [self.read() for _ in range(count)]
        raise RuntimeError(f"Unknown RESP prefix: {line!r}")

    def rpop(self, count=100):
        """Pop up to *count* items from "queue", stopping once it is empty."""
        result = []
        for _ in range(count):
            self.send("RPOP", "queue")
            item = self.read()
            if item is None:
                break
            result.append(item)
        return result


def event_key(payload, raw):
    """Return the de-duplication key: request_id if set, else SHA-256 of *raw*."""
    rid = payload.get("request_id")
    if rid:
        return rid
    return hashlib.sha256(raw.encode()).hexdigest()


def _usage_row(payload, raw):
    """Build the named-parameter dict for one usage_events row."""
    ts_utc = parse_rfc3339(payload.get("timestamp"))
    ts_local = ts_utc.astimezone(LOCAL_TZ)
    tokens = payload.get("tokens") or {}
    api_key = payload.get("api_key") or ""
    # Only a short hash of the API key is stored, never the key itself.
    api_hash = hashlib.sha256(api_key.encode()).hexdigest()[:12] if api_key else ""
    return {
        "event_key": event_key(payload, raw),
        "timestamp": ts_utc.isoformat(),
        "ts_epoch": ts_utc.timestamp(),
        "local_date": ts_local.strftime("%Y-%m-%d"),
        "local_hour": ts_local.strftime("%Y-%m-%d %H:00"),
        "request_id": payload.get("request_id"),
        "auth_index": payload.get("auth_index"),
        "source": payload.get("source"),
        "provider": payload.get("provider"),
        "model": payload.get("model"),
        "endpoint": payload.get("endpoint"),
        "auth_type": payload.get("auth_type"),
        "api_key_hash": api_hash,
        "failed": 1 if payload.get("failed") else 0,
        "latency_ms": int(payload.get("latency_ms") or 0),
        "input_tokens": int(tokens.get("input_tokens") or 0),
        "output_tokens": int(tokens.get("output_tokens") or 0),
        "reasoning_tokens": int(tokens.get("reasoning_tokens") or 0),
        "cached_tokens": int(tokens.get("cached_tokens") or 0),
        "total_tokens": int(tokens.get("total_tokens") or 0),
        "raw_json": raw,
    }


def insert_usage(raw_items):
    """Insert raw JSON usage events and return how many rows were added.

    Items that fail to decode, are malformed, or duplicate an existing
    event_key are skipped so that one bad item never discards the batch
    (the items have already been removed from the queue).
    """
    inserted = 0
    with _db_session() as conn:
        for raw in raw_items:
            try:
                payload = json.loads(raw)
            except json.JSONDecodeError as e:
                print(f"insert_usage: JSON decode error: {e} — raw: {raw[:120]!r}", file=sys.stderr, flush=True)
                continue
            try:
                conn.execute(_INSERT_USAGE_SQL, _usage_row(payload, raw))
                inserted += 1
            except sqlite3.IntegrityError:
                pass  # duplicate event_key, expected
            except Exception as e:
                # Deliberate best-effort boundary: log and move on to the next item.
                keys = list(payload.keys()) if isinstance(payload, dict) else type(payload).__name__
                print(f"insert_usage: unexpected error: {e} — payload keys: {keys}", file=sys.stderr, flush=True)
    return inserted


def latest_quota_age():
    """Return seconds since the newest quota snapshot, or None if there is none."""
    with _db_session() as conn:
        row = conn.execute("SELECT MAX(ts_epoch) AS ts FROM quota_snapshots").fetchone()
    return None if row["ts"] is None else time.time() - row["ts"]


def auth_files():
    """Return the sorted paths of the codex-*.json auth files in JSON_DIR."""
    return sorted(glob.glob(os.path.join(JSON_DIR, "codex-*.json")))


def _fetch_quota(token):
    """GET the usage/quota document for one access token and return parsed JSON."""
    req = urllib.request.Request(
        _QUOTA_URL,
        headers={
            "Authorization": "Bearer " + token,
            "Accept": "application/json",
            "User-Agent": "codex-cli",
        },
    )
    with urllib.request.urlopen(req, timeout=20) as resp:
        return json.load(resp)


def _quota_row(now, email, data):
    """Build the positional parameter tuple for one quota_snapshots row."""
    rl = data.get("rate_limit") or {}
    primary = rl.get("primary_window") or {}
    secondary = rl.get("secondary_window") or {}
    primary_used = int(primary.get("used_percent") or 0)
    secondary_used = int(secondary.get("used_percent") or 0)
    return (
        now.isoformat(),
        now.timestamp(),
        email,
        data.get("plan_type"),
        1 if rl.get("allowed") else 0,
        1 if rl.get("limit_reached") else 0,
        primary_used,
        max(0, 100 - primary_used),
        epoch_to_local(primary.get("reset_at")),
        secondary_used,
        max(0, 100 - secondary_used),
        epoch_to_local(secondary.get("reset_at")),
        str((data.get("credits") or {}).get("balance", "")),
        json.dumps(data, ensure_ascii=False),
    )


def refresh_quota(force=False):
    """Fetch quota for every auth file and store one snapshot per account.

    Returns the number of snapshots inserted. Returns 0 without doing any
    work when another refresh is already running, or when *force* is false
    and the newest snapshot is younger than "quota_refresh_seconds".
    A failure for one account is logged and does not stop the others.
    """
    # Use the lock so concurrent requests do not refresh simultaneously.
    if not _quota_refresh_lock.acquire(blocking=False):
        print("refresh_quota: already running, skipping duplicate call", flush=True)
        return 0
    try:
        cfg = load_config()
        age = latest_quota_age()
        if not force and age is not None and age < cfg["quota_refresh_seconds"]:
            return 0
        files = auth_files()
        # Log: number of auth files found.
        print(f"refresh_quota: found {len(files)} auth files in {JSON_DIR}", flush=True)
        now = dt.datetime.now(dt.timezone.utc)
        inserted = 0
        with _db_session() as conn:
            for path in files:
                try:
                    with open(path, encoding="utf-8") as f:
                        auth = json.load(f)
                    token = auth.get("access_token")
                    email = auth.get("email") or os.path.basename(path)
                    if not token:
                        # Log: skipped because the file has no token.
                        print(f"refresh_quota: skipping {email} (no access_token)", flush=True)
                        continue
                    # Log: which account is being fetched.
                    print(f"refresh_quota: fetching quota for {email}...", flush=True)
                    data = _fetch_quota(token)
                    conn.execute(_INSERT_QUOTA_SQL, _quota_row(now, email, data))
                    # Commit right away: keeping the write transaction open
                    # across the next (slow) HTTP fetch would block the
                    # collector's inserts.
                    conn.commit()
                    inserted += 1
                    # Log: saved successfully, show the plan type.
                    print(f"refresh_quota: saved quota for {email} (plan: {data.get('plan_type')})", flush=True)
                except (OSError, ValueError, TypeError, AttributeError, KeyError) as exc:
                    # OSError covers URLError/HTTPError/timeouts; ValueError
                    # covers JSONDecodeError and non-numeric percentages.
                    print(f"quota refresh failed for {path}: {exc}", file=sys.stderr, flush=True)
        # Log: total number of snapshots inserted.
        print(f"refresh_quota: inserted {inserted} quota snapshots", flush=True)
        return inserted
    finally:
        _quota_refresh_lock.release()


def epoch_to_local(value):
    """Format a Unix timestamp as "YYYY-MM-DD HH:MM:SS" in LOCAL_TZ.

    Returns "" for a falsy or unparseable value.
    """
    if not value:
        return ""
    try:
        moment = dt.datetime.fromtimestamp(int(value), LOCAL_TZ)
    except (TypeError, ValueError, OverflowError, OSError):
        return ""
    return moment.strftime("%Y-%m-%d %H:%M:%S")


def debug_collect():
    """Test one poll cycle and print raw queue items — does NOT write to DB."""
    cfg = load_config()
    # Never echo the management key into logs/terminals.
    shown = dict(cfg)
    if shown.get("management_key"):
        shown["management_key"] = "***"
    print(f"[debug] Config: {shown}", flush=True)
    print(f"[debug] Connecting to {cfg['cliproxy_host']}:{cfg['cliproxy_port']} ...", flush=True)
    try:
        client = RespClient(cfg["cliproxy_host"], cfg["cliproxy_port"], cfg["management_key"])
    except Exception as e:
        print(f"[debug] Connection FAILED: {e}", file=sys.stderr, flush=True)
        return
    print("[debug] AUTH OK", flush=True)
    try:
        raw_items = client.rpop(10)
        print(f"[debug] rpop returned {len(raw_items)} item(s)", flush=True)
        for i, raw in enumerate(raw_items):
            print(f"[debug] item[{i}]: {raw[:300]}", flush=True)
            try:
                payload = json.loads(raw)
                print(f"[debug] parsed OK — keys: {list(payload.keys())}", flush=True)
                tokens = payload.get("tokens") or {}
                print(f"[debug] tokens: {tokens}", flush=True)
                print(f"[debug] model: {payload.get('model')}, source: {payload.get('source')}", flush=True)
            except Exception as e:
                print(f"[debug] parse error: {e}", file=sys.stderr, flush=True)
    finally:
        client.close()
    if not raw_items:
        print("[debug] Queue is EMPTY — no events to collect. Make sure CLIProxy is running and receiving requests.", flush=True)


def collect_forever():
    """Poll the queue forever, inserting events and reconnecting on errors.

    Quota is not refreshed from this loop; snapshots are only taken through
    ``latest_quotas(force=True)``.
    """
    init_db()
    cfg = load_config()
    print(f"collector: connecting to {cfg['cliproxy_host']}:{cfg['cliproxy_port']}", flush=True)
    while True:
        try:
            client = RespClient(cfg["cliproxy_host"], cfg["cliproxy_port"], cfg["management_key"])
            print("collector: connected and authenticated OK", flush=True)
            try:
                while True:
                    raw_items = client.rpop(100)
                    if raw_items:
                        print(f"collector: got {len(raw_items)} raw item(s) from queue", flush=True)
                        inserted = insert_usage(raw_items)
                        print(f"collector: inserted {inserted}/{len(raw_items)} events into DB", flush=True)
                    time.sleep(cfg["poll_interval_seconds"])
            finally:
                client.close()
                print("collector: connection closed", flush=True)
        except Exception as exc:
            # Top-level boundary of the collector: log, back off, reconnect.
            print(f"collector error: {exc}", file=sys.stderr, flush=True)
            traceback.print_exc(file=sys.stderr)
            print("collector: retrying in 5s...", file=sys.stderr, flush=True)
            time.sleep(5)


def range_bounds(name):
    """Return (start, end) UTC epoch seconds for a named range.

    Known names are "1h", "5h", "24h" and "7d"; any other name means
    "since local midnight today".
    """
    spans = {
        "1h": dt.timedelta(hours=1),
        "5h": dt.timedelta(hours=5),
        "24h": dt.timedelta(hours=24),
        "7d": dt.timedelta(days=7),
    }
    now = dt.datetime.now(LOCAL_TZ)
    span = spans.get(name)
    if span is not None:
        start = now - span
    else:
        start = now.replace(hour=0, minute=0, second=0, microsecond=0)
    return start.astimezone(dt.timezone.utc).timestamp(), now.astimezone(dt.timezone.utc).timestamp()


def query_summary(range_name):
    """Aggregate usage for *range_name*: totals, per account, per model, per hour."""
    bounds = range_bounds(range_name)
    with _db_session() as conn:
        total = conn.execute(
            """
            SELECT COUNT(*) requests,
                   COALESCE(SUM(total_tokens),0) total_tokens,
                   COALESCE(SUM(input_tokens),0) input_tokens,
                   COALESCE(SUM(output_tokens),0) output_tokens,
                   COALESCE(SUM(reasoning_tokens),0) reasoning_tokens,
                   COALESCE(SUM(cached_tokens),0) cached_tokens,
                   COALESCE(SUM(failed),0) failed
            FROM usage_events
            WHERE ts_epoch BETWEEN ? AND ?
            """,
            bounds,
        ).fetchone()
        # GROUP BY repeats the full expression: a bare alias that matches a
        # real column name (e.g. "model") is ambiguous.
        accounts = conn.execute(
            """
            SELECT COALESCE(source, auth_index, 'unknown') account,
                   COUNT(*) requests,
                   COALESCE(SUM(total_tokens),0) total_tokens,
                   COALESCE(SUM(input_tokens),0) input_tokens,
                   COALESCE(SUM(output_tokens),0) output_tokens,
                   COALESCE(SUM(reasoning_tokens),0) reasoning_tokens,
                   COALESCE(SUM(failed),0) failed
            FROM usage_events
            WHERE ts_epoch BETWEEN ? AND ?
            GROUP BY COALESCE(source, auth_index, 'unknown')
            ORDER BY total_tokens DESC
            """,
            bounds,
        ).fetchall()
        models = conn.execute(
            """
            SELECT COALESCE(model, 'unknown') model,
                   COUNT(*) requests,
                   COALESCE(SUM(total_tokens),0) total_tokens,
                   COALESCE(SUM(failed),0) failed
            FROM usage_events
            WHERE ts_epoch BETWEEN ? AND ?
            GROUP BY COALESCE(model, 'unknown')
            ORDER BY total_tokens DESC
            LIMIT 12
            """,
            bounds,
        ).fetchall()
        hours = conn.execute(
            """
            SELECT local_hour hour,
                   COUNT(*) requests,
                   COALESCE(SUM(total_tokens),0) total_tokens,
                   COALESCE(SUM(failed),0) failed
            FROM usage_events
            WHERE ts_epoch BETWEEN ? AND ?
            GROUP BY local_hour
            ORDER BY local_hour
            """,
            bounds,
        ).fetchall()
    return {
        "range": range_name,
        "summary": dict(total),
        "accounts": [dict(x) for x in accounts],
        "models": [dict(x) for x in models],
        "hours": [dict(x) for x in hours],
    }


def latest_quotas(force=False):
    """Return the newest quota snapshot per email; refresh first when *force*."""
    if force:
        refresh_quota(force=True)
    with _db_session() as conn:
        rows = conn.execute(
            """
            SELECT q.*
            FROM quota_snapshots q
            JOIN (
                SELECT email, MAX(ts_epoch) ts
                FROM quota_snapshots
                GROUP BY email
            ) latest ON latest.email = q.email AND latest.ts = q.ts_epoch
            ORDER BY email
            """
        ).fetchall()
    return [dict(row) for row in rows]


def recent_requests(limit=100):
    """Return the *limit* newest usage events, each with an added "local_time"."""
    with _db_session() as conn:
        rows = conn.execute(
            """
            SELECT timestamp, source, auth_index, model, endpoint, failed, latency_ms,
                   input_tokens, output_tokens, reasoning_tokens, cached_tokens,
                   total_tokens, request_id
            FROM usage_events
            ORDER BY ts_epoch DESC
            LIMIT ?
            """,
            (limit,),
        ).fetchall()
    result = []
    for row in rows:
        item = dict(row)
        item["local_time"] = parse_rfc3339(item["timestamp"]).astimezone(LOCAL_TZ).strftime("%Y-%m-%d %H:%M:%S")
        result.append(item)
    return result


def json_response(handler, payload, status=200):
    """Serialise *payload* as UTF-8 JSON and write it as the HTTP response."""
    body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    handler.send_response(status)
    handler.send_header("Content-Type", "application/json; charset=utf-8")
    handler.send_header("Content-Length", str(len(body)))
    handler.send_header("Cache-Control", "no-store")
    handler.end_headers()
    handler.wfile.write(body)


def _clamp_limit(raw, maximum=500):
    """Parse a ``limit`` query value and clamp it to 0..maximum.

    Raises ValueError for non-integer input. The lower bound matters:
    SQLite treats a negative LIMIT as "no limit".
    """
    return max(0, min(maximum, int(raw)))


class DashboardHandler(BaseHTTPRequestHandler):
    """Serves the dashboard page and its JSON API."""

    def log_message(self, fmt, *args):
        """Suppress the default per-request access log."""
        return

    def do_GET(self):
        """Route GET requests to the HTML page or one of the /api endpoints."""
        parsed = urlparse(self.path)
        qs = parse_qs(parsed.query)
        try:
            if parsed.path == "/":
                self.serve_html()
            elif parsed.path == "/api/summary":
                json_response(self, query_summary(qs.get("range", ["today"])[0]))
            elif parsed.path == "/api/quota":
                json_response(self, {"quotas": latest_quotas(force=qs.get("force", ["0"])[0] == "1")})
            elif parsed.path == "/api/requests":
                try:
                    limit = _clamp_limit(qs.get("limit", ["100"])[0])
                except ValueError:
                    json_response(self, {"error": "limit must be an integer"}, 400)
                    return
                json_response(self, {"requests": recent_requests(limit)})
            elif parsed.path == "/api/health":
                json_response(self, {"ok": True, "db": DB_PATH, "auth_files": len(auth_files())})
            else:
                json_response(self, {"error": "not found"}, 404)
        except (BrokenPipeError, ConnectionResetError):
            # Client went away mid-response; nothing left to send.
            return
        except Exception as exc:
            json_response(self, {"error": str(exc)}, 500)

    def serve_html(self):
        """Send the companion HTML file, or a 500 page naming the missing path."""
        try:
            with open(HTML_PATH, "rb") as f:
                body = f.read()
        except FileNotFoundError:
            body = (
                b"<h1>Dashboard HTML not found</h1><p>Expected: "
                + html.escape(HTML_PATH).encode()
                + b"</p>"
            )
            self.send_response(500)
            self.send_header("Content-Type", "text/html; charset=utf-8")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)
            return
        self.send_response(200)
        self.send_header("Content-Type", "text/html; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.send_header("Cache-Control", "no-store")
        self.end_headers()
        self.wfile.write(body)


def _run_server(server):
    """Serve until Ctrl+C, then release the listening socket."""
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("\nShutting down.", flush=True)
    finally:
        # server.shutdown() is only for stopping serve_forever() from another
        # thread; here the loop has already ended, so just close the socket.
        server.server_close()


def serve():
    """Run the HTTP server only (no collector); blocks until interrupted."""
    init_db()
    cfg = load_config()
    server = ThreadingHTTPServer((cfg["dashboard_host"], int(cfg["dashboard_port"])), DashboardHandler)
    print(f"dashboard listening on http://{cfg['dashboard_host']}:{cfg['dashboard_port']}", flush=True)
    _run_server(server)


def print_report(range_name):
    """Print the usage summary for *range_name* as indented JSON."""
    init_db()
    summary = query_summary(range_name)
    print(json.dumps(summary, ensure_ascii=False, indent=2))


def start(open_browser=True):
    """Run init + collect (background thread) + serve in a single command."""
    # 1. Init DB and config
    init_db()
    cfg = load_config()
    host = cfg["dashboard_host"]
    port = int(cfg["dashboard_port"])

    # When binding to 0.0.0.0, show localhost for browser and also the LAN IP
    if host in ("0.0.0.0", ""):
        local_url = f"http://127.0.0.1:{port}"
        try:
            lan_ip = socket.gethostbyname(socket.gethostname())
        except OSError:
            lan_ip = None
        lan_url = f"http://{lan_ip}:{port}" if lan_ip and lan_ip != "127.0.0.1" else None
    else:
        local_url = f"http://{host}:{port}"
        lan_url = None

    # 2. Start collector in a daemon thread (dies automatically when main process exits)
    collector_thread = threading.Thread(target=collect_forever, daemon=True, name="collector")
    collector_thread.start()
    print("collector started (background thread)", flush=True)

    # 3. Open browser after a short delay so the server is ready
    if open_browser:
        def _open():
            time.sleep(1.5)
            webbrowser.open(local_url)

        threading.Thread(target=_open, daemon=True).start()

    # 4. Start HTTP server (blocks — keeps the process alive)
    server = ThreadingHTTPServer((host, port), DashboardHandler)
    print(f"dashboard listening on {host}:{port}", flush=True)
    print(f" local → {local_url}", flush=True)
    if lan_url:
        print(f" LAN → {lan_url}", flush=True)
    print("Press Ctrl+C to stop.", flush=True)
    _run_server(server)


def main():
    """Parse the command line and run the selected sub-command."""
    parser = argparse.ArgumentParser(description="CLIProxyAPI usage dashboard")
    sub = parser.add_subparsers(dest="cmd", required=True)
    sub.add_parser("init", help="Initialise database and config")
    sub.add_parser("collect", help="Run collector loop (foreground)")
    sub.add_parser("debug", help="Test one poll cycle, print raw queue items (no DB write)")
    sub.add_parser("serve", help="Run HTTP server only (no collector)")
    start_p = sub.add_parser("start", help="Init + collect + serve in one command (recommended)")
    start_p.add_argument("--no-browser", action="store_true", help="Do not open browser automatically")
    quota_p = sub.add_parser("quota", help="Show current quota snapshots")
    quota_p.add_argument("--force", action="store_true")
    report_p = sub.add_parser("report", help="Print usage summary as JSON")
    report_p.add_argument("range", choices=["today", "1h", "5h", "24h", "7d"])
    args = parser.parse_args()

    if args.cmd == "init":
        init_db()
        load_config()
        print(DB_PATH)
    elif args.cmd == "collect":
        collect_forever()
    elif args.cmd == "debug":
        debug_collect()
    elif args.cmd == "serve":
        serve()
    elif args.cmd == "start":
        start(open_browser=not args.no_browser)
    elif args.cmd == "quota":
        init_db()
        print(json.dumps({"quotas": latest_quotas(force=args.force)}, ensure_ascii=False, indent=2))
    elif args.cmd == "report":
        print_report(args.range)


if __name__ == "__main__":
    main()