575 lines
21 KiB
Python
575 lines
21 KiB
Python
#!/usr/bin/env python3
"""Crawl OrangePi.vn blog posts into structured JSONL for RAG.

Discovery is sitemap-first (Yoast/WordPress post-sitemap.xml); extraction is a
Firecrawl single-page scrape. Outputs:

  articles.jsonl         article-level structured records
  chunks.jsonl           chunk-level records for embedding/RAG
  orangepi_models.json   Orange Pi canonical model dictionary + aliases
  urls.json              discovered URL list with sitemap lastmod
  raw/<slug>.json        raw Firecrawl response per article
  markdown/<slug>.md     extracted markdown per article
  errors.jsonl           failed URLs/errors
  summary.json           counts and paths for the most recent run

Usage:
  python3 crawl_orangepi_blog.py --limit 5
  python3 crawl_orangepi_blog.py --all

Requires FIRECRAWL_API_KEY in environment or /home/admin/.hermes/.env.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import datetime as dt
|
|
import hashlib
|
|
import html
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
import time
|
|
import urllib.error
|
|
import urllib.parse
|
|
import urllib.request
|
|
import xml.etree.ElementTree as ET
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
# Default output root; main() uses it unless --out-dir is given.
DEFAULT_OUT_DIR = Path("/mnt/ssd/orangepi-rag")
# Model dictionary location under the default output root.
DEFAULT_MODELS_PATH = DEFAULT_OUT_DIR / "orangepi_models.json"
# Post sitemap used for URL discovery (overridable with --sitemap).
SITEMAP_URL = "https://orangepi.vn/post-sitemap.xml"
# Firecrawl single-page scrape endpoint.
FIRECRAWL_SCRAPE_URL = "https://api.firecrawl.dev/v1/scrape"
# Optional dotenv file read by load_dotenv().
ENV_PATH = Path("/home/admin/.hermes/.env")

# Article pages: any *.html URL on orangepi.vn except the blog index and its
# pagination, wp-* paths, and the cart/checkout pages.
ARTICLE_URL_RE = re.compile(r"^https://orangepi\.vn/(?!blog/?(?:$|page/))(?!wp-)(?!cart/?$)(?!checkout/?$).+\.html/?$")
# A "word" for chunk sizing is any run of non-whitespace characters.
WORD_RE = re.compile(r"\S+", re.UNICODE)
# ATX heading line: group(1) is the run of '#', group(2) the heading text.
HEADING_RE = re.compile(r"^(#{1,6})\s+(.+?)\s*$")
# Inline markdown link: group(1) is the label, group(2) the target.
MD_LINK_RE = re.compile(r"\[([^\]]+)\]\(([^)]+)\)")
# Inline markdown image (matched as a whole so it can be removed).
MD_IMAGE_RE = re.compile(r"!\[[^\]]*\]\([^)]+\)")
# Anything that looks like an HTML tag.
HTML_TAG_RE = re.compile(r"<[^>]+>")
# Runs of spaces/tabs (newlines are deliberately not included).
MULTI_SPACE_RE = re.compile(r"[ \t]+")
|
|
|
|
|
|
def load_dotenv(path: Path = ENV_PATH) -> None:
    """Copy KEY=VALUE pairs from a dotenv file into ``os.environ``.

    A missing file is a no-op. Blank lines, ``#`` comments and lines without
    ``=`` are skipped. Surrounding double and single quotes are stripped from
    the value. Variables already present in the environment are left alone.
    Nothing is printed, so secrets never reach the console.
    """
    if not path.exists():
        return
    content = path.read_text(encoding="utf-8", errors="ignore")
    for entry in (raw.strip() for raw in content.splitlines()):
        if entry == "" or entry.startswith("#"):
            continue
        name, separator, value = entry.partition("=")
        if not separator:
            continue
        name = name.strip()
        value = value.strip().strip('"').strip("'")
        if name:
            # setdefault keeps any value the real environment already holds.
            os.environ.setdefault(name, value)
|
|
|
|
|
|
def now_iso() -> str:
    """Return the current time in UTC as an ISO 8601 string (with offset)."""
    utc_now = dt.datetime.now(tz=dt.timezone.utc)
    return utc_now.isoformat()
|
|
|
|
|
|
def fetch_bytes(url: str, timeout: int = 30) -> bytes:
    """GET ``url`` and return the raw response body.

    The request carries the crawler's User-Agent and an Accept header that
    prefers XML/HTML. Errors raised by ``urllib`` propagate to the caller.
    """
    request_headers = {
        "User-Agent": "OrangePiVN-RAG-Crawler/1.0 (+https://orangepi.vn)",
        "Accept": "application/xml,text/xml,text/html,*/*",
    }
    request = urllib.request.Request(url, headers=request_headers)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        body = response.read()
    return body
|
|
|
|
|
|
def parse_post_sitemap(url: str = SITEMAP_URL) -> list[dict[str, str | None]]:
    """Download the post sitemap and list its article URLs.

    Returns one ``{"url": ..., "lastmod": ...}`` dict per ``<url>`` entry whose
    ``<loc>`` passes ``is_article_url``; ``lastmod`` is ``None`` when the entry
    has no (or an empty) ``<lastmod>``. Document order is preserved.
    """
    # The download goes through fetch_bytes(), which sends a custom
    # User-Agent; there is no separate fallback transport.
    namespaces = {"sm": "http://www.sitemaps.org/schemas/sitemap/0.9"}
    tree_root = ET.fromstring(fetch_bytes(url))

    entries: list[dict[str, str | None]] = []
    for node in tree_root.findall(".//sm:url", namespaces):
        loc_node = node.find("sm:loc", namespaces)
        if loc_node is None or not loc_node.text:
            continue
        location = loc_node.text.strip()
        if not is_article_url(location):
            continue
        modified = None
        mod_node = node.find("sm:lastmod", namespaces)
        if mod_node is not None and mod_node.text:
            modified = mod_node.text.strip()
        entries.append({"url": location, "lastmod": modified})
    return entries
|
|
|
|
|
|
def is_article_url(url: str) -> bool:
    """Tell whether ``url`` matches ARTICLE_URL_RE once trailing slashes are removed."""
    candidate = url.rstrip("/")
    return ARTICLE_URL_RE.match(candidate) is not None
|
|
|
|
|
|
def slug_from_url(url: str) -> str:
    """Derive a filesystem-safe, lowercase slug from the URL path.

    A trailing ``.html`` is removed and every run of characters outside
    ``[a-zA-Z0-9_-]`` becomes a single ``-``. An empty path yields ``index``.
    If nothing usable remains, the first 12 hex digits of the URL's SHA-1 are
    used. Slugs are capped at 160 characters.
    """
    url_path = urllib.parse.urlparse(url).path.strip("/") or "index"
    stem = re.sub(r"\.html$", "", url_path)
    candidate = re.sub(r"[^a-zA-Z0-9_-]+", "-", stem).strip("-").lower()
    if candidate:
        return candidate[:160]
    return hashlib.sha1(url.encode()).hexdigest()[:12]
|
|
|
|
|
|
def article_id_from_url(url: str) -> str:
    """Build the article id: ``orangepi_blog_`` + the URL slug with ``-`` turned into ``_``."""
    slug = slug_from_url(url)
    return f"orangepi_blog_{slug.replace('-', '_')}"
|
|
|
|
|
|
def firecrawl_scrape(url: str, api_key: str, timeout: int = 120) -> tuple[int, dict[str, Any]]:
    """Scrape one page through the Firecrawl scrape endpoint.

    Args:
        url: Page to scrape.
        api_key: Firecrawl API key, sent as a Bearer token.
        timeout: Scrape timeout in seconds. It is forwarded to Firecrawl in
            milliseconds; the local socket timeout is 20 s longer so the
            service gets the chance to answer first.

    Returns:
        ``(http_status, payload)``. ``payload`` is always a dict. Failures are
        reported in-band instead of raised, so one bad URL cannot abort a
        crawl:

        * HTTP error responses return their status code and the decoded JSON
          body (or ``{"success": False, "error": <raw body>}``).
        * Transport failures (DNS, refused connection, timeout, truncated
          response) return status ``0`` and
          ``{"success": False, "error": ...}``.
        * A body that is not a JSON object returns the HTTP status and
          ``{"success": False, "error": ...}``.
    """
    # Function-scope import: only needed for the exception class below.
    import http.client

    payload = {
        "url": url,
        "formats": ["markdown"],
        "onlyMainContent": True,
        "waitFor": 1000,
        "timeout": timeout * 1000,
    }
    body = json.dumps(payload).encode("utf-8")
    req = urllib.request.Request(
        FIRECRAWL_SCRAPE_URL,
        data=body,
        method="POST",
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
            "User-Agent": "OrangePiVN-RAG-Crawler/1.0",
        },
    )
    try:
        with urllib.request.urlopen(req, timeout=timeout + 20) as resp:
            status = resp.status
            raw = resp.read().decode("utf-8", errors="replace")
    except urllib.error.HTTPError as e:
        # The service answered with an error status; keep its body for diagnosis.
        status = e.code
        raw = e.read().decode("utf-8", errors="replace")
    except (OSError, http.client.HTTPException) as e:
        # URLError and socket timeouts are OSError subclasses; HTTPException
        # covers truncated or malformed responses.
        return 0, {"success": False, "error": f"request failed: {e!r}"}

    try:
        data = json.loads(raw)
    except ValueError:
        return status, {"success": False, "error": raw}
    if not isinstance(data, dict):
        # Callers attach bookkeeping keys to the payload, so it must be a dict.
        return status, {"success": False, "error": data}
    return status, data
|
|
|
|
|
|
def strip_markdown_to_text(markdown: str) -> str:
    """Reduce markdown to plain text while leaving code intact.

    Prose has images removed, links replaced by their label, heading markers,
    emphasis/backtick/tilde markers and HTML tags stripped, entities decoded
    and runs of spaces/tabs collapsed. Underscores are removed only when they
    act as emphasis markers, so ``snake_case`` names survive.

    Fenced code blocks keep their body verbatim; only the fences and the
    info string (language tag) on the opening line are dropped.

    Args:
        markdown: Markdown source.

    Returns:
        Plain text with at most one blank line between paragraphs, stripped of
        leading and trailing whitespace.
    """
    text = markdown.replace("\r\n", "\n")
    pieces: list[str] = []
    # With one capturing group, re.split alternates prose (even index) and
    # fenced code (odd index).
    for index, part in enumerate(re.split(r"(```.*?```)", text, flags=re.S)):
        if index % 2:
            inner = part[3:-3]
            if "\n" in inner:
                # First line of a multi-line fence is the info string.
                inner = inner.split("\n", 1)[1]
            pieces.append(inner)
            continue
        part = MD_IMAGE_RE.sub("", part)
        part = MD_LINK_RE.sub(r"\1", part)
        part = re.sub(r"^#{1,6}\s+", "", part, flags=re.M)
        part = re.sub(r"[*`~]", "", part)
        # Drop underscores only at word edges (emphasis), never inside a word.
        part = re.sub(r"(?<!\w)_+|_+(?!\w)", "", part)
        part = HTML_TAG_RE.sub(" ", part)
        part = html.unescape(part)
        part = MULTI_SPACE_RE.sub(" ", part)
        pieces.append(part)
    text = "".join(pieces)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()
|
|
|
|
|
|
def clean_markdown(markdown: str) -> str:
    """Remove known site-chrome lines from scraped markdown.

    Two kinds of boilerplate are dropped, both case-insensitively:

    * lines containing one of the accessibility / skip-link phrases;
    * lines whose entire visible text is a bare widget label such as
      ``close`` (markdown link targets and punctuation are ignored).

    The label rule is a whole-line match on purpose: a substring match on
    "close" would also delete real content such as "Close the terminal" or
    ``f.close()``.

    Args:
        markdown: Markdown as returned by the scraper.

    Returns:
        Markdown with CRLF normalised to LF, trailing whitespace removed from
        each line, runs of four or more newlines reduced to three, and
        leading/trailing whitespace stripped.
    """
    # Pre-lowercased so each line is lowered once, not once per phrase.
    drop_contains = (
        "press enter for accessibility",
        "accessibility menu",
        "popup heading",
        "skip to main",
        "bỏ qua nội dung",
    )
    drop_exact = {"close"}

    cleaned: list[str] = []
    for line in markdown.replace("\r\n", "\n").split("\n"):
        lowered = line.strip().lower()
        if any(phrase in lowered for phrase in drop_contains):
            continue
        # Reduce "[Close](#)" or "× Close" to the bare label before comparing.
        label = re.sub(r"\]\([^)]*\)", "", lowered)
        label = re.sub(r"[\W_]+", " ", label).strip()
        if label in drop_exact:
            continue
        cleaned.append(line.rstrip())
    text = "\n".join(cleaned)
    text = re.sub(r"\n{4,}", "\n\n\n", text)
    return text.strip()
|
|
|
|
|
|
def extract_title(data: dict[str, Any], markdown: str, fallback_url: str) -> str:
    """Pick the best available title for an article.

    Preference order: metadata ``title``, metadata ``ogTitle`` (first
    non-blank string, HTML-unescaped), the first markdown heading, and finally
    a title-cased version of the URL slug.
    """
    metadata = data.get("metadata") or {}
    for candidate in (metadata.get(field) for field in ("title", "ogTitle")):
        if isinstance(candidate, str) and candidate.strip():
            return html.unescape(candidate.strip())

    heading_hits = (HEADING_RE.match(row.strip()) for row in markdown.splitlines())
    first_heading = next((hit for hit in heading_hits if hit), None)
    if first_heading is not None:
        return first_heading.group(2).strip()

    return slug_from_url(fallback_url).replace("-", " ").title()
|
|
|
|
|
|
def extract_description(data: dict[str, Any], text: str) -> str | None:
    """Return a short description for an article.

    Uses the first non-blank metadata ``description`` / ``ogDescription``
    (HTML-unescaped). Otherwise falls back to the first 300 characters of
    ``text``, or ``None`` when ``text`` is empty.
    """
    metadata = data.get("metadata") or {}
    for field in ("description", "ogDescription"):
        candidate = metadata.get(field)
        if not isinstance(candidate, str):
            continue
        candidate = candidate.strip()
        if candidate:
            return html.unescape(candidate)
    if not text:
        return None
    return text[:300].strip()
|
|
|
|
|
|
def normalize_for_match(text: str) -> str:
    """Prepare text for alias matching without altering its letters.

    HTML entities are decoded, non-breaking spaces become plain spaces, the
    dash characters U+2010..U+2015 become ``-``, and every whitespace run is
    collapsed to one space. Falsy input yields ``""``. The result is not
    stripped.
    """
    replacements = {0x00A0: " "}
    replacements.update(dict.fromkeys(range(0x2010, 0x2016), "-"))
    decoded = html.unescape(text or "").translate(replacements)
    return re.sub(r"\s+", " ", decoded)
|
|
|
|
|
|
def alias_to_regex(alias: str) -> re.Pattern[str]:
    """Compile a case-insensitive pattern that matches ``alias`` as a whole token.

    The alias is normalised and stripped, then escaped. Each literal space
    matches any whitespace run, and ASCII alphanumeric lookarounds stop the
    alias from matching inside a longer word or number.
    """
    cleaned = normalize_for_match(alias).strip()
    # re.escape renders a space as "\ "; rejoin the fragments with \s+.
    flexible = r"\s+".join(re.escape(cleaned).split(r"\ "))
    pattern = rf"(?<![A-Za-z0-9]){flexible}(?![A-Za-z0-9])"
    return re.compile(pattern, re.IGNORECASE | re.UNICODE)
|
|
|
|
|
|
def load_model_dictionary(path: Path | None) -> list[dict[str, Any]]:
    """Read the canonical Orange Pi model dictionary from JSON.

    Expected file content::

        [{"canonical": "Orange Pi Zero", "aliases": ["Orange Pi Zero", "OrangePi Zero"]}]

    Rows that are not objects or have no canonical name are ignored; a
    non-list ``aliases`` value is treated as empty. The canonical name always
    counts as an alias.

    Returns:
        One dict per model with ``canonical``, ``aliases`` (sorted) and
        ``compiled`` (``{"alias", "regex"}`` items, longest alias first), with
        models ordered by canonical-name length, longest first. Returns ``[]``
        when ``path`` is ``None`` or does not exist.

    Raises:
        ValueError: the JSON document is not a list.
    """
    if path is None or not path.exists():
        return []
    payload = json.loads(path.read_text(encoding="utf-8"))
    if not isinstance(payload, list):
        raise ValueError(f"model dictionary must be a JSON list: {path}")

    loaded: list[dict[str, Any]] = []
    for entry in payload:
        if not isinstance(entry, dict):
            continue
        name = str(entry.get("canonical") or "").strip()
        if not name:
            continue
        extras = entry.get("aliases") or []
        if not isinstance(extras, list):
            extras = []
        names = {name, *[str(a).strip() for a in extras if str(a).strip()]}
        # Longest alias first, so more specific spellings are tried first.
        longest_first = sorted(names, key=len, reverse=True)
        loaded.append({
            "canonical": name,
            "aliases": sorted(names),
            "compiled": [{"alias": a, "regex": alias_to_regex(a)} for a in longest_first],
        })
    # Longest canonical name first, so families resolve deterministically.
    loaded.sort(key=lambda model: len(model["canonical"]), reverse=True)
    return loaded
|
|
|
|
|
|
def spans_overlap(a: tuple[int, int], b: tuple[int, int]) -> bool:
    """Return True when half-open spans ``a`` and ``b`` overlap; touching spans do not."""
    a_start, a_end = a[0], a[1]
    b_start, b_end = b[0], b[1]
    # Disjoint means one span ends at or before the other starts.
    return not (a_start >= b_end or b_start >= a_end)
|
|
|
|
|
|
def product_mentions_detail(text: str, models: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Count canonical product mentions in ``text`` using the model dictionary.

    Models are processed in the order given (longest canonical name first when
    they come from ``load_model_dictionary``). Each accepted match reserves its
    character span, so a shorter name cannot also match inside it. This stops
    both "Orange Pi Plus 2" and "Orange Pi Plus" counting for one mention.

    The same reservation applies between aliases of one model: a nested alias
    such as "Zero 2" inside "Orange Pi Zero 2" does not inflate the count.
    Aliases that match exactly the same span are recorded together as one
    mention.

    Args:
        text: Text to scan; it is normalised with ``normalize_for_match``.
        models: Entries carrying ``canonical`` and a ``compiled`` list of
            ``{"alias", "regex"}`` items.

    Returns:
        One dict per mentioned model with ``canonical``, ``count`` and
        ``aliases_matched``, sorted by descending count and then by
        case-insensitive canonical name.
    """
    if not models:
        return []
    hay = normalize_for_match(text)
    details: list[dict[str, Any]] = []
    occupied: list[tuple[int, int]] = []
    for model in models:
        span_aliases: dict[tuple[int, int], set[str]] = {}
        for item in model.get("compiled", []):
            alias = item["alias"]
            for m in item["regex"].finditer(hay):
                span = m.span()
                if span in span_aliases:
                    # Same text, different spelling of the alias: one mention.
                    span_aliases[span].add(alias)
                    continue
                # Skip text already claimed by an earlier model.
                if any(spans_overlap(span, used) for used in occupied):
                    continue
                # Skip text already claimed by another alias of this model.
                if any(spans_overlap(span, own) for own in span_aliases):
                    continue
                span_aliases[span] = {alias}
        if not span_aliases:
            continue
        aliases_matched: set[str] = set().union(*span_aliases.values())
        spans = sorted(span_aliases)
        occupied.extend(spans)
        details.append({
            "canonical": model["canonical"],
            "count": len(spans),
            "aliases_matched": sorted(aliases_matched, key=str.lower),
        })
    return sorted(details, key=lambda d: (-int(d["count"]), str(d["canonical"]).lower()))
|
|
|
|
|
|
def product_mentions(text: str, models: list[dict[str, Any]]) -> list[str]:
    """Return only the canonical names from ``product_mentions_detail``, in its order."""
    names: list[str] = []
    for detail in product_mentions_detail(text, models):
        names.append(detail["canonical"])
    return names
|
|
|
|
|
|
def infer_topic(title: str, text: str) -> str | None:
    """Guess a coarse topic from the title and the first 2000 characters of text.

    Keywords are matched as lowercase substrings in priority order; the first
    hit decides. Returns ``None`` when no keyword occurs.
    """
    haystack = "\n".join((title, text[:2000])).lower()
    # Order matters: earlier keywords win over later ones.
    keyword_topics = (
        ("camera", "camera"),
        ("vnc", "remote access"),
        ("android", "android"),
        ("emmc", "storage"),
        ("sata", "storage"),
        ("home assistant", "home assistant"),
        ("gpio", "gpio"),
        ("ubuntu", "linux"),
        ("debian", "linux"),
        ("armbian", "linux"),
        ("docker", "docker"),
    )
    return next((topic for keyword, topic in keyword_topics if keyword in haystack), None)
|
|
|
|
|
|
def chunk_markdown(
    markdown: str,
    article: dict[str, Any],
    models: list[dict[str, Any]] | None = None,
    max_words: int = 650,
    overlap_words: int = 100,
) -> list[dict[str, Any]]:
    """Split markdown into overlapping, roughly ``max_words``-sized chunks.

    Blank-line-separated blocks are packed whole into chunks. When a chunk is
    closed, up to ``overlap_words`` trailing words are carried into the next
    one: whole trailing blocks when they fit, otherwise the last
    ``overlap_words`` words of the final block. Carried-over text is never
    emitted as a chunk of its own.

    A block longer than ``1.5 * max_words`` is emitted as sliding word windows
    of ``max_words`` words that advance by ``max_words - overlap_words``.

    Each chunk's ``section`` is the most recent heading in effect at the
    chunk's first new block. A heading is only recognised on the first line
    of a block.

    Args:
        markdown: Cleaned article markdown.
        article: Article record; ``id``, ``url`` and ``title`` are required,
            other fields are copied into chunk metadata when present.
        models: Compiled model dictionary; when falsy, product detection is
            skipped.
        max_words: Target chunk size in whitespace-separated words.
        overlap_words: Words shared between consecutive chunks. Clamped to
            ``[0, max_words - 1]`` so chunking always makes progress.

    Returns:
        Chunk records in document order.

    Raises:
        ValueError: ``max_words`` is not positive.
    """
    if max_words <= 0:
        raise ValueError("max_words must be positive")
    overlap_words = max(0, min(overlap_words, max_words - 1))

    blocks = re.split(r"\n\s*\n", markdown.strip()) if markdown.strip() else []
    chunks: list[dict[str, Any]] = []
    current: list[str] = []  # overlap carried in, followed by new blocks
    current_words = 0
    fresh_blocks = 0  # blocks in `current` that are not carried-over overlap
    section = None
    current_section = None

    def words_of(s: str) -> list[str]:
        return WORD_RE.findall(s)

    def emit(content: str, chunk_section: str | None) -> None:
        # Detect products once and derive the name list from the detail.
        detail = product_mentions_detail(content, models) if models else []
        chunks.append({
            "chunk_id": f"{article['id']}__chunk_{len(chunks):04d}",
            "article_id": article["id"],
            "url": article["url"],
            "title": article["title"],
            "section": chunk_section,
            "language": article.get("language", "vi"),
            "content": content,
            "metadata": {
                "source": article.get("source"),
                "type": article.get("type"),
                "product_mentions": [d["canonical"] for d in detail],
                "product_mentions_detail": detail,
                "article_product_mentions": article.get("products", []),
                "topic": article.get("topic"),
                "modified_at": article.get("modified_at"),
            },
        })

    def overlap_tail(parts: list[str]) -> tuple[list[str], int]:
        """Return the overlap to carry forward and its word count."""
        if overlap_words <= 0 or not parts:
            return [], 0
        tail: list[str] = []
        count = 0
        for part in reversed(parts):
            size = len(words_of(part))
            if count + size > overlap_words:
                break
            tail.insert(0, part)
            count += size
        if tail:
            return tail, count
        # The last block alone exceeds the budget: carry only its final words.
        last_words = words_of(parts[-1])[-overlap_words:]
        return [" ".join(last_words)], len(last_words)

    def flush() -> None:
        nonlocal current, current_words, fresh_blocks
        content = "\n\n".join(current).strip()
        if fresh_blocks and content:
            emit(content, current_section)
            current, current_words = overlap_tail(current)
        else:
            # Only carried-over overlap is pending; it was already emitted.
            current, current_words = [], 0
        fresh_blocks = 0

    for block in blocks:
        b = block.strip()
        if not b:
            continue
        m = HEADING_RE.match(b.splitlines()[0].strip())
        if m:
            section = m.group(2).strip()
        bw = len(words_of(b))

        # Very large block: split by words only as fallback.
        if bw > max_words * 1.5:
            if fresh_blocks:
                flush()
            words = words_of(b)
            step = max_words - overlap_words  # >= 1 because of the clamp above
            window = ""
            for start in range(0, len(words), step):
                window = " ".join(words[start:start + max_words])
                emit(window, section)
                if start + max_words >= len(words):
                    # This window reached the end; later ones would be subsets.
                    break
            current, current_words = overlap_tail([window])
            fresh_blocks = 0
            continue

        if current and current_words + bw > max_words:
            flush()
            if current and current_words + bw > max_words:
                # Even the overlap does not fit next to this block; drop it.
                current, current_words = [], 0
        if not fresh_blocks:
            current_section = section
        current.append(b)
        current_words += bw
        fresh_blocks += 1

    if fresh_blocks:
        flush()
    return chunks
|
|
|
|
|
|
def append_jsonl(path: Path, record: dict[str, Any]) -> None:
    """Append ``record`` to ``path`` as one UTF-8 JSON line with sorted keys."""
    with path.open("a", encoding="utf-8") as sink:
        serialized = json.dumps(record, ensure_ascii=False, sort_keys=True)
        sink.write(serialized)
        sink.write("\n")
|
|
|
|
|
|
def already_done(raw_dir: Path, slug: str) -> bool:
    """Tell whether a raw response file ``<slug>.json`` exists in ``raw_dir``."""
    cached_response = raw_dir.joinpath(f"{slug}.json")
    return cached_response.exists()
|
|
|
|
|
|
def _load_indexed_article_ids(articles_path: Path) -> set[str]:
    """Return the article ids already present in ``articles.jsonl``."""
    seen: set[str] = set()
    if not articles_path.exists():
        return seen
    with articles_path.open(encoding="utf-8") as handle:
        for line in handle:
            line = line.strip()
            if not line:
                continue
            try:
                record = json.loads(line)
            except ValueError:
                # Tolerate a truncated line left behind by an interrupted run.
                continue
            if isinstance(record, dict) and isinstance(record.get("id"), str):
                seen.add(record["id"])
    return seen


def _read_cached_scrape(raw_path: Path) -> tuple[int, dict[str, Any]] | None:
    """Return ``(status, payload)`` from a cached successful scrape, else ``None``."""
    try:
        cached = json.loads(raw_path.read_text(encoding="utf-8"))
        status = int(cached.get("_http_status", 200))
    except (OSError, ValueError, TypeError, AttributeError):
        return None
    if status >= 400 or not cached.get("success"):
        # A cached failure may have been transient; let the caller retry.
        return None
    return status, cached


def main(argv: list[str] | None = None) -> int:
    """Run the crawl: discover URLs, scrape, and write the JSONL outputs.

    Without ``--all`` or ``--limit`` only the first 5 URLs are processed.
    Cached raw responses are reused unless ``--force`` is given; ``--force``
    also removes the existing ``articles.jsonl``, ``chunks.jsonl`` and
    ``errors.jsonl`` first.

    Reruns without ``--force`` are incremental:

    * articles whose id is already in ``articles.jsonl`` are skipped, so
      records are never duplicated;
    * cached responses that are unreadable or record a failed scrape are
      scraped again instead of failing forever.

    A failure on one URL is logged to ``errors.jsonl`` and the crawl goes on.

    Args:
        argv: Argument list; ``None`` means ``sys.argv[1:]``.

    Returns:
        ``0`` when every processed URL succeeded, ``1`` when at least one
        failed, ``2`` when the API key is missing or the sitemap cannot be
        loaded. Invalid option values exit through ``argparse``.
    """
    parser = argparse.ArgumentParser(description="Crawl OrangePi.vn blog into RAG JSONL data")
    parser.add_argument("--out-dir", type=Path, default=DEFAULT_OUT_DIR)
    parser.add_argument("--models", type=Path, default=None, help="Path to Orange Pi model dictionary JSON; defaults to <out-dir>/orangepi_models.json")
    parser.add_argument("--sitemap", default=SITEMAP_URL)
    parser.add_argument("--limit", type=int, default=None, help="Only process first N article URLs")
    parser.add_argument("--all", action="store_true", help="Process all discovered article URLs")
    parser.add_argument("--sleep", type=float, default=1.0, help="Delay between Firecrawl calls")
    parser.add_argument("--force", action="store_true", help="Re-scrape even if raw file exists")
    parser.add_argument("--max-words", type=int, default=650)
    parser.add_argument("--overlap-words", type=int, default=100)
    args = parser.parse_args(argv)

    # A negative limit would silently slice from the end of the URL list.
    if args.limit is not None and args.limit < 0:
        parser.error("--limit must be >= 0")
    if args.max_words <= 0:
        parser.error("--max-words must be > 0")
    if not 0 <= args.overlap_words < args.max_words:
        parser.error("--overlap-words must be >= 0 and smaller than --max-words")

    if not args.all and args.limit is None:
        args.limit = 5

    load_dotenv()
    api_key = os.environ.get("FIRECRAWL_API_KEY")
    if not api_key:
        print("ERROR: FIRECRAWL_API_KEY is not set in environment or /home/admin/.hermes/.env", file=sys.stderr)
        return 2

    out_dir: Path = args.out_dir
    models_path: Path = args.models or (out_dir / "orangepi_models.json")
    models = load_model_dictionary(models_path)
    raw_dir = out_dir / "raw"
    md_dir = out_dir / "markdown"
    raw_dir.mkdir(parents=True, exist_ok=True)
    md_dir.mkdir(parents=True, exist_ok=True)

    try:
        urls = parse_post_sitemap(args.sitemap)
    except (OSError, ET.ParseError) as e:
        print(f"ERROR: could not load sitemap {args.sitemap}: {e}", file=sys.stderr)
        return 2
    urls_path = out_dir / "urls.json"
    urls_path.write_text(json.dumps({"sitemap": args.sitemap, "count": len(urls), "urls": urls}, ensure_ascii=False, indent=2), encoding="utf-8")

    selected = urls if args.all else urls[: args.limit]
    articles_path = out_dir / "articles.jsonl"
    chunks_path = out_dir / "chunks.jsonl"
    errors_path = out_dir / "errors.jsonl"

    # For force runs, make smoke-test output deterministic for selected set.
    if args.force:
        articles_path.unlink(missing_ok=True)
        chunks_path.unlink(missing_ok=True)
        errors_path.unlink(missing_ok=True)

    # Ids already written by earlier runs (empty after a --force wipe).
    indexed_ids = _load_indexed_article_ids(articles_path)

    print(f"Discovered article URLs: {len(urls)}")
    print(f"Processing: {len(selected)}")
    print(f"Output: {out_dir}")
    print(f"Model dictionary: {models_path} ({len(models)} models)")

    ok = 0
    failed = 0
    skipped = 0
    total_chunks = 0

    for idx, item in enumerate(selected, 1):
        url = str(item["url"])
        lastmod = item.get("lastmod")
        slug = slug_from_url(url)
        article_id = article_id_from_url(url)
        raw_path = raw_dir / f"{slug}.json"
        md_path = md_dir / f"{slug}.md"
        print(f"[{idx}/{len(selected)}] {url}")

        if article_id in indexed_ids:
            # Already written; use --force to rebuild.
            print("  SKIP already present in articles.jsonl")
            skipped += 1
            continue

        cached = None
        if not args.force and already_done(raw_dir, slug):
            cached = _read_cached_scrape(raw_path)
            if cached is None:
                print("  cached raw response unusable or failed; re-scraping")

        if cached is not None:
            status, raw_data = cached
        else:
            try:
                status, raw_data = firecrawl_scrape(url, api_key)
            except Exception as e:
                # Crawl-loop boundary: record the failure and keep going.
                append_jsonl(errors_path, {"url": url, "error": f"scrape request failed: {e!r}", "at": now_iso()})
                print(f"  ERROR scrape request failed: {e!r}")
                failed += 1
                time.sleep(args.sleep)
                continue
            if not isinstance(raw_data, dict):
                raw_data = {"success": False, "error": raw_data}
            raw_data["_http_status"] = status
            raw_data["_source_url"] = url
            raw_data["_scraped_at"] = now_iso()
            raw_path.write_text(json.dumps(raw_data, ensure_ascii=False, indent=2), encoding="utf-8")
            time.sleep(args.sleep)

        if status >= 400 or not raw_data.get("success"):
            append_jsonl(errors_path, {"url": url, "http_status": status, "error": raw_data, "at": now_iso()})
            print(f"  ERROR status={status} success={raw_data.get('success')}")
            failed += 1
            continue

        data = raw_data.get("data") or {}
        markdown = data.get("markdown") or ""
        markdown = clean_markdown(markdown)
        text = strip_markdown_to_text(markdown)
        if len(text) < 100:
            append_jsonl(errors_path, {"url": url, "http_status": status, "error": "too little text", "text_chars": len(text), "at": now_iso()})
            print(f"  ERROR too little text chars={len(text)}")
            failed += 1
            continue

        title = extract_title(data, markdown, url)
        desc = extract_description(data, text)
        product_detail = product_mentions_detail(title + "\n" + text, models)
        products = [d["canonical"] for d in product_detail]
        article = {
            "id": article_id,
            "url": url,
            "source": "orangepi.vn",
            "type": "blog_article",
            "title": title,
            "description": desc,
            "published_at": None,
            "modified_at": lastmod,
            "language": "vi",
            "markdown": markdown,
            "text": text,
            "tags": [],
            "products": products,
            "product_mentions_detail": product_detail,
            "topic": infer_topic(title, text),
            "scraped_at": raw_data.get("_scraped_at") or now_iso(),
            "metadata": data.get("metadata") or {},
        }
        chunks = chunk_markdown(markdown, article, models=models, max_words=args.max_words, overlap_words=args.overlap_words)

        md_path.write_text(markdown, encoding="utf-8")
        append_jsonl(articles_path, article)
        for chunk in chunks:
            append_jsonl(chunks_path, chunk)
        indexed_ids.add(article_id)

        ok += 1
        total_chunks += len(chunks)
        print(f"  OK title={title!r} markdown_chars={len(markdown)} chunks={len(chunks)}")

    summary = {
        "sitemap": args.sitemap,
        "discovered": len(urls),
        "processed": len(selected),
        "ok": ok,
        "failed": failed,
        "skipped": skipped,
        "chunks": total_chunks,
        "out_dir": str(out_dir),
        "models_path": str(models_path),
        "models_loaded": len(models),
        "finished_at": now_iso(),
    }
    (out_dir / "summary.json").write_text(json.dumps(summary, ensure_ascii=False, indent=2), encoding="utf-8")
    print("SUMMARY", json.dumps(summary, ensure_ascii=False))
    return 0 if failed == 0 else 1
|
|
|
|
|
|
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|