Files
orangepi-rag/crawl_orangepi_blog.py
T
2026-06-11 23:53:48 +07:00

575 lines
21 KiB
Python

#!/usr/bin/env python3
"""Crawl OrangePi.vn blog posts into structured JSONL for RAG.

Discovery is sitemap-first (Yoast/WordPress post-sitemap.xml); extraction is a
Firecrawl single-page scrape.

Files in the output directory:
    articles.jsonl        article-level structured records
    chunks.jsonl          chunk-level records for embedding/RAG
    orangepi_models.json  Orange Pi canonical model dictionary + aliases
                          (read by this script, not written by it)
    urls.json             discovered URL list with sitemap lastmod
    raw/<slug>.json       raw Firecrawl response per article
    markdown/<slug>.md    extracted markdown per article
    errors.jsonl          failed URLs/errors
    summary.json          counters for the most recent run

Usage:
    python3 crawl_orangepi_blog.py --limit 5
    python3 crawl_orangepi_blog.py --all

Requires FIRECRAWL_API_KEY in the environment or in /home/admin/.hermes/.env.
"""
from __future__ import annotations
import argparse
import datetime as dt
import hashlib
import html
import json
import os
import re
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Any
# Default directory that receives every generated artifact (--out-dir default).
DEFAULT_OUT_DIR = Path("/mnt/ssd/orangepi-rag")
# Model dictionary location inside the default output directory.
DEFAULT_MODELS_PATH = DEFAULT_OUT_DIR / "orangepi_models.json"
# Post sitemap used for URL discovery (--sitemap default).
SITEMAP_URL = "https://orangepi.vn/post-sitemap.xml"
# Firecrawl single-page scrape endpoint.
FIRECRAWL_SCRAPE_URL = "https://api.firecrawl.dev/v1/scrape"
# .env file read by load_dotenv() when variables are missing from the environment.
ENV_PATH = Path("/home/admin/.hermes/.env")
# Accepts https://orangepi.vn/<path>.html (optional trailing slash) while
# rejecting paths that begin with "wp-", the blog index and its pagination,
# and the cart/checkout pages.
ARTICLE_URL_RE = re.compile(r"^https://orangepi\.vn/(?!blog/?(?:$|page/))(?!wp-)(?!cart/?$)(?!checkout/?$).+\.html/?$")
# Runs of non-whitespace characters; used for word counting while chunking.
WORD_RE = re.compile(r"\S+", re.UNICODE)
# Markdown ATX heading line: group 1 = the "#" run, group 2 = the heading text.
HEADING_RE = re.compile(r"^(#{1,6})\s+(.+?)\s*$")
# Inline markdown link: group 1 = link text, group 2 = target.
MD_LINK_RE = re.compile(r"\[([^\]]+)\]\(([^)]+)\)")
# Inline markdown image.
MD_IMAGE_RE = re.compile(r"!\[[^\]]*\]\([^)]+\)")
# Anything between "<" and ">" (treated as an HTML tag).
HTML_TAG_RE = re.compile(r"<[^>]+>")
# Runs of spaces and tabs (newlines are deliberately not included).
MULTI_SPACE_RE = re.compile(r"[ \t]+")
def load_dotenv(path: Path = ENV_PATH) -> None:
    """Populate ``os.environ`` from a minimal KEY=VALUE file.

    Blank lines, ``#`` comment lines and lines without ``=`` are skipped.
    Values lose surrounding whitespace and surrounding quote characters.
    Variables that already exist in the environment are never overwritten.
    Nothing is printed, so secrets do not leak into logs. A missing file is
    silently ignored.
    """
    if not path.exists():
        return
    content = path.read_text(encoding="utf-8", errors="ignore")
    for entry in map(str.strip, content.splitlines()):
        if not entry or entry.startswith("#") or "=" not in entry:
            continue
        name, _, value = entry.partition("=")
        name = name.strip()
        value = value.strip().strip('"').strip("'")
        if name:
            # setdefault keeps any value the real environment already provides.
            os.environ.setdefault(name, value)
def now_iso() -> str:
    """Return the current UTC time as a timezone-aware ISO-8601 string."""
    utc_now = dt.datetime.now(tz=dt.timezone.utc)
    return utc_now.isoformat()
def fetch_bytes(url: str, timeout: int = 30) -> bytes:
    """Fetch *url* and return the raw response body.

    The request carries the crawler's own User-Agent and an Accept header that
    prefers XML/HTML. Errors raised by ``urllib`` propagate to the caller.
    """
    request_headers = {
        "User-Agent": "OrangePiVN-RAG-Crawler/1.0 (+https://orangepi.vn)",
        "Accept": "application/xml,text/xml,text/html,*/*",
    }
    request = urllib.request.Request(url, headers=request_headers)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        body = response.read()
    return body
def parse_post_sitemap(url: str = SITEMAP_URL) -> list[dict[str, str | None]]:
    """Download the post sitemap and list its article entries.

    Returns ``[{"url": ..., "lastmod": ...}, ...]`` in sitemap order, keeping
    only ``<loc>`` values accepted by ``is_article_url``. ``lastmod`` is
    ``None`` when the entry has no (or an empty) ``<lastmod>`` element.
    Download and XML parsing errors propagate to the caller.
    """
    # Original author's note: Cloudflare occasionally answers Python's urllib
    # with 403. fetch_bytes() already sends an explicit User-Agent header.
    namespaces = {"sm": "http://www.sitemaps.org/schemas/sitemap/0.9"}
    tree_root = ET.fromstring(fetch_bytes(url))
    entries: list[dict[str, str | None]] = []
    for node in tree_root.findall(".//sm:url", namespaces):
        loc_text = node.findtext("sm:loc", default=None, namespaces=namespaces)
        if not loc_text:
            continue
        lastmod_text = node.findtext("sm:lastmod", default=None, namespaces=namespaces)
        location = loc_text.strip()
        modified = lastmod_text.strip() if lastmod_text else None
        if is_article_url(location):
            entries.append({"url": location, "lastmod": modified})
    return entries
def is_article_url(url: str) -> bool:
    """Tell whether *url* matches ``ARTICLE_URL_RE`` (trailing slashes ignored)."""
    trimmed = url.rstrip("/")
    return ARTICLE_URL_RE.match(trimmed) is not None
def slug_from_url(url: str) -> str:
    """Derive a filesystem-safe, lower-case slug from the URL path.

    A trailing ``.html`` is removed and every run of characters outside
    ``[a-zA-Z0-9_-]`` collapses to one hyphen. An empty path becomes
    ``"index"``. If nothing usable remains, the first 12 hex digits of the
    URL's SHA-1 are returned instead. Slugs are capped at 160 characters.
    """
    url_path = urllib.parse.urlparse(url).path.strip("/") or "index"
    stem = re.sub(r"\.html$", "", url_path)
    candidate = re.sub(r"[^a-zA-Z0-9_-]+", "-", stem).strip("-").lower()
    if candidate:
        return candidate[:160]
    return hashlib.sha1(url.encode()).hexdigest()[:12]
def article_id_from_url(url: str) -> str:
    """Build the article id: ``orangepi_blog_`` + the slug with ``-`` turned into ``_``."""
    slug = slug_from_url(url)
    underscored = slug.replace("-", "_")
    return f"orangepi_blog_{underscored}"
def firecrawl_scrape(url: str, api_key: str, timeout: int = 120) -> tuple[int, dict[str, Any]]:
    """POST a single-page scrape request to Firecrawl.

    Args:
        url: Page to scrape.
        api_key: Bearer token sent in the Authorization header.
        timeout: Seconds; forwarded to the API as ``timeout * 1000`` and used
            locally as a socket timeout of ``timeout + 20``.

    Returns:
        ``(http_status, decoded_json)``. For HTTP error responses the error
        body is decoded as JSON when possible, otherwise wrapped as
        ``{"error": <raw body>}``.

    Raises:
        Non-HTTP ``urllib`` errors and JSON decode errors on a successful
        response propagate to the caller.
    """
    request_body = json.dumps(
        {
            "url": url,
            "formats": ["markdown"],
            "onlyMainContent": True,
            "waitFor": 1000,
            "timeout": timeout * 1000,
        }
    ).encode("utf-8")
    request_headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
        "User-Agent": "OrangePiVN-RAG-Crawler/1.0",
    }
    request = urllib.request.Request(
        FIRECRAWL_SCRAPE_URL,
        data=request_body,
        method="POST",
        headers=request_headers,
    )
    try:
        with urllib.request.urlopen(request, timeout=timeout + 20) as response:
            body_text = response.read().decode("utf-8", errors="replace")
            return response.status, json.loads(body_text)
    except urllib.error.HTTPError as http_err:
        error_text = http_err.read().decode("utf-8", errors="replace")
        try:
            parsed = json.loads(error_text)
        except Exception:
            # Non-JSON error page: keep the raw body for diagnostics.
            parsed = {"error": error_text}
        return http_err.code, parsed
def strip_markdown_to_text(markdown: str) -> str:
    """Reduce markdown to plain text for matching and length checks.

    Steps, in order: normalise CRLF, drop images, replace links with their
    link text, remove leading ``#`` heading markers, remove the emphasis/code
    marker characters ``* _ ` ~``, replace HTML tags with a space, unescape
    HTML entities, collapse runs of spaces/tabs and cap blank-line runs at one
    empty line.

    Fenced code blocks get no special treatment: their content is kept and
    only the backtick fences disappear with the other marker characters. (An
    earlier pass that substituted every fenced block with itself was a no-op
    and has been removed.)
    """
    text = markdown.replace("\r\n", "\n")
    text = MD_IMAGE_RE.sub("", text)
    text = MD_LINK_RE.sub(r"\1", text)
    text = re.sub(r"^#{1,6}\s+", "", text, flags=re.M)
    text = re.sub(r"[*_`~]", "", text)
    text = HTML_TAG_RE.sub(" ", text)
    # Unescape after tag removal so entities like &lt;b&gt; survive as text.
    text = html.unescape(text)
    text = MULTI_SPACE_RE.sub(" ", text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()
def clean_markdown(markdown: str) -> str:
    """Light cleanup for common boilerplate while preserving content.

    * Lines containing one of the known boilerplate phrases (case-insensitive)
      are dropped.
    * The short UI label "close" is dropped only when it is the whole line
      (ignoring surrounding punctuation/markup such as ``[Close]``). Matching
      it as a substring would delete real content lines that merely contain
      "close", "closed" or "enclosure".
    * Trailing whitespace is removed from every kept line and runs of four or
      more newlines are reduced to three.
    """
    # Phrases specific enough to be matched anywhere inside a line.
    drop_contains = [
        "Press enter for Accessibility",
        "Accessibility menu",
        "Popup heading",
        "Skip to main",
        "Bỏ qua nội dung",
    ]
    # Labels too generic for substring matching; must be the entire line.
    drop_exact = {"close"}
    phrases = [phrase.lower() for phrase in drop_contains]

    cleaned: list[str] = []
    for line in markdown.replace("\r\n", "\n").split("\n"):
        lowered = line.strip().lower()
        if any(phrase in lowered for phrase in phrases):
            continue
        # Strip leading/trailing non-word characters so "[Close]" or "× close" match.
        if re.sub(r"^\W+|\W+$", "", lowered) in drop_exact:
            continue
        cleaned.append(line.rstrip())
    text = "\n".join(cleaned)
    text = re.sub(r"\n{4,}", "\n\n\n", text)
    return text.strip()
def extract_title(data: dict[str, Any], markdown: str, fallback_url: str) -> str:
    """Choose the best available title for an article.

    Preference order: metadata ``title``, metadata ``ogTitle`` (both HTML
    unescaped), the first markdown heading, and finally a title-cased version
    of the URL slug.
    """
    meta = data.get("metadata") or {}
    for candidate in (meta.get(field) for field in ("title", "ogTitle")):
        if isinstance(candidate, str) and candidate.strip():
            return html.unescape(candidate.strip())

    heading_hits = (HEADING_RE.match(row.strip()) for row in markdown.splitlines())
    first_heading = next((hit for hit in heading_hits if hit), None)
    if first_heading is not None:
        return first_heading.group(2).strip()

    return slug_from_url(fallback_url).replace("-", " ").title()
def extract_description(data: dict[str, Any], text: str) -> str | None:
    """Return a short description for an article.

    Uses metadata ``description`` or ``ogDescription`` (HTML unescaped) when
    one is a non-blank string; otherwise the first 300 characters of *text*,
    or ``None`` when *text* is empty.
    """
    meta = data.get("metadata") or {}
    for field in ("description", "ogDescription"):
        candidate = meta.get(field)
        if not isinstance(candidate, str):
            continue
        trimmed = candidate.strip()
        if trimmed:
            return html.unescape(trimmed)
    if not text:
        return None
    return text[:300].strip()
def normalize_for_match(text: str) -> str:
    """Prepare text for dictionary matching without touching Vietnamese letters.

    HTML entities are unescaped, non-breaking spaces become plain spaces, the
    Unicode dashes U+2010..U+2015 become ``-`` and every whitespace run
    collapses to one space. A falsy input yields ``""``. The result is not
    stripped.
    """
    unescaped = html.unescape(text or "")
    without_nbsp = unescaped.replace("\u00a0", " ")
    ascii_dashes = re.sub(r"[\u2010-\u2015]", "-", without_nbsp)
    return re.sub(r"\s+", " ", ascii_dashes)
def alias_to_regex(alias: str) -> re.Pattern[str]:
    """Compile a case-insensitive pattern that matches *alias* as a whole token.

    The alias is normalised and regex-escaped; each space then matches any
    whitespace run. ASCII letter/digit lookarounds on both sides stop the
    alias from matching inside a longer alphanumeric word.
    """
    normalized = normalize_for_match(alias).strip()
    escaped = re.escape(normalized)
    # re.escape renders a space as "\ "; loosen it to any whitespace run.
    flexible = escaped.replace(r"\ ", r"\s+")
    pattern = rf"(?<![A-Za-z0-9]){flexible}(?![A-Za-z0-9])"
    return re.compile(pattern, re.I | re.U)
def load_model_dictionary(path: Path | None) -> list[dict[str, Any]]:
    """Read the canonical Orange Pi model dictionary from *path*.

    Expected JSON shape:
        [{"canonical": "Orange Pi Zero", "aliases": ["Orange Pi Zero", "OrangePi Zero"]}]

    Returns one dict per usable entry with keys ``canonical``, ``aliases``
    (sorted, always including the canonical name) and ``compiled`` (alias +
    compiled regex pairs, longest alias first). Entries are ordered by
    canonical-name length, longest first, so more specific models are matched
    before the families they belong to. Non-dict rows and rows without a
    canonical name are skipped. A ``None`` or missing path yields ``[]``.

    Raises:
        ValueError: if the JSON document is not a list.
    """
    if path is None or not path.exists():
        return []
    payload = json.loads(path.read_text(encoding="utf-8"))
    if not isinstance(payload, list):
        raise ValueError(f"model dictionary must be a JSON list: {path}")

    loaded: list[dict[str, Any]] = []
    for entry in payload:
        if not isinstance(entry, dict):
            continue
        canonical = str(entry.get("canonical") or "").strip()
        if not canonical:
            continue
        raw_aliases = entry.get("aliases") or []
        if not isinstance(raw_aliases, list):
            raw_aliases = []
        names = {canonical, *[str(a).strip() for a in raw_aliases if str(a).strip()]}
        longest_first = sorted(names, key=len, reverse=True)
        loaded.append(
            {
                "canonical": canonical,
                "aliases": sorted(names),
                "compiled": [{"alias": name, "regex": alias_to_regex(name)} for name in longest_first],
            }
        )
    # Longest canonical first helps deterministic output for families.
    loaded.sort(key=lambda model: len(model["canonical"]), reverse=True)
    return loaded
def spans_overlap(a: tuple[int, int], b: tuple[int, int]) -> bool:
    """Return True when half-open spans *a* and *b* share at least one position."""
    a_start, a_end = a[0], a[1]
    b_start, b_end = b[0], b[1]
    # Disjoint means one span starts at or after the other's end.
    return not (a_start >= b_end or b_start >= a_end)
def product_mentions_detail(text: str, models: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Return canonical product mentions using the model dictionary.

    Models are processed in the given order (``load_model_dictionary`` puts
    longer canonical names first) and each accepted match reserves its
    character span. This prevents false double-counts such as both
    "Orange Pi Plus 2" and "Orange Pi Plus" matching the same text span.

    The same reservation applies inside one model: when two of its aliases
    overlap at one location (e.g. "Orange Pi 5" and "Pi 5"), only the alias
    processed first is counted. The ``compiled`` list is longest-first, so the
    longer alias wins. Aliases that match exactly the same span are merged
    into a single mention.

    Returns:
        A list of ``{"canonical", "count", "aliases_matched"}`` dicts sorted
        by descending count, then case-insensitively by canonical name.
        Empty when *models* is empty.
    """
    if not models:
        return []
    hay = normalize_for_match(text)
    details: list[dict[str, Any]] = []
    occupied: list[tuple[int, int]] = []
    for model in models:
        span_aliases: dict[tuple[int, int], set[str]] = {}
        for item in model.get("compiled", []):
            alias = item["alias"]
            for m in item["regex"].finditer(hay):
                span = (m.start(), m.end())
                if span in span_aliases:
                    # Same text, different spelling of the same model: one mention.
                    span_aliases[span].add(alias)
                    continue
                # Skip text already claimed by an earlier (longer) model...
                if any(spans_overlap(span, used) for used in occupied):
                    continue
                # ...or by a longer alias of this same model.
                if any(spans_overlap(span, own) for own in span_aliases):
                    continue
                span_aliases[span] = {alias}
        if not span_aliases:
            continue
        aliases_matched: set[str] = set()
        for aliases in span_aliases.values():
            aliases_matched.update(aliases)
        spans = sorted(span_aliases)
        occupied.extend(spans)
        details.append({
            "canonical": model["canonical"],
            "count": len(spans),
            "aliases_matched": sorted(aliases_matched, key=str.lower),
        })
    return sorted(details, key=lambda d: (-int(d["count"]), str(d["canonical"]).lower()))
def product_mentions(text: str, models: list[dict[str, Any]]) -> list[str]:
    """Return only the canonical names found in *text*, in ``product_mentions_detail`` order."""
    names: list[str] = []
    for detail in product_mentions_detail(text, models):
        names.append(detail["canonical"])
    return names
def infer_topic(title: str, text: str) -> str | None:
    """Guess a coarse topic from the title and the first 2000 characters of text.

    Keywords are tested in a fixed order against the lower-cased haystack; the
    first keyword found decides the topic. Returns ``None`` when none match.
    """
    haystack = (title + "\n" + text[:2000]).lower()
    keyword_topics = (
        ("camera", "camera"),
        ("vnc", "remote access"),
        ("android", "android"),
        ("emmc", "storage"),
        ("sata", "storage"),
        ("home assistant", "home assistant"),
        ("gpio", "gpio"),
        ("ubuntu", "linux"),
        ("debian", "linux"),
        ("armbian", "linux"),
        ("docker", "docker"),
    )
    matches = (topic for keyword, topic in keyword_topics if keyword in haystack)
    return next(matches, None)
def chunk_markdown(markdown: str, article: dict[str, Any], models: list[dict[str, Any]] | None = None, max_words: int = 650, overlap_words: int = 100) -> list[dict[str, Any]]:
    """Chunk markdown by paragraphs/headings with approximate word limits.

    The markdown is split into blank-line separated blocks. Whole blocks are
    packed into a chunk until the next block would push it past *max_words*;
    the chunk is then emitted and the next one is seeded with an overlap tail
    of at most *overlap_words* words. The tail consists of whole trailing
    blocks when they fit, otherwise of the last words of the final block.
    A block larger than ``1.5 * max_words`` is emitted as sliding word windows
    of *max_words* words that advance by ``max_words - overlap_words``.

    Limits are approximate: a chunk can exceed *max_words* by the overlap tail
    plus one block that is below the window-splitting threshold.

    Each chunk's ``section`` is the most recent heading seen when its first
    new (non-overlap) block was added.

    Args:
        markdown: Cleaned article markdown.
        article: Article record; ``id``, ``url`` and ``title`` are required,
            other fields are copied into chunk metadata when present.
        models: Model dictionary for per-chunk product mentions (optional).
        max_words: Target chunk size in words; must be >= 1.
        overlap_words: Desired overlap in words; clamped to
            ``[0, max_words - 1]`` so the window stride is always positive.

    Returns:
        Chunk records in document order.

    Raises:
        ValueError: if *max_words* is smaller than 1.
    """
    if max_words < 1:
        raise ValueError("max_words must be a positive integer")
    # A non-positive stride would make the word-window loop never advance.
    overlap = max(0, min(overlap_words, max_words - 1))

    blocks = re.split(r"\n\s*\n", markdown.strip()) if markdown.strip() else []
    chunks: list[dict[str, Any]] = []
    current: list[str] = []       # blocks of the chunk being assembled
    current_words = 0
    has_new = False               # True once `current` holds more than overlap
    section: str | None = None    # most recent heading seen
    current_section: str | None = None

    def words_of(s: str) -> list[str]:
        return WORD_RE.findall(s)

    def emit(content: str, section_name: str | None) -> None:
        """Append one chunk record for *content*."""
        idx = len(chunks)
        # One scan yields both the detailed and the name-only mention lists.
        detail = product_mentions_detail(content, models) if models else []
        chunks.append({
            "chunk_id": f"{article['id']}__chunk_{idx:04d}",
            "article_id": article["id"],
            "url": article["url"],
            "title": article["title"],
            "section": section_name,
            "language": article.get("language", "vi"),
            "content": content,
            "metadata": {
                "source": article.get("source"),
                "type": article.get("type"),
                "product_mentions": [d["canonical"] for d in detail],
                "product_mentions_detail": detail,
                "article_product_mentions": article.get("products", []),
                "topic": article.get("topic"),
                "modified_at": article.get("modified_at"),
            },
        })

    def overlap_tail(parts: list[str]) -> tuple[list[str], int]:
        """Return the overlap seed (blocks, word count) taken from the end of *parts*."""
        if overlap <= 0 or not parts:
            return [], 0
        tail: list[str] = []
        count = 0
        for part in reversed(parts):
            size = len(words_of(part))
            if count + size > overlap:
                break
            tail.insert(0, part)
            count += size
        if tail:
            return tail, count
        # Last block alone exceeds the budget: fall back to its final words so
        # the overlap never duplicates a whole large block.
        last_words = words_of(parts[-1])[-overlap:]
        return [" ".join(last_words)], len(last_words)

    def flush() -> None:
        """Emit the pending chunk and keep only its overlap tail."""
        nonlocal current, current_words, has_new
        content = "\n\n".join(current).strip()
        if content:
            emit(content, current_section)
        current, current_words = overlap_tail(current)
        has_new = False

    for block in blocks:
        b = block.strip()
        if not b:
            continue
        m = HEADING_RE.match(b.splitlines()[0].strip())
        if m:
            section = m.group(2).strip()
        block_words = words_of(b)
        bw = len(block_words)

        # Very large block: split by words only as fallback.
        if bw > max_words * 1.5:
            if has_new:
                flush()
            stride = max_words - overlap
            for start in range(0, bw, stride):
                emit(" ".join(block_words[start:start + max_words]), section)
                if start + max_words >= bw:
                    # This window reached the end; a further one would only
                    # repeat words that were already emitted.
                    break
            tail_words = block_words[-overlap:] if overlap else []
            current = [" ".join(tail_words)] if tail_words else []
            current_words = len(tail_words)
            has_new = False
            continue

        if has_new and current_words + bw > max_words:
            flush()
        if not has_new:
            # First new block of this chunk decides its section label.
            current_section = section
        current.append(b)
        current_words += bw
        has_new = True

    if has_new:
        flush()
    return chunks
def append_jsonl(path: Path, record: dict[str, Any]) -> None:
    """Append *record* to *path* as one JSON line (UTF-8, sorted keys, non-ASCII kept)."""
    line = json.dumps(record, ensure_ascii=False, sort_keys=True)
    with path.open("a", encoding="utf-8") as handle:
        handle.write(f"{line}\n")
def already_done(raw_dir: Path, slug: str) -> bool:
    """Return True when ``<raw_dir>/<slug>.json`` exists."""
    candidate = raw_dir.joinpath(f"{slug}.json")
    return candidate.exists()
def _drop_superseded(path: Path, key: str, superseded: set[str], old_size: int) -> None:
    """Remove records that existed before this run and were replaced by it.

    Only the first *old_size* bytes of *path* (the content present when the
    run started) are filtered; a record is dropped when its *key* value is in
    *superseded*. Everything appended during the run is kept verbatim, and
    lines that are not valid JSON are kept untouched.
    """
    if not superseded or old_size <= 0 or not path.exists():
        return
    blob = path.read_bytes()
    kept: list[bytes] = []
    # Split on "\n" only: JSON written with ensure_ascii=False may contain
    # other Unicode line separators inside string values.
    for line in blob[:old_size].split(b"\n"):
        if not line.strip():
            continue
        try:
            record = json.loads(line)
        except ValueError:
            kept.append(line)
            continue
        if isinstance(record, dict) and record.get(key) in superseded:
            continue
        kept.append(line)
    # Write to a sibling file and swap it in so a crash cannot truncate the data.
    tmp_path = path.with_name(path.name + ".tmp")
    tmp_path.write_bytes(b"".join(line + b"\n" for line in kept) + blob[old_size:])
    tmp_path.replace(path)


def main(argv: list[str] | None = None) -> int:
    """Run the crawl: discover URLs, scrape or reuse cache, write JSONL outputs.

    Behaviour notes:
      * Without ``--all`` and ``--limit`` only the first 5 URLs are processed.
      * A cached raw response is reused only when it was a successful scrape;
        cached failures and unreadable cache files are scraped again.
      * Network/JSON errors of a single scrape are logged to ``errors.jsonl``
        and the run continues with the next URL.
      * Records are appended while the run progresses. Afterwards older
        article/chunk records of re-processed articles, and older error
        records of attempted URLs, are removed so re-runs do not duplicate
        data.
      * ``--force`` deletes the three JSONL files first and re-scrapes.

    Returns:
        0 when every selected URL succeeded, 1 when at least one failed or the
        sitemap could not be loaded, 2 when FIRECRAWL_API_KEY is missing.
    """
    parser = argparse.ArgumentParser(description="Crawl OrangePi.vn blog into RAG JSONL data")
    parser.add_argument("--out-dir", type=Path, default=DEFAULT_OUT_DIR)
    parser.add_argument("--models", type=Path, default=None, help="Path to Orange Pi model dictionary JSON; defaults to <out-dir>/orangepi_models.json")
    parser.add_argument("--sitemap", default=SITEMAP_URL)
    parser.add_argument("--limit", type=int, default=None, help="Only process first N article URLs")
    parser.add_argument("--all", action="store_true", help="Process all discovered article URLs")
    parser.add_argument("--sleep", type=float, default=1.0, help="Delay between Firecrawl calls")
    parser.add_argument("--force", action="store_true", help="Re-scrape even if raw file exists")
    parser.add_argument("--max-words", type=int, default=650)
    parser.add_argument("--overlap-words", type=int, default=100)
    args = parser.parse_args(argv)
    # Safe default: a bare invocation is a 5-article smoke test.
    if not args.all and args.limit is None:
        args.limit = 5

    load_dotenv()
    api_key = os.environ.get("FIRECRAWL_API_KEY")
    if not api_key:
        print("ERROR: FIRECRAWL_API_KEY is not set in environment or /home/admin/.hermes/.env", file=sys.stderr)
        return 2

    out_dir: Path = args.out_dir
    models_path: Path = args.models or (out_dir / "orangepi_models.json")
    models = load_model_dictionary(models_path)
    raw_dir = out_dir / "raw"
    md_dir = out_dir / "markdown"
    raw_dir.mkdir(parents=True, exist_ok=True)
    md_dir.mkdir(parents=True, exist_ok=True)

    try:
        urls = parse_post_sitemap(args.sitemap)
    except (OSError, ET.ParseError) as e:
        print(f"ERROR: could not load sitemap {args.sitemap}: {e}", file=sys.stderr)
        return 1
    urls_path = out_dir / "urls.json"
    urls_path.write_text(json.dumps({"sitemap": args.sitemap, "count": len(urls), "urls": urls}, ensure_ascii=False, indent=2), encoding="utf-8")
    selected = urls if args.all else urls[: args.limit]

    articles_path = out_dir / "articles.jsonl"
    chunks_path = out_dir / "chunks.jsonl"
    errors_path = out_dir / "errors.jsonl"
    # For force runs, make smoke-test output deterministic for selected set.
    if args.force:
        articles_path.unlink(missing_ok=True)
        chunks_path.unlink(missing_ok=True)
        errors_path.unlink(missing_ok=True)
    # Remember how much of each file predates this run (used for de-duplication).
    old_sizes = {
        p: (p.stat().st_size if p.exists() else 0)
        for p in (articles_path, chunks_path, errors_path)
    }

    print(f"Discovered article URLs: {len(urls)}")
    print(f"Processing: {len(selected)}")
    print(f"Output: {out_dir}")
    print(f"Model dictionary: {models_path} ({len(models)} models)")

    ok = 0
    failed = 0
    total_chunks = 0
    attempted_urls: set[str] = set()
    refreshed_ids: set[str] = set()
    for idx, item in enumerate(selected, 1):
        url = str(item["url"])
        lastmod = item.get("lastmod")
        slug = slug_from_url(url)
        raw_path = raw_dir / f"{slug}.json"
        md_path = md_dir / f"{slug}.md"
        print(f"[{idx}/{len(selected)}] {url}")
        attempted_urls.add(url)

        raw_data: dict[str, Any] | None = None
        status = 0
        if raw_path.exists() and not args.force:
            try:
                cached = json.loads(raw_path.read_text(encoding="utf-8"))
                cached_status = int(cached.get("_http_status", 200))
            except (OSError, ValueError, TypeError, AttributeError) as e:
                print(f" WARN cached raw unreadable, re-scraping: {e}")
            else:
                # Only a successful scrape counts as a cache hit; failures are retried.
                if cached_status < 400 and cached.get("success"):
                    raw_data, status = cached, cached_status

        if raw_data is None:
            try:
                status, raw_data = firecrawl_scrape(url, api_key)
            except (OSError, ValueError) as e:
                # URLError/timeouts are OSError subclasses; bad JSON is a ValueError.
                append_jsonl(errors_path, {"url": url, "error": f"scrape request failed: {e}", "at": now_iso()})
                print(f" ERROR scrape request failed: {e}")
                failed += 1
                time.sleep(args.sleep)
                continue
            if not isinstance(raw_data, dict):
                raw_data = {"success": False, "error": raw_data}
            raw_data["_http_status"] = status
            raw_data["_source_url"] = url
            raw_data["_scraped_at"] = now_iso()
            # Failed responses are still saved for inspection.
            raw_path.write_text(json.dumps(raw_data, ensure_ascii=False, indent=2), encoding="utf-8")
            time.sleep(args.sleep)

        if status >= 400 or not raw_data.get("success"):
            append_jsonl(errors_path, {"url": url, "http_status": status, "error": raw_data, "at": now_iso()})
            print(f" ERROR status={status} success={raw_data.get('success')}")
            failed += 1
            continue

        data = raw_data.get("data") or {}
        markdown = data.get("markdown") or ""
        markdown = clean_markdown(markdown)
        text = strip_markdown_to_text(markdown)
        if len(text) < 100:
            append_jsonl(errors_path, {"url": url, "http_status": status, "error": "too little text", "text_chars": len(text), "at": now_iso()})
            print(f" ERROR too little text chars={len(text)}")
            failed += 1
            continue

        title = extract_title(data, markdown, url)
        desc = extract_description(data, text)
        product_detail = product_mentions_detail(title + "\n" + text, models)
        products = [d["canonical"] for d in product_detail]
        article = {
            "id": article_id_from_url(url),
            "url": url,
            "source": "orangepi.vn",
            "type": "blog_article",
            "title": title,
            "description": desc,
            "published_at": None,
            "modified_at": lastmod,
            "language": "vi",
            "markdown": markdown,
            "text": text,
            "tags": [],
            "products": products,
            "product_mentions_detail": product_detail,
            "topic": infer_topic(title, text),
            "scraped_at": raw_data.get("_scraped_at") or now_iso(),
            "metadata": data.get("metadata") or {},
        }
        chunks = chunk_markdown(markdown, article, models=models, max_words=args.max_words, overlap_words=args.overlap_words)
        md_path.write_text(markdown, encoding="utf-8")
        append_jsonl(articles_path, article)
        for chunk in chunks:
            append_jsonl(chunks_path, chunk)
        refreshed_ids.add(article["id"])
        ok += 1
        total_chunks += len(chunks)
        print(f" OK title={title!r} markdown_chars={len(markdown)} chunks={len(chunks)}")

    # Replace, rather than duplicate, records left behind by earlier runs.
    _drop_superseded(articles_path, "id", refreshed_ids, old_sizes[articles_path])
    _drop_superseded(chunks_path, "article_id", refreshed_ids, old_sizes[chunks_path])
    _drop_superseded(errors_path, "url", attempted_urls, old_sizes[errors_path])

    summary = {
        "sitemap": args.sitemap,
        "discovered": len(urls),
        "processed": len(selected),
        "ok": ok,
        "failed": failed,
        "chunks": total_chunks,
        "out_dir": str(out_dir),
        "models_path": str(models_path),
        "models_loaded": len(models),
        "finished_at": now_iso(),
    }
    (out_dir / "summary.json").write_text(json.dumps(summary, ensure_ascii=False, indent=2), encoding="utf-8")
    print("SUMMARY", json.dumps(summary, ensure_ascii=False))
    return 0 if failed == 0 else 1
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main())