#!/usr/bin/env python3 """Generic blog crawler for RAG using Firecrawl. Discovery is sitemap-first (Yoast/WordPress), extraction is Firecrawl single-page scrape. Outputs: articles.jsonl article-level structured records chunks.jsonl chunk-level records for embedding/RAG keywords.json keyword dictionary for extraction urls.json discovered URL list with sitemap lastmod raw/.json raw Firecrawl response per article markdown/.md extracted markdown per article errors.jsonl failed URLs/errors summary.json crawl summary Usage: python3 crawl_blog.py --sitemap https://example.com/post-sitemap.xml --limit 5 python3 crawl_blog.py --sitemap https://example.com/post-sitemap.xml --all python3 crawl_blog.py --sitemap https://example.com/post-sitemap.xml --all --keywords my_keywords.json Requires FIRECRAWL_API_KEY in environment or .env file. """ from __future__ import annotations import argparse import datetime as dt import hashlib import html import json import os import re import sys import time import urllib.error import urllib.parse import urllib.request import xml.etree.ElementTree as ET from pathlib import Path from typing import Any FIRECRAWL_SCRAPE_URL = "https://api.firecrawl.dev/v1/scrape" WORD_RE = re.compile(r"\S+", re.UNICODE) HEADING_RE = re.compile(r"^(#{1,6})\s+(.+?)\s*$") MD_LINK_RE = re.compile(r"\[([^\]]+)\]\(([^)]+)\)") MD_IMAGE_RE = re.compile(r"!\[[^\]]*\]\([^)]+\)") HTML_TAG_RE = re.compile(r"<[^>]+>") MULTI_SPACE_RE = re.compile(r"[ \t]+") # --------------------------------------------------------------------------- # Environment / dotenv # --------------------------------------------------------------------------- def load_dotenv(path: Path | None = None) -> None: """Load .env from project root or given path.""" candidates = [Path(__file__).parent / ".env"] if path: candidates.insert(0, path) for env_path in candidates: if not env_path.exists(): continue for raw in env_path.read_text(encoding="utf-8", errors="ignore").splitlines(): line = raw.strip() if not line or line.startswith("#") or "=" not in line: continue key, val = line.split("=", 1) key = key.strip() val = val.strip().strip('"').strip("'") if key and key not in os.environ: os.environ[key] = val break def now_iso() -> str: return dt.datetime.now(dt.timezone.utc).isoformat() # --------------------------------------------------------------------------- # URL / sitemap helpers # --------------------------------------------------------------------------- def fetch_bytes(url: str, timeout: int = 30) -> bytes: req = urllib.request.Request( url, headers={ "User-Agent": "BlogCrawler-RAG/1.0", "Accept": "application/xml,text/xml,text/html,*/*", }, ) with urllib.request.urlopen(req, timeout=timeout) as resp: return resp.read() def parse_sitemap(url: str, domain_filter: str | None = None) -> list[dict[str, str | None]]: """Return [{'url': ..., 'lastmod': ...}] from a sitemap URL. If domain_filter is provided, only include URLs matching that domain. Works with Yoast (post-sitemap.xml) and generic WordPress sitemaps. """ data = fetch_bytes(url) root = ET.fromstring(data) ns = {"sm": "http://www.sitemaps.org/schemas/sitemap/0.9"} # Check if this is a sitemap index (contains other sitemaps) sitemaps = root.findall(".//sm:sitemap/sm:loc", ns) if sitemaps: # This is a sitemap index - recursively fetch child sitemaps out: list[dict[str, str | None]] = [] for loc_el in sitemaps: child_url = loc_el.text if child_url: try: child_results = parse_sitemap(child_url, domain_filter) out.extend(child_results) except Exception as e: print(f" WARN: failed to fetch child sitemap {child_url}: {e}", file=sys.stderr) return out # Regular sitemap - extract URLs out: list[dict[str, str | None]] = [] for url_el in root.findall(".//sm:url", ns): loc_el = url_el.find("sm:loc", ns) if loc_el is None or not loc_el.text: continue lastmod_el = url_el.find("sm:lastmod", ns) loc = loc_el.text.strip() lastmod = lastmod_el.text.strip() if lastmod_el is not None and lastmod_el.text else None if domain_filter and domain_filter not in loc: continue out.append({"url": loc, "lastmod": lastmod}) return out def slug_from_url(url: str) -> str: path = urllib.parse.urlparse(url).path.strip("/") if not path: path = "index" slug = re.sub(r"\.(html?|php)$", "", path) slug = re.sub(r"[^a-zA-Z0-9_-]+", "-", slug).strip("-").lower() if not slug: slug = hashlib.sha1(url.encode()).hexdigest()[:12] return slug[:160] def article_id_from_url(url: str, prefix: str = "blog") -> str: return f"{prefix}_" + slug_from_url(url).replace("-", "_") def source_from_url(url: str) -> str: """Extract domain name from URL as source identifier.""" parsed = urllib.parse.urlparse(url) domain = parsed.netloc if domain.startswith("www."): domain = domain[4:] return domain # --------------------------------------------------------------------------- # Firecrawl API # --------------------------------------------------------------------------- def firecrawl_scrape(url: str, api_key: str, timeout: int = 120) -> tuple[int, dict[str, Any]]: payload = { "url": url, "formats": ["markdown"], "onlyMainContent": True, "waitFor": 1000, "timeout": timeout * 1000, } body = json.dumps(payload).encode("utf-8") req = urllib.request.Request( FIRECRAWL_SCRAPE_URL, data=body, method="POST", headers={ "Authorization": f"Bearer {api_key}", "Content-Type": "application/json", "User-Agent": "BlogCrawler-RAG/1.0", }, ) try: with urllib.request.urlopen(req, timeout=timeout + 20) as resp: raw = resp.read().decode("utf-8", errors="replace") return resp.status, json.loads(raw) except urllib.error.HTTPError as e: raw = e.read().decode("utf-8", errors="replace") try: data = json.loads(raw) except Exception: data = {"error": raw} return e.code, data # --------------------------------------------------------------------------- # Markdown / text processing # --------------------------------------------------------------------------- def strip_markdown_to_text(markdown: str) -> str: text = markdown.replace("\r\n", "\n") text = MD_IMAGE_RE.sub("", text) text = MD_LINK_RE.sub(r"\1", text) text = re.sub(r"```.*?```", lambda m: m.group(0), text, flags=re.S) text = re.sub(r"^#{1,6}\s+", "", text, flags=re.M) text = re.sub(r"[*_`~]", "", text) text = HTML_TAG_RE.sub(" ", text) text = html.unescape(text) text = MULTI_SPACE_RE.sub(" ", text) text = re.sub(r"\n{3,}", "\n\n", text) return text.strip() def clean_markdown(markdown: str) -> str: """Light cleanup for common boilerplate while preserving content.""" lines = markdown.replace("\r\n", "\n").split("\n") drop_contains = [ "Press enter for Accessibility", "Accessibility menu", "Popup heading", "Skip to main", "Bỏ qua nội dung", "close", ] cleaned: list[str] = [] for line in lines: s = line.strip() if any(x.lower() in s.lower() for x in drop_contains): continue cleaned.append(line.rstrip()) text = "\n".join(cleaned) text = re.sub(r"\n{4,}", "\n\n\n", text) return text.strip() def extract_title(data: dict[str, Any], markdown: str, fallback_url: str) -> str: meta = data.get("metadata") or {} for key in ("title", "ogTitle"): val = meta.get(key) if isinstance(val, str) and val.strip(): return html.unescape(val.strip()) for line in markdown.splitlines(): m = HEADING_RE.match(line.strip()) if m: return m.group(2).strip() return slug_from_url(fallback_url).replace("-", " ").title() def extract_description(data: dict[str, Any], text: str) -> str | None: meta = data.get("metadata") or {} for key in ("description", "ogDescription"): val = meta.get(key) if isinstance(val, str) and val.strip(): return html.unescape(val.strip()) return text[:300].strip() if text else None # --------------------------------------------------------------------------- # Keyword extraction (replaces product mentions from original) # --------------------------------------------------------------------------- def load_keywords(path: Path | None) -> list[dict[str, Any]]: """Load keyword dictionary from JSON. Expected JSON shape (list of categories): [ { "category": "hardware", "keywords": ["Raspberry Pi", "Arduino", "ESP32"] }, { "category": "software", "keywords": ["Docker", "Ubuntu", "Debian"] } ] Or a flat list of keyword strings: ["Raspberry Pi", "Docker", "Home Assistant"] """ if path is None or not path.exists(): return [] data = json.loads(path.read_text(encoding="utf-8")) if isinstance(data, list) and all(isinstance(x, str) for x in data): # Flat list of strings -> wrap into single category return [{"category": "general", "keywords": data}] if not isinstance(data, list): raise ValueError(f"keywords JSON must be a list: {path}") categories: list[dict[str, Any]] = [] for row in data: if not isinstance(row, dict): continue category = str(row.get("category") or "general").strip() kw_list = row.get("keywords") or [] if not isinstance(kw_list, list): continue keywords = sorted({str(k).strip() for k in kw_list if str(k).strip()}, key=len, reverse=True) if keywords: categories.append({"category": category, "keywords": keywords}) return categories def _alias_to_regex(alias: str) -> re.Pattern[str]: """Compile a keyword regex with flexible whitespace and safe boundaries.""" alias = html.unescape(alias or "").strip() alias = alias.replace("\u00a0", " ") alias = re.sub(r"[\u2010-\u2015]", "-", alias) pat = re.escape(alias).replace(r"\ ", r"\s+") return re.compile(rf"(? bool: return a[0] < b[1] and b[0] < a[1] def keyword_mentions_detail(text: str, categories: list[dict[str, Any]]) -> list[dict[str, Any]]: """Return keyword mentions grouped by category. Longer keywords are processed first and reserve their character spans. This prevents false double-counts such as "Orange Pi 5" and "Orange Pi" matching the same text span. """ if not categories: return [] hay = text hay = html.unescape(hay or "") hay = hay.replace("\u00a0", " ") hay = re.sub(r"[\u2010-\u2015]", "-", hay) hay = re.sub(r"\s+", " ", hay) # Build flat list of (category, keyword, regex) sorted by keyword length desc all_kw: list[tuple[str, str, re.Pattern[str]]] = [] for cat in categories: for kw in cat["keywords"]: all_kw.append((cat["category"], kw, _alias_to_regex(kw))) all_kw.sort(key=lambda x: len(x[1]), reverse=True) details: list[dict[str, Any]] = [] occupied: list[tuple[int, int]] = [] # Group results by category cat_results: dict[str, dict[str, Any]] = {} for category, keyword, rx in all_kw: for m in rx.finditer(hay): span = (m.start(), m.end()) if any(spans_overlap(span, used) for used in occupied): continue occupied.append(span) if category not in cat_results: cat_results[category] = { "category": category, "matched_keywords": {}, "total_count": 0, } entry = cat_results[category] entry["matched_keywords"].setdefault(keyword, 0) entry["matched_keywords"][keyword] += 1 entry["total_count"] += 1 for cat_data in cat_results.values(): cat_data["matched_keywords"] = dict( sorted(cat_data["matched_keywords"].items(), key=lambda x: -x[1]) ) details.append(cat_data) return sorted(details, key=lambda d: -d["total_count"]) def keyword_mentions(text: str, categories: list[dict[str, Any]]) -> list[str]: """Return flat list of all matched keywords.""" if not categories: return [] all_matched = [] for detail in keyword_mentions_detail(text, categories): all_matched.extend(detail["matched_keywords"].keys()) return all_matched # --------------------------------------------------------------------------- # Topic inference # --------------------------------------------------------------------------- def infer_topic(title: str, text: str, categories: list[dict[str, Any]] | None = None) -> str | None: """Infer topic from content. Uses keyword categories if available.""" hay = (title + "\n" + text[:2000]).lower() # If categories are provided, use them for topic inference if categories: best_category = None best_count = 0 for cat in categories: count = sum(1 for kw in cat["keywords"] if kw.lower() in hay) if count > best_count: best_count = count best_category = cat["category"] if best_category and best_count > 0: return best_category # Fallback: common topic rules rules = [ ("docker", "docker"), ("kubernetes", "kubernetes"), ("linux", "linux"), ("ubuntu", "linux"), ("debian", "linux"), ("python", "programming"), ("javascript", "programming"), ("home assistant", "home assistant"), ("iot", "iot"), ("ai", "ai"), ("machine learning", "ai"), ] for needle, topic in rules: if needle in hay: return topic return None # --------------------------------------------------------------------------- # Chunking # --------------------------------------------------------------------------- def chunk_markdown( markdown: str, article: dict[str, Any], categories: list[dict[str, Any]] | None = None, max_words: int = 650, overlap_words: int = 100, ) -> list[dict[str, Any]]: """Chunk markdown by paragraphs/headings with approximate word limits.""" blocks = re.split(r"\n\s*\n", markdown.strip()) if markdown.strip() else [] chunks: list[dict[str, Any]] = [] current: list[str] = [] current_words = 0 section = None current_section = None def words_of(s: str) -> list[str]: return WORD_RE.findall(s) def flush() -> None: nonlocal current, current_words, current_section content = "\n\n".join(current).strip() if not content: current = [] current_words = 0 return idx = len(chunks) chunk_kw = keyword_mentions(content, categories or []) if categories else [] chunk_kw_detail = keyword_mentions_detail(content, categories or []) if categories else [] chunks.append({ "chunk_id": f"{article['id']}__chunk_{idx:04d}", "article_id": article["id"], "url": article["url"], "title": article["title"], "section": current_section, "language": article.get("language", "en"), "content": content, "metadata": { "source": article.get("source"), "type": article.get("type"), "keyword_mentions": chunk_kw, "keyword_mentions_detail": chunk_kw_detail, "article_keyword_mentions": article.get("keywords", []), "topic": article.get("topic"), "modified_at": article.get("modified_at"), }, }) if overlap_words > 0: tail: list[str] = [] count = 0 for b in reversed(current): bw = len(words_of(b)) if tail and count + bw > overlap_words: break tail.insert(0, b) count += bw current = tail current_words = count else: current = [] current_words = 0 for block in blocks: b = block.strip() if not b: continue m = HEADING_RE.match(b.splitlines()[0].strip()) if m: section = m.group(2).strip() bw = len(words_of(b)) if current and current_words + bw > max_words: flush() if not current: current_section = section if bw > max_words * 1.5: words = words_of(b) start = 0 while start < len(words): part = " ".join(words[start:start + max_words]) if current and current_words + len(words_of(part)) > max_words: flush() current.append(part) current_words += len(words_of(part)) flush() start += max_words - overlap_words continue current.append(b) current_words += bw if current: flush() return chunks # --------------------------------------------------------------------------- # JSONL helpers # --------------------------------------------------------------------------- def append_jsonl(path: Path, record: dict[str, Any]) -> None: with path.open("a", encoding="utf-8") as f: f.write(json.dumps(record, ensure_ascii=False, sort_keys=True) + "\n") # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- def main(argv: list[str] | None = None) -> int: parser = argparse.ArgumentParser( description="Generic blog crawler for RAG (Firecrawl + sitemap)", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: # Crawl 5 articles from a blog python3 crawl_blog.py --sitemap https://example.com/post-sitemap.xml --limit 5 # Crawl all articles python3 crawl_blog.py --sitemap https://example.com/post-sitemap.xml --all # Use custom keywords for extraction python3 crawl_blog.py --sitemap https://example.com/post-sitemap.xml --all --keywords keywords.json # Output to custom directory python3 crawl_blog.py --sitemap https://example.com/post-sitemap.xml --all --out-dir ./my_data """, ) parser.add_argument("--sitemap", required=True, help="Sitemap URL (e.g. https://example.com/post-sitemap.xml)") parser.add_argument("--out-dir", type=Path, default=Path("./blog_data"), help="Output directory") parser.add_argument("--keywords", type=Path, default=None, help="Keywords JSON path; defaults to /keywords.json") parser.add_argument("--limit", type=int, default=None, help="Only process first N article URLs") parser.add_argument("--all", action="store_true", help="Process all discovered article URLs") parser.add_argument("--sleep", type=float, default=1.0, help="Delay between Firecrawl calls (seconds)") parser.add_argument("--force", action="store_true", help="Re-scrape even if raw file exists") parser.add_argument("--max-words", type=int, default=650, help="Target words per chunk") parser.add_argument("--overlap-words", type=int, default=100, help="Overlap words between chunks") parser.add_argument("--language", default="en", help="Default language code for articles") args = parser.parse_args(argv) if not args.all and args.limit is None: args.limit = 5 load_dotenv() api_key = os.environ.get("FIRECRAWL_API_KEY") if not api_key: print("ERROR: FIRECRAWL_API_KEY is not set in environment or .env file", file=sys.stderr) return 2 out_dir: Path = args.out_dir out_dir.mkdir(parents=True, exist_ok=True) # Load keywords keywords_path: Path = args.keywords or (out_dir / "keywords.json") categories = load_keywords(keywords_path) # Create output directories raw_dir = out_dir / "raw" md_dir = out_dir / "markdown" raw_dir.mkdir(parents=True, exist_ok=True) md_dir.mkdir(parents=True, exist_ok=True) # Discover URLs from sitemap source_domain = source_from_url(args.sitemap) urls = parse_sitemap(args.sitemap) urls_path = out_dir / "urls.json" urls_path.write_text( json.dumps({"sitemap": args.sitemap, "source": source_domain, "count": len(urls), "urls": urls}, ensure_ascii=False, indent=2), encoding="utf-8", ) selected = urls if args.all else urls[: args.limit] articles_path = out_dir / "articles.jsonl" chunks_path = out_dir / "chunks.jsonl" errors_path = out_dir / "errors.jsonl" if args.force: articles_path.unlink(missing_ok=True) chunks_path.unlink(missing_ok=True) errors_path.unlink(missing_ok=True) print(f"Source: {source_domain}") print(f"Discovered article URLs: {len(urls)}") print(f"Processing: {len(selected)}") print(f"Output: {out_dir}") print(f"Keywords: {keywords_path} ({sum(len(c['keywords']) for c in categories)} keywords in {len(categories)} categories)") ok = 0 failed = 0 total_chunks = 0 for idx, item in enumerate(selected, 1): url = str(item["url"]) lastmod = item.get("lastmod") slug = slug_from_url(url) raw_path = raw_dir / f"{slug}.json" md_path = md_dir / f"{slug}.md" print(f"[{idx}/{len(selected)}] {url}") # Use cached raw if available if raw_path.exists() and not args.force: try: raw_data = json.loads(raw_path.read_text(encoding="utf-8")) status = int(raw_data.get("_http_status", 200)) except Exception as e: append_jsonl(errors_path, {"url": url, "error": f"read cached raw failed: {e}", "at": now_iso()}) failed += 1 continue else: status, raw_data = firecrawl_scrape(url, api_key) raw_data["_http_status"] = status raw_data["_source_url"] = url raw_data["_scraped_at"] = now_iso() raw_path.write_text(json.dumps(raw_data, ensure_ascii=False, indent=2), encoding="utf-8") time.sleep(args.sleep) if status >= 400 or not raw_data.get("success"): append_jsonl(errors_path, {"url": url, "http_status": status, "error": raw_data, "at": now_iso()}) print(f" ERROR status={status} success={raw_data.get('success')}") failed += 1 continue data = raw_data.get("data") or {} markdown = data.get("markdown") or "" markdown = clean_markdown(markdown) text = strip_markdown_to_text(markdown) if len(text) < 100: append_jsonl(errors_path, {"url": url, "http_status": status, "error": "too little text", "text_chars": len(text), "at": now_iso()}) print(f" ERROR too little text chars={len(text)}") failed += 1 continue title = extract_title(data, markdown, url) desc = extract_description(data, text) kw_detail = keyword_mentions_detail(title + "\n" + text, categories) kw_list = [kw for cat in kw_detail for kw in cat["matched_keywords"].keys()] article = { "id": article_id_from_url(url), "url": url, "source": source_domain, "type": "blog_article", "title": title, "description": desc, "published_at": None, "modified_at": lastmod, "language": args.language, "markdown": markdown, "text": text, "tags": [], "keywords": kw_list, "keyword_mentions_detail": kw_detail, "topic": infer_topic(title, text, categories), "scraped_at": raw_data.get("_scraped_at") or now_iso(), "metadata": data.get("metadata") or {}, } chunks = chunk_markdown(markdown, article, categories=categories, max_words=args.max_words, overlap_words=args.overlap_words) md_path.write_text(markdown, encoding="utf-8") append_jsonl(articles_path, article) for chunk in chunks: append_jsonl(chunks_path, chunk) ok += 1 total_chunks += len(chunks) print(f" OK title={title!r} markdown_chars={len(markdown)} chunks={len(chunks)} keywords={kw_list[:5]}") summary = { "sitemap": args.sitemap, "source": source_domain, "discovered": len(urls), "processed": len(selected), "ok": ok, "failed": failed, "chunks": total_chunks, "out_dir": str(out_dir), "keywords_path": str(keywords_path), "keywords_loaded": sum(len(c["keywords"]) for c in categories), "finished_at": now_iso(), } (out_dir / "summary.json").write_text(json.dumps(summary, ensure_ascii=False, indent=2), encoding="utf-8") print("SUMMARY", json.dumps(summary, ensure_ascii=False)) return 0 if failed == 0 else 1 if __name__ == "__main__": raise SystemExit(main())