#!/usr/bin/env python3
"""Crawl OrangePi.vn blog posts into structured JSONL for RAG.

Discovery is sitemap-first (Yoast/WordPress post-sitemap.xml), extraction is
Firecrawl single-page scrape.

Outputs:
  articles.jsonl        article-level structured records
  chunks.jsonl          chunk-level records for embedding/RAG
  orangepi_models.json  Orange Pi canonical model dictionary + aliases
  urls.json             discovered URL list with sitemap lastmod
  raw/<slug>.json       raw Firecrawl response per article
  markdown/<slug>.md    extracted markdown per article
  errors.jsonl          failed URLs/errors
  summary.json          counters for the most recent run

Usage:
  python3 crawl_orangepi_blog.py --limit 5
  python3 crawl_orangepi_blog.py --all

Requires FIRECRAWL_API_KEY in environment or /home/admin/.hermes/.env.
"""

from __future__ import annotations

import argparse
import datetime as dt
import hashlib
import html
import http.client
import json
import os
import re
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Any

DEFAULT_OUT_DIR = Path("/mnt/ssd/orangepi-rag")
DEFAULT_MODELS_PATH = DEFAULT_OUT_DIR / "orangepi_models.json"
SITEMAP_URL = "https://orangepi.vn/post-sitemap.xml"
FIRECRAWL_SCRAPE_URL = "https://api.firecrawl.dev/v1/scrape"
ENV_PATH = Path("/home/admin/.hermes/.env")

ARTICLE_URL_RE = re.compile(
    r"^https://orangepi\.vn/(?!blog/?(?:$|page/))(?!wp-)(?!cart/?$)(?!checkout/?$).+\.html/?$"
)
WORD_RE = re.compile(r"\S+", re.UNICODE)
HEADING_RE = re.compile(r"^(#{1,6})\s+(.+?)\s*$")
MD_LINK_RE = re.compile(r"\[([^\]]+)\]\(([^)]+)\)")
MD_IMAGE_RE = re.compile(r"!\[[^\]]*\]\([^)]+\)")
HTML_TAG_RE = re.compile(r"<[^>]+>")
MULTI_SPACE_RE = re.compile(r"[ \t]+")

# Lines containing any of these phrases (case-insensitive) are page chrome.
BOILERPLATE_PHRASES = (
    "Press enter for Accessibility",
    "Accessibility menu",
    "Popup heading",
    "Skip to main",
    "Bỏ qua nội dung",
)
# Lines that consist of nothing but one of these words are widget labels.
BOILERPLATE_EXACT = frozenset({"close"})


def load_dotenv(path: Path = ENV_PATH) -> None:
    """Tiny .env loader; does not print secrets.

    Reads ``KEY=VALUE`` lines from *path*, skipping blanks, ``#`` comments and
    lines without ``=``. Surrounding quotes are stripped from values. Keys that
    are already present in ``os.environ`` are left untouched.
    """
    if not path.exists():
        return
    for raw in path.read_text(encoding="utf-8", errors="ignore").splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, val = line.split("=", 1)
        key = key.strip()
        val = val.strip().strip('"').strip("'")
        if key and key not in os.environ:
            os.environ[key] = val


def now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string."""
    return dt.datetime.now(dt.timezone.utc).isoformat()


def fetch_bytes(url: str, timeout: int = 30) -> bytes:
    """GET *url* with the crawler's User-Agent and return the raw body."""
    req = urllib.request.Request(
        url,
        headers={
            "User-Agent": "OrangePiVN-RAG-Crawler/1.0 (+https://orangepi.vn)",
            "Accept": "application/xml,text/xml,text/html,*/*",
        },
    )
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return resp.read()


def parse_post_sitemap(url: str = SITEMAP_URL) -> list[dict[str, str | None]]:
    """Return [{'url': ..., 'lastmod': ...}] from post sitemap.

    Only ``<loc>`` values accepted by :func:`is_article_url` are returned.
    Network and XML parsing errors propagate to the caller.
    """
    # fetch_bytes() sends an explicit User-Agent; the original author noted
    # that Cloudflare occasionally answers 403 to the default urllib one.
    data = fetch_bytes(url)
    root = ET.fromstring(data)
    ns = {"sm": "http://www.sitemaps.org/schemas/sitemap/0.9"}
    out: list[dict[str, str | None]] = []
    for url_el in root.findall(".//sm:url", ns):
        loc_el = url_el.find("sm:loc", ns)
        if loc_el is None or not loc_el.text:
            continue
        lastmod_el = url_el.find("sm:lastmod", ns)
        loc = loc_el.text.strip()
        lastmod = (
            lastmod_el.text.strip()
            if lastmod_el is not None and lastmod_el.text
            else None
        )
        if is_article_url(loc):
            out.append({"url": loc, "lastmod": lastmod})
    return out


def is_article_url(url: str) -> bool:
    """Return True when *url* looks like an orangepi.vn ``*.html`` article."""
    return bool(ARTICLE_URL_RE.match(url.rstrip("/")))


def slug_from_url(url: str) -> str:
    """Derive a lowercase, filesystem-safe slug (max 160 chars) from *url*."""
    path = urllib.parse.urlparse(url).path.strip("/")
    if not path:
        path = "index"
    slug = re.sub(r"\.html$", "", path)
    slug = re.sub(r"[^a-zA-Z0-9_-]+", "-", slug).strip("-").lower()
    if not slug:
        # Path had no ASCII alphanumerics at all; fall back to a stable hash.
        slug = hashlib.sha1(url.encode()).hexdigest()[:12]
    return slug[:160]


def article_id_from_url(url: str) -> str:
    """Return the article record id for *url* (slug with ``-`` -> ``_``)."""
    return "orangepi_blog_" + slug_from_url(url).replace("-", "_")


def firecrawl_scrape(url: str, api_key: str, timeout: int = 120) -> tuple[int, dict[str, Any]]:
    """Scrape one page through Firecrawl and return ``(http_status, payload)``.

    The function never raises for transport problems so that one bad URL
    cannot abort a whole crawl:

    * HTTP error responses return their status code and decoded body.
    * Connection failures / timeouts return status ``0`` and
      ``{"success": False, "error": ...}``.
    * A body that is not a JSON object is wrapped as
      ``{"success": False, "error": <body>}``.
    """
    payload = {
        "url": url,
        "formats": ["markdown"],
        "onlyMainContent": True,
        "waitFor": 1000,
        "timeout": timeout * 1000,
    }
    body = json.dumps(payload).encode("utf-8")
    req = urllib.request.Request(
        FIRECRAWL_SCRAPE_URL,
        data=body,
        method="POST",
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
            "User-Agent": "OrangePiVN-RAG-Crawler/1.0",
        },
    )
    try:
        # Give the HTTP layer a little longer than Firecrawl's own timeout.
        with urllib.request.urlopen(req, timeout=timeout + 20) as resp:
            status = resp.status
            raw = resp.read().decode("utf-8", errors="replace")
    except urllib.error.HTTPError as e:
        status = e.code
        raw = e.read().decode("utf-8", errors="replace")
    except (OSError, http.client.HTTPException) as e:
        # URLError and TimeoutError are OSError subclasses.
        return 0, {"success": False, "error": f"request failed: {e}"}

    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        data = {"success": False, "error": raw}
    if not isinstance(data, dict):
        data = {"success": False, "error": data}
    return status, data


def strip_markdown_to_text(markdown: str) -> str:
    """Reduce markdown to plain text.

    Images are removed, links keep only their label, heading markers and
    emphasis characters (``* _ ` ~``) are stripped, HTML tags become spaces
    and entities are unescaped. Fenced code gets no special treatment: its
    content is kept but passes through the same character stripping.
    """
    text = markdown.replace("\r\n", "\n")
    text = MD_IMAGE_RE.sub("", text)
    text = MD_LINK_RE.sub(r"\1", text)
    text = re.sub(r"^#{1,6}\s+", "", text, flags=re.M)
    text = re.sub(r"[*_`~]", "", text)
    text = HTML_TAG_RE.sub(" ", text)
    text = html.unescape(text)
    text = MULTI_SPACE_RE.sub(" ", text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()


def _is_boilerplate_line(line: str) -> bool:
    """Return True when *line* is page chrome rather than article content."""
    lowered = line.lower()
    if any(phrase.lower() in lowered for phrase in BOILERPLATE_PHRASES):
        return True
    # Reduce e.g. "[Close](#)" or "× Close" to the bare word before comparing,
    # so real content such as "ser.close()" or "closed" is not discarded.
    bare = re.sub(r"[\W_]+", " ", MD_LINK_RE.sub(r"\1", lowered)).strip()
    return bare in BOILERPLATE_EXACT


def clean_markdown(markdown: str) -> str:
    """Light cleanup for common boilerplate while preserving content.

    Drops accessibility/popup chrome lines, right-strips the remaining lines
    and collapses runs of four or more newlines to three.
    """
    lines = markdown.replace("\r\n", "\n").split("\n")
    cleaned = [
        line.rstrip() for line in lines if not _is_boilerplate_line(line.strip())
    ]
    text = "\n".join(cleaned)
    text = re.sub(r"\n{4,}", "\n\n\n", text)
    return text.strip()


def extract_title(data: dict[str, Any], markdown: str, fallback_url: str) -> str:
    """Pick a title: metadata title/ogTitle, first heading, then URL slug."""
    meta = data.get("metadata") or {}
    for key in ("title", "ogTitle"):
        val = meta.get(key)
        if isinstance(val, str) and val.strip():
            return html.unescape(val.strip())
    for line in markdown.splitlines():
        m = HEADING_RE.match(line.strip())
        if m:
            return m.group(2).strip()
    return slug_from_url(fallback_url).replace("-", " ").title()


def extract_description(data: dict[str, Any], text: str) -> str | None:
    """Pick a description: metadata description/ogDescription, else text[:300]."""
    meta = data.get("metadata") or {}
    for key in ("description", "ogDescription"):
        val = meta.get(key)
        if isinstance(val, str) and val.strip():
            return html.unescape(val.strip())
    return text[:300].strip() if text else None


def normalize_for_match(text: str) -> str:
    """Normalize text for dictionary matching without losing Vietnamese content.

    Unescapes HTML entities, turns NBSP into a space, maps Unicode dashes
    (U+2010..U+2015) to ``-`` and collapses all whitespace runs to one space.
    """
    text = html.unescape(text or "")
    text = text.replace("\u00a0", " ")
    text = re.sub(r"[\u2010-\u2015]", "-", text)
    text = re.sub(r"\s+", " ", text)
    return text


def alias_to_regex(alias: str) -> re.Pattern[str]:
    """Compile an alias regex with flexible whitespace and safe boundaries."""
    alias = normalize_for_match(alias).strip()
    # Escape, then make spaces flexible. Boundaries avoid matching inside longer words.
    pat = re.escape(alias).replace(r"\ ", r"\s+")
    # NOTE(review): the original lookaround body was lost when this file was
    # extracted; word-character boundaries and case-insensitive matching are
    # a reconstruction -- confirm against the authoritative copy.
    return re.compile(rf"(?<!\w){pat}(?!\w)", re.IGNORECASE)


def load_model_dictionary(path: Path | None = DEFAULT_MODELS_PATH) -> list[dict[str, Any]]:
    """Load canonical Orange Pi model dictionary.

    Expected JSON shape:
        [{"canonical": "Orange Pi Zero", "aliases": ["Orange Pi Zero", "OrangePi Zero"]}]

    Aliases are sorted longest-first so "Orange Pi Zero LTS" wins before
    "Orange Pi Zero" during evidence collection.

    Returns an empty list when *path* is None or does not exist. Raises
    ValueError when the file's top-level JSON value is not a list.
    """
    # NOTE(review): this signature was lost in extraction and is reconstructed
    # from the body and the single call site; the default value is a guess.
    if path is None or not path.exists():
        return []
    data = json.loads(path.read_text(encoding="utf-8"))
    if not isinstance(data, list):
        raise ValueError(f"model dictionary must be a JSON list: {path}")

    models: list[dict[str, Any]] = []
    for row in data:
        if not isinstance(row, dict):
            continue
        canonical = str(row.get("canonical") or "").strip()
        aliases = row.get("aliases") or []
        if not canonical:
            continue
        if not isinstance(aliases, list):
            aliases = []
        alias_set = {canonical, *[str(a).strip() for a in aliases if str(a).strip()]}
        compiled = [
            {"alias": alias, "regex": alias_to_regex(alias)}
            for alias in sorted(alias_set, key=len, reverse=True)
        ]
        models.append(
            {"canonical": canonical, "aliases": sorted(alias_set), "compiled": compiled}
        )
    # Longest canonical first helps deterministic output for families.
    return sorted(models, key=lambda m: len(m["canonical"]), reverse=True)


def spans_overlap(a: tuple[int, int], b: tuple[int, int]) -> bool:
    """Return True when half-open spans *a* and *b* share at least one index."""
    return a[0] < b[1] and b[0] < a[1]


def product_mentions_detail(text: str, models: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Return canonical product mentions using the model dictionary.

    Every alias match is collected first, then the longest spans claim their
    characters before shorter ones are considered. This prevents false
    double-counts such as both "Orange Pi Plus 2" and "Orange Pi Plus"
    matching the same text span, and also stops two overlapping aliases of
    the *same* model from counting one mention twice.

    Each result is ``{"canonical", "count", "aliases_matched"}``; results are
    ordered by descending count, then canonical name (case-insensitive).
    """
    if not models:
        return []
    hay = normalize_for_match(text)

    # (start, end, model position, alias) for every non-empty regex hit.
    candidates: list[tuple[int, int, int, str]] = []
    for order, model in enumerate(models):
        for item in model.get("compiled", []):
            alias = item["alias"]
            for m in item["regex"].finditer(hay):
                if m.end() > m.start():
                    candidates.append((m.start(), m.end(), order, alias))

    # Longest span first; ties fall back to dictionary order, then position.
    candidates.sort(key=lambda c: (c[0] - c[1], c[2], c[0]))

    owner_of_span: dict[tuple[int, int], int] = {}
    aliases_by_model: dict[int, set[str]] = {}
    for start, end, order, alias in candidates:
        span = (start, end)
        owner = owner_of_span.get(span)
        if owner is None:
            if any(spans_overlap(span, used) for used in owner_of_span):
                continue
            owner_of_span[span] = order
        elif owner != order:
            # Exact same text already belongs to an earlier model.
            continue
        aliases_by_model.setdefault(order, set()).add(alias)

    span_counts: dict[int, int] = {}
    for order in owner_of_span.values():
        span_counts[order] = span_counts.get(order, 0) + 1

    details = [
        {
            "canonical": models[order]["canonical"],
            "count": count,
            "aliases_matched": sorted(aliases_by_model[order], key=str.lower),
        }
        for order, count in span_counts.items()
    ]
    return sorted(details, key=lambda d: (-int(d["count"]), str(d["canonical"]).lower()))


def product_mentions(text: str, models: list[dict[str, Any]]) -> list[str]:
    """Return only the canonical names from :func:`product_mentions_detail`."""
    return [d["canonical"] for d in product_mentions_detail(text, models)]


def infer_topic(title: str, text: str) -> str | None:
    """Guess a coarse topic from the title and the first 2000 chars of text.

    Rules are checked in order with plain substring matching on lowercased
    input; the first hit wins. Returns None when nothing matches.
    """
    hay = (title + "\n" + text[:2000]).lower()
    rules = [
        ("camera", "camera"),
        ("vnc", "remote access"),
        ("android", "android"),
        ("emmc", "storage"),
        ("sata", "storage"),
        ("home assistant", "home assistant"),
        ("gpio", "gpio"),
        ("ubuntu", "linux"),
        ("debian", "linux"),
        ("armbian", "linux"),
        ("docker", "docker"),
    ]
    for needle, topic in rules:
        if needle in hay:
            return topic
    return None


def chunk_markdown(
    markdown: str,
    article: dict[str, Any],
    models: list[dict[str, Any]] | None = None,
    max_words: int = 650,
    overlap_words: int = 100,
) -> list[dict[str, Any]]:
    """Chunk markdown by paragraphs/headings with approximate word limits.

    Blocks (blank-line separated) are packed into chunks of about *max_words*
    words. Each new chunk starts with up to *overlap_words* words carried
    over from the previous one: whole trailing blocks when they fit,
    otherwise the last words of the final block. A single block longer than
    1.5 x *max_words* is cut into sliding word windows instead.

    ``section`` on a chunk is the most recent heading seen when the chunk's
    first non-overlap block was added.

    Args:
        markdown: Cleaned article markdown.
        article: Article record; ``id``, ``url`` and ``title`` are required.
        models: Compiled model dictionary for per-chunk product mentions.
        max_words: Target chunk size in whitespace-separated words (>= 1).
        overlap_words: Overlap budget in words; negative values mean 0.

    Raises:
        ValueError: if *max_words* is smaller than 1.
    """
    if max_words < 1:
        raise ValueError("max_words must be >= 1")
    overlap_words = max(0, overlap_words)
    model_list = models or []

    stripped = markdown.strip()
    blocks = re.split(r"\n\s*\n", stripped) if stripped else []
    chunks: list[dict[str, Any]] = []

    current: list[str] = []  # blocks of the chunk being built (overlap first)
    current_words = 0
    current_section: str | None = None
    has_new = False  # True once `current` holds content not yet emitted
    section: str | None = None  # most recent heading text

    def count_words(s: str) -> int:
        return len(WORD_RE.findall(s))

    def emit(content: str, section_name: str | None) -> None:
        """Append one chunk record for *content*."""
        detail = product_mentions_detail(content, model_list) if model_list else []
        chunks.append({
            "chunk_id": f"{article['id']}__chunk_{len(chunks):04d}",
            "article_id": article["id"],
            "url": article["url"],
            "title": article["title"],
            "section": section_name,
            "language": article.get("language", "vi"),
            "content": content,
            "metadata": {
                "source": article.get("source"),
                "type": article.get("type"),
                "product_mentions": [d["canonical"] for d in detail],
                "product_mentions_detail": detail,
                "article_product_mentions": article.get("products", []),
                "topic": article.get("topic"),
                "modified_at": article.get("modified_at"),
            },
        })

    def overlap_tail(chunk_blocks: list[str]) -> tuple[list[str], int]:
        """Return (blocks, word count) to carry into the next chunk."""
        if overlap_words <= 0 or not chunk_blocks:
            return [], 0
        tail: list[str] = []
        count = 0
        # paragraph-level overlap, keeping whole blocks where possible
        for blk in reversed(chunk_blocks):
            n = count_words(blk)
            if count + n > overlap_words:
                break
            tail.insert(0, blk)
            count += n
        if not tail:
            # Last block alone exceeds the budget: keep only its final words.
            last_words = WORD_RE.findall(chunk_blocks[-1])[-overlap_words:]
            if last_words:
                tail = [" ".join(last_words)]
                count = len(last_words)
        return tail, count

    def flush() -> None:
        """Emit the pending chunk (if any new content) and keep its overlap."""
        nonlocal current, current_words, has_new
        if not has_new:
            return
        content = "\n\n".join(current).strip()
        if content:
            emit(content, current_section)
        current, current_words = overlap_tail(current)
        has_new = False

    for block in blocks:
        b = block.strip()
        if not b:
            continue
        m = HEADING_RE.match(b.splitlines()[0].strip())
        if m:
            section = m.group(2).strip()
        bw = count_words(b)

        # Very large block: split by words only as fallback.
        if bw > max_words * 1.5:
            flush()
            words = WORD_RE.findall(b)
            # Windows overlap by words; step is clamped so the loop always
            # advances even when overlap_words >= max_words.
            step = max(1, max_words - overlap_words)
            start = 0
            while True:
                window_text = " ".join(words[start:start + max_words])
                emit(window_text, section)
                if start + max_words >= len(words):
                    break
                start += step
            current, current_words = overlap_tail([window_text])
            has_new = False
            continue

        if has_new and current_words + bw > max_words:
            flush()
        if not has_new:
            # First fresh block of this chunk decides the chunk's section.
            current_section = section
        current.append(b)
        current_words += bw
        has_new = True

    flush()
    return chunks


def append_jsonl(path: Path, record: dict[str, Any]) -> None:
    """Append *record* to *path* as one UTF-8 JSON line with sorted keys."""
    with path.open("a", encoding="utf-8") as f:
        f.write(json.dumps(record, ensure_ascii=False, sort_keys=True) + "\n")


def drop_jsonl_records(path: Path, key: str, values: set[str]) -> int:
    """Remove lines of a JSONL file whose ``record[key]`` is in *values*.

    Lines that are not valid JSON are kept untouched. The file is rewritten
    via a temporary sibling and ``os.replace`` only when something was
    actually removed. Returns the number of dropped records.
    """
    if not values or not path.exists():
        return 0
    kept: list[str] = []
    dropped = 0
    with path.open(encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                continue
            if not line.endswith("\n"):
                line += "\n"
            try:
                rec = json.loads(line)
            except json.JSONDecodeError:
                kept.append(line)
                continue
            val = rec.get(key) if isinstance(rec, dict) else None
            if isinstance(val, str) and val in values:
                dropped += 1
            else:
                kept.append(line)
    if dropped:
        tmp = path.with_name(path.name + ".tmp")
        tmp.write_text("".join(kept), encoding="utf-8")
        os.replace(tmp, path)
    return dropped


def already_done(raw_dir: Path, slug: str) -> bool:
    """Return True when a raw Firecrawl response file exists for *slug*."""
    return (raw_dir / f"{slug}.json").exists()


def load_cached_raw(raw_path: Path) -> dict[str, Any] | None:
    """Return a previously saved *successful* Firecrawl response, else None.

    Missing, unreadable, non-object, failed (``success`` falsy) or error-status
    (``_http_status`` >= 400 or not an int) files all count as a cache miss so
    that the caller scrapes the URL again.
    """
    if not raw_path.exists():
        return None
    try:
        cached = json.loads(raw_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        return None
    if not isinstance(cached, dict) or not cached.get("success"):
        return None
    status = cached.get("_http_status", 200)
    if isinstance(status, bool) or not isinstance(status, int) or status >= 400:
        return None
    return cached


def main(argv: list[str] | None = None) -> int:
    """Run the crawl. Returns 0 on success, 1 if any URL failed, 2 on setup errors."""
    parser = argparse.ArgumentParser(description="Crawl OrangePi.vn blog into RAG JSONL data")
    parser.add_argument("--out-dir", type=Path, default=DEFAULT_OUT_DIR)
    parser.add_argument("--models", type=Path, default=None, help="Path to Orange Pi model dictionary JSON; defaults to <out-dir>/orangepi_models.json")
    parser.add_argument("--sitemap", default=SITEMAP_URL)
    parser.add_argument("--limit", type=int, default=None, help="Only process first N article URLs")
    parser.add_argument("--all", action="store_true", help="Process all discovered article URLs")
    parser.add_argument("--sleep", type=float, default=1.0, help="Delay between Firecrawl calls")
    parser.add_argument("--force", action="store_true", help="Re-scrape even if raw file exists")
    parser.add_argument("--max-words", type=int, default=650)
    parser.add_argument("--overlap-words", type=int, default=100)
    args = parser.parse_args(argv)

    # Without an explicit scope, default to a small smoke-test run.
    if not args.all and args.limit is None:
        args.limit = 5

    load_dotenv()
    api_key = os.environ.get("FIRECRAWL_API_KEY")
    if not api_key:
        print("ERROR: FIRECRAWL_API_KEY is not set in environment or /home/admin/.hermes/.env", file=sys.stderr)
        return 2

    out_dir: Path = args.out_dir
    models_path: Path = args.models or (out_dir / "orangepi_models.json")
    models = load_model_dictionary(models_path)
    raw_dir = out_dir / "raw"
    md_dir = out_dir / "markdown"
    raw_dir.mkdir(parents=True, exist_ok=True)
    md_dir.mkdir(parents=True, exist_ok=True)

    try:
        urls = parse_post_sitemap(args.sitemap)
    except (OSError, ET.ParseError, http.client.HTTPException) as e:
        print(f"ERROR: could not load sitemap {args.sitemap}: {e}", file=sys.stderr)
        return 2
    urls_path = out_dir / "urls.json"
    urls_path.write_text(
        json.dumps({"sitemap": args.sitemap, "count": len(urls), "urls": urls}, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )
    selected = urls if args.all else urls[: args.limit]

    articles_path = out_dir / "articles.jsonl"
    chunks_path = out_dir / "chunks.jsonl"
    errors_path = out_dir / "errors.jsonl"
    if args.force:
        # For force runs, make smoke-test output deterministic for selected set.
        articles_path.unlink(missing_ok=True)
        chunks_path.unlink(missing_ok=True)
        errors_path.unlink(missing_ok=True)
    else:
        # Every selected URL is re-processed below (from cache or network), so
        # remove its old records first instead of appending duplicates.
        selected_ids = {article_id_from_url(str(item["url"])) for item in selected}
        drop_jsonl_records(articles_path, "id", selected_ids)
        drop_jsonl_records(chunks_path, "article_id", selected_ids)

    print(f"Discovered article URLs: {len(urls)}")
    print(f"Processing: {len(selected)}")
    print(f"Output: {out_dir}")
    print(f"Model dictionary: {models_path} ({len(models)} models)")

    ok = 0
    failed = 0
    total_chunks = 0
    for idx, item in enumerate(selected, 1):
        url = str(item["url"])
        lastmod = item.get("lastmod")
        slug = slug_from_url(url)
        raw_path = raw_dir / f"{slug}.json"
        md_path = md_dir / f"{slug}.md"
        print(f"[{idx}/{len(selected)}] {url}")

        # A cached failure or corrupt file is a cache miss, so it is retried.
        raw_data = None if args.force else load_cached_raw(raw_path)
        if raw_data is not None:
            status = int(raw_data.get("_http_status", 200))
        else:
            status, raw_data = firecrawl_scrape(url, api_key)
            raw_data["_http_status"] = status
            raw_data["_source_url"] = url
            raw_data["_scraped_at"] = now_iso()
            raw_path.write_text(json.dumps(raw_data, ensure_ascii=False, indent=2), encoding="utf-8")
            time.sleep(args.sleep)

        if status >= 400 or not raw_data.get("success"):
            append_jsonl(errors_path, {"url": url, "http_status": status, "error": raw_data, "at": now_iso()})
            print(f"  ERROR status={status} success={raw_data.get('success')}")
            failed += 1
            continue

        data = raw_data.get("data") or {}
        markdown = data.get("markdown") or ""
        markdown = clean_markdown(markdown)
        text = strip_markdown_to_text(markdown)
        if len(text) < 100:
            append_jsonl(errors_path, {"url": url, "http_status": status, "error": "too little text", "text_chars": len(text), "at": now_iso()})
            print(f"  ERROR too little text chars={len(text)}")
            failed += 1
            continue

        title = extract_title(data, markdown, url)
        desc = extract_description(data, text)
        product_detail = product_mentions_detail(title + "\n" + text, models)
        products = [d["canonical"] for d in product_detail]
        article = {
            "id": article_id_from_url(url),
            "url": url,
            "source": "orangepi.vn",
            "type": "blog_article",
            "title": title,
            "description": desc,
            "published_at": None,
            "modified_at": lastmod,
            "language": "vi",
            "markdown": markdown,
            "text": text,
            "tags": [],
            "products": products,
            "product_mentions_detail": product_detail,
            "topic": infer_topic(title, text),
            "scraped_at": raw_data.get("_scraped_at") or now_iso(),
            "metadata": data.get("metadata") or {},
        }
        chunks = chunk_markdown(markdown, article, models=models, max_words=args.max_words, overlap_words=args.overlap_words)

        md_path.write_text(markdown, encoding="utf-8")
        append_jsonl(articles_path, article)
        for chunk in chunks:
            append_jsonl(chunks_path, chunk)
        ok += 1
        total_chunks += len(chunks)
        print(f"  OK title={title!r} markdown_chars={len(markdown)} chunks={len(chunks)}")

    summary = {
        "sitemap": args.sitemap,
        "discovered": len(urls),
        "processed": len(selected),
        "ok": ok,
        "failed": failed,
        "chunks": total_chunks,
        "out_dir": str(out_dir),
        "models_path": str(models_path),
        "models_loaded": len(models),
        "finished_at": now_iso(),
    }
    (out_dir / "summary.json").write_text(json.dumps(summary, ensure_ascii=False, indent=2), encoding="utf-8")
    print("SUMMARY", json.dumps(summary, ensure_ascii=False))
    return 0 if failed == 0 else 1


if __name__ == "__main__":
    raise SystemExit(main())