Files
CLIProxyAPI-auto-install-on…/usage-dashboard/usage-dashboard.py
T
2026-05-21 13:28:04 +07:00

695 lines
28 KiB
Python

#!/usr/bin/env python3
import argparse
import datetime as dt
import glob
import hashlib
import json
import os
import socket
import sqlite3
import sys
import time
import threading
import urllib.error
import urllib.request
import webbrowser
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from urllib.parse import parse_qs, urlparse
from zoneinfo import ZoneInfo
# Guards refresh_quota(): it is acquired there with blocking=False, so a call
# that arrives while another refresh is running returns instead of waiting.
_quota_refresh_lock = threading.Lock()
# Path to the companion HTML file (same directory as this script)
HTML_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "usage-dashboard.html")
# Directory that auth_files() globs for "codex-*.json" credential files.
JSON_DIR = os.path.expanduser("~/.cli-proxy-api")
# Working directory for the dashboard; created on demand by ensure_dirs().
BASE_DIR = os.path.expanduser("~/cliproxyapi/usage-dashboard")
# NOTE(review): AUTH_DIR is not referenced anywhere else in the visible part
# of this file — confirm whether it is still needed.
AUTH_DIR = os.path.expanduser("~/cliproxyapi")
# SQLite database file opened by db_connect().
DB_PATH = os.path.join(BASE_DIR, "usage.sqlite")
# JSON config file read (and created with defaults) by load_config().
CONFIG_PATH = os.path.join(BASE_DIR, "config.json")
# Timezone used for the local_date / local_hour columns and for displayed times.
LOCAL_TZ = ZoneInfo("Asia/Ho_Chi_Minh")
# Defaults written to CONFIG_PATH on first run and merged underneath whatever
# the config file contains (see load_config()).
DEFAULT_CONFIG = {
    # Address of the CLIProxy RESP endpoint the collector connects to.
    "cliproxy_host": "127.0.0.1",
    "cliproxy_port": 8317,
    # Sent as the AUTH password; load_config() lets the environment variable
    # CLIPROXY_MANAGEMENT_KEY override it.
    # NOTE(review): "123456" looks like a placeholder — confirm that
    # deployments always override it.
    "management_key": "123456",
    # Sleep between queue polls in collect_forever().
    "poll_interval_seconds": 2,
    # Minimum age of the newest quota snapshot before a non-forced
    # refresh_quota() fetches again.
    "quota_refresh_seconds": 300,
    # Bind address/port of the dashboard HTTP server.
    # NOTE(review): "0.0.0.0" listens on every interface and the handler in
    # this file performs no authentication — confirm this exposure is intended.
    "dashboard_host": "0.0.0.0",
    "dashboard_port": 8320,
}
def ensure_dirs():
    """Create the dashboard working directory (BASE_DIR) if it is missing."""
    target = BASE_DIR
    os.makedirs(target, exist_ok=True)
def load_config():
    """Load the dashboard configuration, creating it with defaults if absent.

    The file at CONFIG_PATH is merged on top of DEFAULT_CONFIG, so keys missing
    from the file fall back to their defaults. The environment variable
    CLIPROXY_MANAGEMENT_KEY, when set, overrides ``management_key``.

    Returns:
        dict: the merged configuration.

    Raises:
        ValueError: if the config file is not valid JSON or does not contain
            a JSON object.
        OSError: if the config file cannot be created or read.
    """
    ensure_dirs()
    try:
        # O_EXCL + mode 0o600 creates the file atomically and owner-only, so
        # the management key is never readable by others, not even briefly,
        # and there is no exists()/open() race between two processes.
        fd = os.open(CONFIG_PATH, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
    except FileExistsError:
        pass
    else:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(DEFAULT_CONFIG, f, indent=2)
    with open(CONFIG_PATH, encoding="utf-8") as f:
        cfg = json.load(f)
    if not isinstance(cfg, dict):
        raise ValueError(f"{CONFIG_PATH} must contain a JSON object")
    merged = dict(DEFAULT_CONFIG)
    merged.update(cfg)
    merged["management_key"] = os.environ.get("CLIPROXY_MANAGEMENT_KEY", merged["management_key"])
    return merged
def db_connect():
    """Open the usage database and return a configured connection.

    Rows come back as ``sqlite3.Row``; WAL journaling and a 5000 ms busy
    timeout are set on every new connection.
    """
    ensure_dirs()
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    for pragma in ("PRAGMA journal_mode=WAL", "PRAGMA busy_timeout=5000"):
        connection.execute(pragma)
    return connection
def init_db():
    """Create the usage_events and quota_snapshots tables and their indexes.

    Every statement uses IF NOT EXISTS, so calling this repeatedly is safe.
    """
    conn = db_connect()
    try:
        # ``with conn`` only commits or rolls back; it does not close the
        # connection, hence the explicit close() in ``finally``.
        with conn:
            conn.executescript(
                """
CREATE TABLE IF NOT EXISTS usage_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
event_key TEXT NOT NULL UNIQUE,
timestamp TEXT NOT NULL,
ts_epoch REAL NOT NULL,
local_date TEXT NOT NULL,
local_hour TEXT NOT NULL,
request_id TEXT,
auth_index TEXT,
source TEXT,
provider TEXT,
model TEXT,
endpoint TEXT,
auth_type TEXT,
api_key_hash TEXT,
failed INTEGER NOT NULL DEFAULT 0,
latency_ms INTEGER DEFAULT 0,
input_tokens INTEGER DEFAULT 0,
output_tokens INTEGER DEFAULT 0,
reasoning_tokens INTEGER DEFAULT 0,
cached_tokens INTEGER DEFAULT 0,
total_tokens INTEGER DEFAULT 0,
raw_json TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_usage_ts ON usage_events(ts_epoch);
CREATE INDEX IF NOT EXISTS idx_usage_date ON usage_events(local_date);
CREATE INDEX IF NOT EXISTS idx_usage_source ON usage_events(source);
CREATE INDEX IF NOT EXISTS idx_usage_auth ON usage_events(auth_index);
CREATE TABLE IF NOT EXISTS quota_snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
ts_epoch REAL NOT NULL,
email TEXT NOT NULL,
plan TEXT,
allowed INTEGER,
limit_reached INTEGER,
primary_used_percent INTEGER,
primary_remaining_percent INTEGER,
primary_reset_at TEXT,
secondary_used_percent INTEGER,
secondary_remaining_percent INTEGER,
secondary_reset_at TEXT,
credits_balance TEXT,
raw_json TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_quota_email_ts ON quota_snapshots(email, ts_epoch);
"""
            )
    finally:
        conn.close()
def parse_rfc3339(value):
    """Parse an RFC 3339 timestamp into a timezone-aware UTC datetime.

    Args:
        value: timestamp text such as ``2026-05-21T06:28:04.123456789Z``.

    Returns:
        datetime: the parsed instant converted to UTC. Naive inputs are taken
        to be UTC. Missing, non-string or unparseable values fall back to the
        current UTC time (best effort, never raises).
    """
    if not value or not isinstance(value, str):
        return dt.datetime.now(dt.timezone.utc)
    text = value.strip()
    if text[-1:] in ("Z", "z"):
        text = text[:-1] + "+00:00"
    # datetime.fromisoformat() before Python 3.11 only accepts 3 or 6
    # fractional digits, so nanosecond-precision stamps would silently become
    # "now". Normalise the fraction to exactly 6 digits (pad or truncate).
    head, dot, tail = text.partition(".")
    if dot:
        digit_count = len(tail) - len(tail.lstrip("0123456789"))
        if digit_count:
            fraction = tail[:digit_count][:6].ljust(6, "0")
            text = f"{head}.{fraction}{tail[digit_count:]}"
    try:
        parsed = dt.datetime.fromisoformat(text)
    except ValueError:
        return dt.datetime.now(dt.timezone.utc)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=dt.timezone.utc)
    return parsed.astimezone(dt.timezone.utc)
def resp_command(*parts):
    """Encode *parts* as one RESP array of bulk strings.

    Each part is converted with ``str()`` and encoded before its byte length
    is written. Returns the complete command as ``bytes``.
    """
    chunks = [b"*%d\r\n" % len(parts)]
    for part in parts:
        encoded = str(part).encode()
        chunks += [b"$%d\r\n" % len(encoded), encoded, b"\r\n"]
    return b"".join(chunks)
class RespClient:
    """Minimal blocking RESP client used to drain the CLIProxy usage queue.

    The constructor connects and authenticates. Instances can also be used as
    context managers, which close the connection on exit.
    """

    def __init__(self, host, port, password, timeout=10):
        """Connect to ``host:port`` and send ``AUTH password``.

        Raises:
            RuntimeError: if *password* is empty or the server rejects AUTH.
            OSError: if the TCP connection cannot be established.
        """
        if not password:
            raise RuntimeError("management_key is required in config.json or CLIPROXY_MANAGEMENT_KEY")
        self.sock = socket.create_connection((host, port), timeout=timeout)
        self.file = self.sock.makefile("rb")
        try:
            self.send("AUTH", password)
            reply = self.read()
            if not (isinstance(reply, str) and reply.upper() == "OK"):
                raise RuntimeError(f"AUTH failed: {reply!r}")
        except BaseException:
            # The caller never receives the instance when the handshake
            # fails, so the socket must be released here or it leaks.
            self.close()
            raise

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False

    def close(self):
        """Close the buffered reader and the underlying socket."""
        try:
            self.file.close()
        finally:
            self.sock.close()

    def send(self, *parts):
        """Send one command built by resp_command()."""
        self.sock.sendall(resp_command(*parts))

    def read_line(self):
        """Read one protocol line without its trailing CR/LF.

        Raises:
            EOFError: if the peer closed the connection.
        """
        line = self.file.readline()
        if not line:
            raise EOFError("RESP connection closed")
        return line.rstrip(b"\r\n")

    def read(self):
        """Read and decode one RESP reply.

        Returns:
            str for simple and bulk strings, int for integers, list for
            arrays (decoded recursively), None for null bulk strings/arrays.

        Raises:
            RuntimeError: for an error reply (``-``) or an unknown type prefix.
            EOFError: if the connection closes before a reply is complete.
        """
        line = self.read_line()
        prefix = line[:1]
        payload = line[1:]
        if prefix == b"+":
            return payload.decode()
        if prefix == b"-":
            raise RuntimeError(payload.decode())
        if prefix == b":":
            return int(payload)
        if prefix == b"$":
            length = int(payload)
            # -1 is the null bulk string. Any negative value is treated the
            # same way because file.read() with a negative size would read
            # until EOF.
            if length < 0:
                return None
            data = self.file.read(length)
            terminator = self.file.read(2)  # trailing CRLF after the payload
            if len(data) != length or len(terminator) != 2:
                # Short read: the peer went away mid-reply. Surface it rather
                # than returning a truncated payload.
                raise EOFError("RESP connection closed")
            return data.decode("utf-8", "replace")
        if prefix == b"*":
            count = int(payload)
            if count == -1:
                return None
            return [self.read() for _ in range(count)]
        raise RuntimeError(f"Unknown RESP prefix: {line!r}")

    def rpop(self, count=100):
        """Pop up to *count* items from the list named ``queue``.

        Issues one ``RPOP queue`` per item and stops at the first null reply.
        Returns the popped items in the order they were received.
        """
        result = []
        for _ in range(count):
            self.send("RPOP", "queue")
            item = self.read()
            if item is None:
                break
            result.append(item)
        return result
def event_key(payload, raw):
    """Return the de-duplication key for a usage event.

    The event's ``request_id`` is used when it is truthy; otherwise the key is
    the SHA-256 hex digest of the raw JSON text.
    """
    request_id = payload.get("request_id")
    return request_id if request_id else hashlib.sha256(raw.encode()).hexdigest()
def _usage_row(payload, raw):
    """Map one decoded usage event onto the usage_events column values.

    Raises TypeError, ValueError, AttributeError or OverflowError when the
    payload does not have the expected shape (e.g. non-numeric token counts).
    """
    if not isinstance(payload, dict):
        raise TypeError(f"expected a JSON object, got {type(payload).__name__}")
    ts_utc = parse_rfc3339(payload.get("timestamp"))
    ts_local = ts_utc.astimezone(LOCAL_TZ)
    tokens = payload.get("tokens") or {}
    api_key = payload.get("api_key") or ""
    # Only a short hash of the API key is stored in its own column.
    api_hash = hashlib.sha256(api_key.encode()).hexdigest()[:12] if api_key else ""
    return {
        "event_key": event_key(payload, raw),
        "timestamp": ts_utc.isoformat(),
        "ts_epoch": ts_utc.timestamp(),
        "local_date": ts_local.strftime("%Y-%m-%d"),
        "local_hour": ts_local.strftime("%Y-%m-%d %H:00"),
        "request_id": payload.get("request_id"),
        "auth_index": payload.get("auth_index"),
        "source": payload.get("source"),
        "provider": payload.get("provider"),
        "model": payload.get("model"),
        "endpoint": payload.get("endpoint"),
        "auth_type": payload.get("auth_type"),
        "api_key_hash": api_hash,
        "failed": 1 if payload.get("failed") else 0,
        "latency_ms": int(payload.get("latency_ms") or 0),
        "input_tokens": int(tokens.get("input_tokens") or 0),
        "output_tokens": int(tokens.get("output_tokens") or 0),
        "reasoning_tokens": int(tokens.get("reasoning_tokens") or 0),
        "cached_tokens": int(tokens.get("cached_tokens") or 0),
        "total_tokens": int(tokens.get("total_tokens") or 0),
        "raw_json": raw,
    }


def insert_usage(raw_items):
    """Store raw usage events (JSON strings) in the usage_events table.

    Items that are not valid JSON, or whose fields cannot be converted, are
    logged to stderr and skipped on their own. The items have already been
    popped from the queue, so one bad event must not roll back the whole
    batch. Duplicate ``event_key`` values are ignored silently.

    Args:
        raw_items: iterable of JSON strings as returned by RespClient.rpop().

    Returns:
        int: number of rows actually inserted.
    """
    inserted = 0
    conn = db_connect()
    try:
        # ``with conn`` commits the batch; the connection itself is closed in
        # ``finally`` because the context manager does not do that.
        with conn:
            for raw in raw_items:
                try:
                    payload = json.loads(raw)
                except json.JSONDecodeError as e:
                    print(f"insert_usage: JSON decode error: {e} — raw: {raw[:120]!r}", file=sys.stderr, flush=True)
                    continue
                try:
                    values = _usage_row(payload, raw)
                except (AttributeError, TypeError, ValueError, OverflowError) as e:
                    print(f"insert_usage: skipping malformed event: {e} — raw: {raw[:120]!r}", file=sys.stderr, flush=True)
                    continue
                try:
                    conn.execute(
                        """
INSERT INTO usage_events (
event_key,timestamp,ts_epoch,local_date,local_hour,request_id,auth_index,source,
provider,model,endpoint,auth_type,api_key_hash,failed,latency_ms,input_tokens,
output_tokens,reasoning_tokens,cached_tokens,total_tokens,raw_json
) VALUES (
:event_key,:timestamp,:ts_epoch,:local_date,:local_hour,:request_id,:auth_index,:source,
:provider,:model,:endpoint,:auth_type,:api_key_hash,:failed,:latency_ms,:input_tokens,
:output_tokens,:reasoning_tokens,:cached_tokens,:total_tokens,:raw_json
)
""",
                        values,
                    )
                    inserted += 1
                except sqlite3.IntegrityError:
                    pass  # duplicate event_key, expected
                except Exception as e:
                    print(f"insert_usage: unexpected error: {e} — payload keys: {list(payload.keys())}", file=sys.stderr, flush=True)
    finally:
        conn.close()
    return inserted
def latest_quota_age():
    """Return the age in seconds of the newest quota snapshot.

    Returns:
        float | None: seconds since the most recent ``ts_epoch`` in
        quota_snapshots, or None when the table is empty.
    """
    conn = db_connect()
    try:
        row = conn.execute("SELECT MAX(ts_epoch) AS ts FROM quota_snapshots").fetchone()
    finally:
        # A sqlite3 connection used in ``with`` is not closed on exit, so
        # close it explicitly; the fetched row remains usable afterwards.
        conn.close()
    return None if row["ts"] is None else time.time() - row["ts"]
def auth_files():
    """Return the sorted paths of all ``codex-*.json`` files in JSON_DIR."""
    pattern = os.path.join(JSON_DIR, "codex-*.json")
    matches = glob.glob(pattern)
    matches.sort()
    return matches
def _fetch_quota_row(path, now):
    """Fetch the usage/quota payload for one auth file.

    Args:
        path: path of a JSON auth file.
        now: aware datetime stamped on the snapshot (shared by the whole run).

    Returns:
        None when the file has no ``access_token``; otherwise a tuple
        ``(email, plan, row)`` where *row* follows the column order of the
        quota_snapshots INSERT in refresh_quota().
    """
    # Context manager so the auth file handle is closed deterministically.
    with open(path, encoding="utf-8") as f:
        auth = json.load(f)
    token = auth.get("access_token")
    email = auth.get("email") or os.path.basename(path)
    if not token:
        print(f"refresh_quota: skipping {email} (no access_token)", flush=True)
        return None
    print(f"refresh_quota: fetching quota for {email}...", flush=True)
    req = urllib.request.Request(
        "https://chatgpt.com/backend-api/wham/usage",
        headers={
            "Authorization": "Bearer " + token,
            "Accept": "application/json",
            "User-Agent": "codex-cli",
        },
    )
    with urllib.request.urlopen(req, timeout=20) as resp:
        data = json.load(resp)
    rl = data.get("rate_limit") or {}
    primary = rl.get("primary_window") or {}
    secondary = rl.get("secondary_window") or {}
    primary_used = int(primary.get("used_percent") or 0)
    secondary_used = int(secondary.get("used_percent") or 0)
    plan = data.get("plan_type")
    row = (
        now.isoformat(),
        now.timestamp(),
        email,
        plan,
        1 if rl.get("allowed") else 0,
        1 if rl.get("limit_reached") else 0,
        primary_used,
        max(0, 100 - primary_used),
        epoch_to_local(primary.get("reset_at")),
        secondary_used,
        max(0, 100 - secondary_used),
        epoch_to_local(secondary.get("reset_at")),
        str((data.get("credits") or {}).get("balance", "")),
        json.dumps(data, ensure_ascii=False),
    )
    return email, plan, row


def refresh_quota(force=False):
    """Fetch a quota snapshot for every auth file and store it.

    Only one refresh runs at a time: if another call holds the lock this one
    returns 0 immediately. Unless *force* is true, nothing is fetched while
    the newest snapshot is younger than ``quota_refresh_seconds``.

    Args:
        force: fetch even when the latest snapshot is still fresh.

    Returns:
        int: number of snapshots inserted.
    """
    # Non-blocking acquire so concurrent requests do not pile up behind a
    # refresh that is already running.
    if not _quota_refresh_lock.acquire(blocking=False):
        print("refresh_quota: already running, skipping duplicate call", flush=True)
        return 0
    try:
        cfg = load_config()
        age = latest_quota_age()
        if not force and age is not None and age < cfg["quota_refresh_seconds"]:
            return 0
        files = auth_files()
        # Log how many auth files were found.
        print(f"refresh_quota: found {len(files)} auth files in {JSON_DIR}", flush=True)
        # One timestamp for the whole run so all snapshots of a refresh share it.
        now = dt.datetime.now(dt.timezone.utc)
        inserted = 0
        conn = db_connect()
        try:
            for path in files:
                try:
                    fetched = _fetch_quota_row(path, now)
                    if fetched is None:
                        continue
                    email, plan, row = fetched
                    # Commit each snapshot right away: keeping one write
                    # transaction open across the following HTTP requests
                    # (20 s timeout each) would block the collector's inserts.
                    with conn:
                        conn.execute(
                            """
INSERT INTO quota_snapshots (
timestamp,ts_epoch,email,plan,allowed,limit_reached,
primary_used_percent,primary_remaining_percent,primary_reset_at,
secondary_used_percent,secondary_remaining_percent,secondary_reset_at,
credits_balance,raw_json
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)
""",
                            row,
                        )
                    inserted += 1
                    # Log the successful save together with the plan type.
                    print(f"refresh_quota: saved quota for {email} (plan: {plan})", flush=True)
                except (
                    OSError,  # file errors; also covers URLError / HTTPError
                    urllib.error.URLError,
                    urllib.error.HTTPError,
                    json.JSONDecodeError,
                    KeyError,
                    ValueError,  # non-numeric percentages / reset times
                    TypeError,
                    AttributeError,  # payload is not a JSON object
                    sqlite3.Error,
                ) as exc:
                    # A failure for one account must not abort the others.
                    print(f"quota refresh failed for {path}: {exc}", file=sys.stderr, flush=True)
        finally:
            conn.close()
        # Log the total number of snapshots inserted.
        print(f"refresh_quota: inserted {inserted} quota snapshots", flush=True)
        return inserted
    finally:
        _quota_refresh_lock.release()
def epoch_to_local(value):
    """Format an epoch-seconds value as ``YYYY-MM-DD HH:MM:SS`` in LOCAL_TZ.

    Falsy values (None, 0, "") yield an empty string.
    """
    if value:
        moment = dt.datetime.fromtimestamp(int(value), LOCAL_TZ)
        return moment.strftime("%Y-%m-%d %H:%M:%S")
    return ""
def debug_collect():
    """Test one poll cycle and print raw queue items — does NOT write to DB.

    Connects with the configured credentials, pops up to 10 items and prints
    each one with its parsed keys, tokens, model and source.

    Note: the items are fetched with RPOP, which removes them from the queue,
    so events printed here will not be seen by the collector afterwards.
    """
    cfg = load_config()
    # Never echo the management key: debug output tends to end up in
    # terminals, logs and bug reports.
    shown_cfg = dict(cfg)
    if shown_cfg.get("management_key"):
        shown_cfg["management_key"] = "***"
    print(f"[debug] Config: {shown_cfg}", flush=True)
    print(f"[debug] Connecting to {cfg['cliproxy_host']}:{cfg['cliproxy_port']} ...", flush=True)
    try:
        client = RespClient(cfg["cliproxy_host"], cfg["cliproxy_port"], cfg["management_key"])
    except Exception as e:
        print(f"[debug] Connection FAILED: {e}", file=sys.stderr, flush=True)
        return
    print("[debug] AUTH OK", flush=True)
    try:
        raw_items = client.rpop(10)
        print(f"[debug] rpop returned {len(raw_items)} item(s)", flush=True)
        for i, raw in enumerate(raw_items):
            print(f"[debug] item[{i}]: {raw[:300]}", flush=True)
            try:
                payload = json.loads(raw)
                print(f"[debug] parsed OK — keys: {list(payload.keys())}", flush=True)
                tokens = payload.get("tokens") or {}
                print(f"[debug] tokens: {tokens}", flush=True)
                print(f"[debug] model: {payload.get('model')}, source: {payload.get('source')}", flush=True)
            except Exception as e:
                print(f"[debug] parse error: {e}", file=sys.stderr, flush=True)
    finally:
        client.close()
    if not raw_items:
        print("[debug] Queue is EMPTY — no events to collect. Make sure CLIProxy is running and receiving requests.", flush=True)
def collect_forever():
    """Poll the CLIProxy queue forever and persist usage events.

    Connects with the configuration loaded once at start-up, pops up to 100
    items per cycle, stores them via insert_usage() and sleeps
    ``poll_interval_seconds`` between cycles. Any error closes the connection
    and triggers a reconnect after 5 seconds. This function never returns.

    NOTE: quota snapshots are not refreshed from this loop; they are fetched
    on demand through latest_quotas(force=True).
    """
    import traceback  # local import: only needed on the error path

    init_db()
    cfg = load_config()
    print(f"collector: connecting to {cfg['cliproxy_host']}:{cfg['cliproxy_port']}", flush=True)
    while True:
        try:
            client = RespClient(cfg["cliproxy_host"], cfg["cliproxy_port"], cfg["management_key"])
            print("collector: connected and authenticated OK", flush=True)
            try:
                while True:
                    raw_items = client.rpop(100)
                    if raw_items:
                        print(f"collector: got {len(raw_items)} raw item(s) from queue", flush=True)
                        inserted = insert_usage(raw_items)
                        print(f"collector: inserted {inserted}/{len(raw_items)} events into DB", flush=True)
                    time.sleep(cfg["poll_interval_seconds"])
            finally:
                client.close()
                print("collector: connection closed", flush=True)
        except Exception as exc:
            # Top-level boundary of the collector: log everything and retry,
            # so a transient failure never kills the background thread.
            print(f"collector error: {exc}", file=sys.stderr, flush=True)
            traceback.print_exc(file=sys.stderr)
            print("collector: retrying in 5s...", file=sys.stderr, flush=True)
            time.sleep(5)
def range_bounds(name):
    """Return ``(start, end)`` epoch seconds for a named reporting range.

    ``"1h"``, ``"5h"``, ``"24h"`` and ``"7d"`` are rolling windows ending now.
    Any other name (including ``"today"``) starts at local midnight in
    LOCAL_TZ.
    """
    now = dt.datetime.now(LOCAL_TZ)
    rolling_windows = {
        "5h": dt.timedelta(hours=5),
        "1h": dt.timedelta(hours=1),
        "24h": dt.timedelta(hours=24),
        "7d": dt.timedelta(days=7),
    }
    span = rolling_windows.get(name)
    if span is None:
        start = now.replace(hour=0, minute=0, second=0, microsecond=0)
    else:
        start = now - span
    utc = dt.timezone.utc
    return start.astimezone(utc).timestamp(), now.astimezone(utc).timestamp()
def query_summary(range_name):
    """Aggregate usage_events over a named time range.

    Args:
        range_name: range understood by range_bounds().

    Returns:
        dict with the keys ``range`` (the name passed in), ``summary``
        (overall totals), ``accounts`` (per source/auth_index), ``models``
        (top 12 by total tokens) and ``hours`` (per local hour); every row is
        a plain dict.
    """
    start, end = range_bounds(range_name)
    conn = db_connect()
    try:
        total = conn.execute(
            """
SELECT COUNT(*) requests,
COALESCE(SUM(total_tokens),0) total_tokens,
COALESCE(SUM(input_tokens),0) input_tokens,
COALESCE(SUM(output_tokens),0) output_tokens,
COALESCE(SUM(reasoning_tokens),0) reasoning_tokens,
COALESCE(SUM(cached_tokens),0) cached_tokens,
COALESCE(SUM(failed),0) failed
FROM usage_events WHERE ts_epoch BETWEEN ? AND ?
""",
            (start, end),
        ).fetchone()
        accounts = conn.execute(
            """
SELECT COALESCE(source, auth_index, 'unknown') account,
COUNT(*) requests,
COALESCE(SUM(total_tokens),0) total_tokens,
COALESCE(SUM(input_tokens),0) input_tokens,
COALESCE(SUM(output_tokens),0) output_tokens,
COALESCE(SUM(reasoning_tokens),0) reasoning_tokens,
COALESCE(SUM(failed),0) failed
FROM usage_events WHERE ts_epoch BETWEEN ? AND ?
GROUP BY account ORDER BY total_tokens DESC
""",
            (start, end),
        ).fetchall()
        models = conn.execute(
            """
SELECT COALESCE(model, 'unknown') model,
COUNT(*) requests,
COALESCE(SUM(total_tokens),0) total_tokens,
COALESCE(SUM(failed),0) failed
FROM usage_events WHERE ts_epoch BETWEEN ? AND ?
GROUP BY model ORDER BY total_tokens DESC LIMIT 12
""",
            (start, end),
        ).fetchall()
        hours = conn.execute(
            """
SELECT local_hour hour,
COUNT(*) requests,
COALESCE(SUM(total_tokens),0) total_tokens,
COALESCE(SUM(failed),0) failed
FROM usage_events WHERE ts_epoch BETWEEN ? AND ?
GROUP BY local_hour ORDER BY local_hour
""",
            (start, end),
        ).fetchall()
    finally:
        # Read-only queries need no commit, but the connection must be closed
        # explicitly; a ``with`` block on a sqlite3 connection would not do it.
        conn.close()
    return {
        "range": range_name,
        "summary": dict(total),
        "accounts": [dict(x) for x in accounts],
        "models": [dict(x) for x in models],
        "hours": [dict(x) for x in hours],
    }
def latest_quotas(force=False):
    """Return the most recent quota snapshot for every e-mail.

    Args:
        force: when true, run refresh_quota(force=True) before reading.

    Returns:
        list[dict]: one row per e-mail (its snapshot with the highest
        ``ts_epoch``), ordered by e-mail.
    """
    if force:
        refresh_quota(force=True)
    conn = db_connect()
    try:
        rows = conn.execute(
            """
SELECT q.* FROM quota_snapshots q
JOIN (
SELECT email, MAX(ts_epoch) ts FROM quota_snapshots GROUP BY email
) latest ON latest.email = q.email AND latest.ts = q.ts_epoch
ORDER BY email
"""
        ).fetchall()
    finally:
        # Close explicitly: sqlite3's connection context manager does not.
        conn.close()
    return [dict(row) for row in rows]
def recent_requests(limit=100):
    """Return the newest usage events, most recent first.

    Args:
        limit: maximum number of rows to return.

    Returns:
        list[dict]: the selected usage_events columns plus ``local_time``,
        the event timestamp rendered in LOCAL_TZ as ``YYYY-MM-DD HH:MM:SS``.
    """
    conn = db_connect()
    try:
        rows = conn.execute(
            """
SELECT timestamp, source, auth_index, model, endpoint, failed, latency_ms,
input_tokens, output_tokens, reasoning_tokens, cached_tokens, total_tokens, request_id
FROM usage_events ORDER BY ts_epoch DESC LIMIT ?
""",
            (limit,),
        ).fetchall()
    finally:
        # Close explicitly: sqlite3's connection context manager does not.
        conn.close()
    result = []
    for row in rows:
        item = dict(row)
        item["local_time"] = parse_rfc3339(item["timestamp"]).astimezone(LOCAL_TZ).strftime("%Y-%m-%d %H:%M:%S")
        result.append(item)
    return result
def json_response(handler, payload, status=200):
    """Write *payload* as a UTF-8 JSON HTTP response through *handler*.

    Sends the status line, the Content-Type, Content-Length and
    ``Cache-Control: no-store`` headers, then the encoded body.
    """
    encoded = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    handler.send_response(status)
    headers = (
        ("Content-Type", "application/json; charset=utf-8"),
        ("Content-Length", str(len(encoded))),
        ("Cache-Control", "no-store"),
    )
    for header_name, header_value in headers:
        handler.send_header(header_name, header_value)
    handler.end_headers()
    handler.wfile.write(encoded)
class DashboardHandler(BaseHTTPRequestHandler):
    """HTTP handler serving the dashboard page and its JSON API.

    Routes: ``/`` (HTML page), ``/api/summary``, ``/api/quota``,
    ``/api/requests`` and ``/api/health``; anything else is a JSON 404.
    """

    def log_message(self, fmt, *args):
        # Suppress the base class's per-request logging.
        return

    def do_GET(self):
        """Dispatch a GET request; unexpected errors become a JSON 500."""
        parsed = urlparse(self.path)
        qs = parse_qs(parsed.query)
        try:
            if parsed.path == "/":
                self.serve_html()
            elif parsed.path == "/api/summary":
                json_response(self, query_summary(qs.get("range", ["today"])[0]))
            elif parsed.path == "/api/quota":
                json_response(self, {"quotas": latest_quotas(force=qs.get("force", ["0"])[0] == "1")})
            elif parsed.path == "/api/requests":
                try:
                    requested = int(qs.get("limit", ["100"])[0])
                except ValueError:
                    # A malformed parameter is a client error, not a 500.
                    json_response(self, {"error": "limit must be an integer"}, 400)
                    return
                # Clamp on both sides: SQLite treats a negative LIMIT as
                # "no limit", which would bypass the 500-row cap.
                limit = max(0, min(500, requested))
                json_response(self, {"requests": recent_requests(limit)})
            elif parsed.path == "/api/health":
                json_response(self, {"ok": True, "db": DB_PATH, "auth_files": len(auth_files())})
            else:
                json_response(self, {"error": "not found"}, 404)
        except Exception as exc:
            json_response(self, {"error": str(exc)}, 500)

    def serve_html(self):
        """Send the dashboard HTML file, or a 500 page if it is missing."""
        try:
            with open(HTML_PATH, "rb") as f:
                body = f.read()
            status = 200
        except FileNotFoundError:
            body = b"<h1>Dashboard HTML not found</h1><p>Expected: " + HTML_PATH.encode() + b"</p>"
            status = 500
        self.send_response(status)
        self.send_header("Content-Type", "text/html; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        if status == 200:
            # Only the real page is marked as non-cacheable.
            self.send_header("Cache-Control", "no-store")
        self.end_headers()
        self.wfile.write(body)
def serve():
    """Run the dashboard HTTP server in the foreground (no collector)."""
    init_db()
    cfg = load_config()
    host, port = cfg["dashboard_host"], cfg["dashboard_port"]
    server = ThreadingHTTPServer((host, int(port)), DashboardHandler)
    print(f"dashboard listening on http://{host}:{port}", flush=True)
    server.serve_forever()
def print_report(range_name):
    """Print the usage summary for *range_name* as indented JSON."""
    init_db()
    print(json.dumps(query_summary(range_name), ensure_ascii=False, indent=2))
def start(open_browser=True):
    """Run init + collect (background thread) + serve in a single command.

    Args:
        open_browser: open the dashboard URL in the default browser shortly
            after start-up.

    Blocks until the server is interrupted with Ctrl+C.
    """
    # 1. Init DB and config
    init_db()
    cfg = load_config()
    host = cfg["dashboard_host"]
    port = int(cfg["dashboard_port"])
    # When binding to 0.0.0.0, show localhost for browser and also the LAN IP
    if host in ("0.0.0.0", ""):
        local_url = f"http://127.0.0.1:{port}"
        try:
            lan_ip = socket.gethostbyname(socket.gethostname())
        except OSError:
            lan_ip = None
        lan_url = f"http://{lan_ip}:{port}" if lan_ip and lan_ip != "127.0.0.1" else None
    else:
        local_url = f"http://{host}:{port}"
        lan_url = None
    # 2. Start collector in a daemon thread (dies automatically when main process exits)
    collector_thread = threading.Thread(target=collect_forever, daemon=True, name="collector")
    collector_thread.start()
    print("collector started (background thread)", flush=True)
    # 3. Open browser after a short delay so the server is ready
    if open_browser:
        def _open():
            time.sleep(1.5)
            webbrowser.open(local_url)
        threading.Thread(target=_open, daemon=True).start()
    # 4. Start HTTP server (blocks — keeps the process alive)
    server = ThreadingHTTPServer((host, port), DashboardHandler)
    print(f"dashboard listening on {host}:{port}", flush=True)
    print(f"  local  → {local_url}", flush=True)
    if lan_url:
        print(f"  LAN    → {lan_url}", flush=True)
    print("Press Ctrl+C to stop.", flush=True)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("\nShutting down.", flush=True)
    finally:
        # serve_forever() has already returned at this point, so shutdown()
        # (meant to be called from another thread) is not needed; what is
        # still open is the listening socket.
        server.server_close()
def main():
    """Parse the command line and run the selected sub-command."""
    parser = argparse.ArgumentParser(description="CLIProxyAPI usage dashboard")
    sub = parser.add_subparsers(dest="cmd", required=True)
    # Sub-commands without options of their own, in help-listing order.
    simple_commands = (
        ("init", "Initialise database and config"),
        ("collect", "Run collector loop (foreground)"),
        ("debug", "Test one poll cycle, print raw queue items (no DB write)"),
        ("serve", "Run HTTP server only (no collector)"),
    )
    for command, help_text in simple_commands:
        sub.add_parser(command, help=help_text)
    start_p = sub.add_parser("start", help="Init + collect + serve in one command (recommended)")
    start_p.add_argument("--no-browser", action="store_true", help="Do not open browser automatically")
    quota_p = sub.add_parser("quota", help="Show current quota snapshots")
    quota_p.add_argument("--force", action="store_true")
    report_p = sub.add_parser("report", help="Print usage summary as JSON")
    report_p.add_argument("range", choices=["today", "1h", "5h", "24h", "7d"])
    args = parser.parse_args()

    def run_init():
        init_db()
        load_config()
        print(DB_PATH)

    def run_quota():
        init_db()
        print(json.dumps({"quotas": latest_quotas(force=args.force)}, ensure_ascii=False, indent=2))

    dispatch = {
        "init": run_init,
        "collect": collect_forever,
        "debug": debug_collect,
        "serve": serve,
        "start": lambda: start(open_browser=not args.no_browser),
        "quota": run_quota,
        "report": lambda: print_report(args.range),
    }
    action = dispatch.get(args.cmd)
    if action is not None:
        action()
# Run the CLI only when the file is executed as a script, not when imported.
if __name__ == "__main__":
    main()