diff --git a/.env.example b/.env.example index 905cd33..04ae452 100644 --- a/.env.example +++ b/.env.example @@ -7,4 +7,8 @@ OPENAI_API_KEY=your_api_key_here # LLM_BASE_URL=https://api.openai.com/v1 # Optional: Model name (default: gpt-4o-mini) -# LLM_MODEL=gpt-4o-mini \ No newline at end of file +# LLM_MODEL=gpt-4o-mini + +# Firecrawl API Configuration +# Get your API key from https://www.firecrawl.dev +FIRECRAWL_API_KEY=fc-... \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a1b9cbb --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +.env +__pycache__/ \ No newline at end of file diff --git a/README.md b/README.md index 5b57863..88bbf73 100644 --- a/README.md +++ b/README.md @@ -1,106 +1,215 @@ -# OrangePi RAG Dataset +# Blog RAG Toolkit -A **Vietnamese-language** RAG (Retrieval-Augmented Generation) data pipeline that crawls, extracts, and chunks blog articles from [orangepi.vn](https://orangepi.vn) — the official Orange Pi distributor in Vietnam. +A complete RAG (Retrieval-Augmented Generation) pipeline: **crawl** any blog, **extract** keywords, **chunk** content, and **query** with an LLM. 
-## Dataset Summary +## Components -| Metric | Value | -|-------------|-------| -| Articles | 199 | -| Chunks | 472 | -| Models | 36 | -| Language | vi | -| Last crawl | 2026-06-11 | +| File | Purpose | +|------|---------| +| `crawl_blog.py` | Generic blog crawler (sitemap + Firecrawl) | +| `crawl_orangepi_blog.py` | OrangePi.vn-specific crawler | +| `rag_app.py` | RAG query application (FAISS + LLM) | +| `keywords_example.json` | Sample keyword dictionary | -## Output Files +## Quick Start -| File | Description | -|------|-------------| -| `articles.jsonl` | Full article records (title, description, markdown, text, product mentions, topic, metadata) | -| `chunks.jsonl` | Overlapping text chunks (~650 words, ~100 overlap) with metadata for embedding | -| `urls.json` | Discovered sitemap URLs with `lastmod` timestamps | -| `raw/.json` | Raw Firecrawl API scrape response per article | -| `markdown/.md` | Cleaned markdown per article | -| `orangepi_models.json` | Canonical Orange Pi model dictionary with aliases | -| `errors.jsonl` | Failed URLs and error details | -| `summary.json` | Crawl summary statistics | - -### Chunk metadata - -Each chunk in `chunks.jsonl` includes: - -- `chunk_id` — unique ID (`{article_id}__chunk_{seq}`) -- `article_id` — source article reference -- `content` — chunk text (markdown) -- `section` — nearest heading context -- `metadata.product_mentions` — canonical Orange Pi models mentioned -- `metadata.topic` — inferred topic (e.g., "home assistant", "linux", "docker") - -## Usage - -### Prerequisites - -- Python 3.10+ -- A [Firecrawl](https://www.firecrawl.dev) API key - -### Install +### 1. Install ```bash -git clone -cd orangepi-rag -# No external dependencies beyond Python stdlib +pip install -r requirements.txt ``` -### Set API key +### 2. Set API key ```bash export FIRECRAWL_API_KEY="fc-..." +# or put in .env file: +echo "FIRECRAWL_API_KEY=fc-..." > .env ``` -Or place it in `/home/admin/.hermes/.env`: - -``` -FIRECRAWL_API_KEY=fc-... 
-``` - -### Run crawl +### 3. Crawl a blog ```bash -# Quick test — process first 5 articles -python3 crawl_orangepi_blog.py --limit 5 +# Crawl 5 articles from any WordPress blog +python crawl_blog.py --sitemap https://example.com/post-sitemap.xml --limit 5 -# Full crawl — all discovered articles -python3 crawl_orangepi_blog.py --all +# Crawl all articles with custom keywords +python crawl_blog.py --sitemap https://example.com/post-sitemap.xml --all --keywords keywords.json -# Re-scrape everything (overwrites existing raw files) -python3 crawl_orangepi_blog.py --all --force +# Output to custom directory +python crawl_blog.py --sitemap https://example.com/post-sitemap.xml --all --out-dir ./my_blog_data +``` + +### 4. Build index & query + +```bash +# Build FAISS index +python rag_app.py --build --data-dir ./my_blog_data --index-dir ./my_index + +# Query (requires OPENAI_API_KEY) +export OPENAI_API_KEY="sk-..." +python rag_app.py --query "How to install Docker?" --data-dir ./my_blog_data --index-dir ./my_index + +# Interactive chat +python rag_app.py --interactive --data-dir ./my_blog_data --index-dir ./my_index +``` + +--- + +## crawl_blog.py — Generic Blog Crawler + +Crawls any blog that exposes a sitemap (WordPress, Yoast, etc.). 
+ +### Usage + +```bash +python crawl_blog.py --sitemap URL [options] ``` ### Options | Argument | Default | Description | |----------|---------|-------------| +| `--sitemap` | (required) | Sitemap URL | +| `--out-dir` | `./blog_data` | Output directory | +| `--keywords` | `{out-dir}/keywords.json` | Keywords JSON path | | `--limit N` | 5 | Process first N articles | -| `--all` | — | Process all discovered articles | -| `--out-dir PATH` | `/mnt/ssd/orangepi-rag` | Output directory | -| `--models PATH` | `/orangepi_models.json` | Model dictionary path | -| `--sitemap URL` | `https://orangepi.vn/post-sitemap.xml` | Sitemap URL | +| `--all` | — | Process all articles | | `--sleep SEC` | 1.0 | Delay between Firecrawl calls | | `--force` | — | Re-scrape cached articles | | `--max-words N` | 650 | Target words per chunk | | `--overlap-words N` | 100 | Overlap words between chunks | +| `--language` | `en` | Default language code | -## Model Detection +### Output files -The pipeline uses `orangepi_models.json` to detect canonical Orange Pi product names in article text. The dictionary supports aliases per model (e.g., `"Orange Pi 5"`, `"OrangePi 5"`, `"OPi 5"`) and longest-match-first resolution to prevent false double-counts. +| File | Description | +|------|-------------| +| `articles.jsonl` | Article records with keyword mentions | +| `chunks.jsonl` | Chunked content for embedding | +| `keywords.json` | Keyword dictionary (input; read from the output directory unless `--keywords` is given) | +| `urls.json` | Discovered URLs | +| `raw/{slug}.json` | Raw Firecrawl responses | +| `markdown/{slug}.md` | Cleaned markdown | +| `errors.jsonl` | Failed URLs | +| `summary.json` | Crawl summary | -## Use Cases +--- -- **Semantic search** over Vietnamese Orange Pi knowledge -- **Q&A bots** for Orange Pi tutorials, OS installs, hardware guides -- **Product recommendation** based on article content -- **Fine-tuning** Vietnamese embedding models on SBC/embedded computing content +## keywords.json — Keyword Dictionary + +Defines keywords to extract from crawled content. 
Supports categorized or flat format. + +### Categorized format (recommended) + +```json +[ + { + "category": "hardware", + "keywords": ["Raspberry Pi", "Arduino", "ESP32"] + }, + { + "category": "software", + "keywords": ["Docker", "Ubuntu", "Home Assistant"] + } +] +``` + +### Flat format + +```json +["Raspberry Pi", "Docker", "Home Assistant", "MQTT"] +``` + +See `keywords_example.json` for a complete template. + +--- + +## rag_app.py — RAG Query Application + +FAISS-based vector search + LLM generation. + +### Usage + +```bash +# Build index (one-time) +python rag_app.py --build --data-dir ./blog_data --index-dir ./index + +# Single query +python rag_app.py --query "Câu hỏi của bạn" --data-dir ./blog_data --index-dir ./index + +# Interactive chat +python rag_app.py --interactive --data-dir ./blog_data --index-dir ./index + +# Test retrieval only (no LLM needed) +python rag_app.py --query "test" --retrieve-only --data-dir ./blog_data --index-dir ./index +``` + +### Options + +| Argument | Default | Description | +|----------|---------|-------------| +| `--data-dir` | `.` | Directory with chunks.jsonl | +| `--index-dir` | `./rag_index` | FAISS index directory | +| `--build` | — | Build index from chunks | +| `--query` | — | Query to answer | +| `--interactive` | — | Interactive chat mode | +| `--retrieve-only` | — | Test retrieval without LLM | +| `--top-k` | 5 | Number of chunks to retrieve | +| `--embed-model` | `paraphrase-multilingual-MiniLM-L12-v2` | Embedding model | +| `--llm-model` | `gpt-4o-mini` | LLM model name | +| `--llm-base-url` | `https://api.openai.com/v1` | LLM API base URL | + +### LLM API configuration + +Set in `.env`: + +```bash +OPENAI_API_KEY=sk-... +# Or for other providers: +# LLM_BASE_URL=https://api.together.xyz/v1 +# LLM_MODEL=meta-llama/Llama-3-70b-chat-hf +``` + +Compatible with any OpenAI-format API: OpenAI, Together.ai, Groq, Ollama, etc. 
+ +--- + +## crawl_orangepi_blog.py — OrangePi-specific Crawler + +Specialized crawler for orangepi.vn with Orange Pi model detection. + +```bash +python crawl_orangepi_blog.py --limit 5 +python crawl_orangepi_blog.py --all +``` + +Uses `orangepi_models.json` for product mention detection (36 Orange Pi models with aliases). + +--- + +## Architecture + +``` +Blog (sitemap) + │ + ▼ +crawl_blog.py ──► Firecrawl API ──► articles.jsonl + │ chunks.jsonl + │ keywords.json + │ raw/*.json + │ markdown/*.md + ▼ +rag_app.py + │ + ├──► SentenceTransformer (embeddings) + ├──► FAISS (vector index) + └──► LLM API (generation) + │ + ▼ + Answer + sources +``` ## License -Data sourced from [orangepi.vn](https://orangepi.vn). Check their site for content usage terms. +Data sourced from respective blogs. Check each site for content usage terms. diff --git a/crawl_blog.py b/crawl_blog.py new file mode 100644 index 0000000..041c47b --- /dev/null +++ b/crawl_blog.py @@ -0,0 +1,710 @@ +#!/usr/bin/env python3 +"""Generic blog crawler for RAG using Firecrawl. + +Discovery is sitemap-first (Yoast/WordPress), extraction is Firecrawl +single-page scrape. Outputs: + + articles.jsonl article-level structured records + chunks.jsonl chunk-level records for embedding/RAG + keywords.json keyword dictionary for extraction + urls.json discovered URL list with sitemap lastmod + raw/.json raw Firecrawl response per article + markdown/.md extracted markdown per article + errors.jsonl failed URLs/errors + summary.json crawl summary + +Usage: + python3 crawl_blog.py --sitemap https://example.com/post-sitemap.xml --limit 5 + python3 crawl_blog.py --sitemap https://example.com/post-sitemap.xml --all + python3 crawl_blog.py --sitemap https://example.com/post-sitemap.xml --all --keywords my_keywords.json + +Requires FIRECRAWL_API_KEY in environment or .env file. 
FIRECRAWL_SCRAPE_URL = "https://api.firecrawl.dev/v1/scrape"

WORD_RE = re.compile(r"\S+", re.UNICODE)
HEADING_RE = re.compile(r"^(#{1,6})\s+(.+?)\s*$")
MD_LINK_RE = re.compile(r"\[([^\]]+)\]\(([^)]+)\)")
MD_IMAGE_RE = re.compile(r"!\[[^\]]*\]\([^)]+\)")
HTML_TAG_RE = re.compile(r"<[^>]+>")
MULTI_SPACE_RE = re.compile(r"[ \t]+")

# How many levels of nested sitemap indexes parse_sitemap() will follow.
_SITEMAP_MAX_DEPTH = 5

# A line containing any of these phrases (case-insensitive) is UI boilerplate.
_BOILERPLATE_PHRASES = tuple(
    phrase.lower()
    for phrase in (
        "Press enter for Accessibility",
        "Accessibility menu",
        "Popup heading",
        "Skip to main",
        "Bỏ qua nội dung",
    )
)
# Short UI labels are only boilerplate when they are the WHOLE line; matching
# them as substrings would delete real sentences ("Close the lid ...").
_BOILERPLATE_LABELS = frozenset({"close"})

# Fallback (needle, topic) rules for infer_topic(); first match wins.
_TOPIC_RULES = (
    ("docker", "docker"),
    ("kubernetes", "kubernetes"),
    ("linux", "linux"),
    ("ubuntu", "linux"),
    ("debian", "linux"),
    ("python", "programming"),
    ("javascript", "programming"),
    ("home assistant", "home assistant"),
    ("iot", "iot"),
    ("ai", "ai"),
    ("machine learning", "ai"),
)

# Compiled keyword patterns, keyed by the raw keyword string.
_KEYWORD_REGEX_CACHE: dict[str, re.Pattern[str]] = {}


# ---------------------------------------------------------------------------
# Environment / dotenv
# ---------------------------------------------------------------------------

def load_dotenv(path: Path | None = None) -> None:
    """Load KEY=VALUE pairs from the first .env file found into os.environ.

    Candidates are tried in order: ``path`` (if given), the directory of this
    script, then the current working directory. Only the first existing file
    is read. Variables already present in the environment are never
    overwritten. Blank lines, ``#`` comments and lines without ``=`` are
    skipped; a leading ``export `` and one pair of matching surrounding
    quotes are removed.
    """
    candidates = [Path(__file__).parent / ".env", Path.cwd() / ".env"]
    if path:
        candidates.insert(0, path)
    for env_path in candidates:
        if not env_path.is_file():
            continue
        # utf-8-sig drops a BOM that would otherwise become part of the first key.
        for raw in env_path.read_text(encoding="utf-8-sig", errors="ignore").splitlines():
            line = raw.strip()
            if line.startswith("export "):
                line = line[len("export "):].lstrip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, _, val = line.partition("=")
            key = key.strip()
            val = val.strip()
            if len(val) >= 2 and val[0] == val[-1] and val[0] in "\"'":
                val = val[1:-1]
            if key and key not in os.environ:
                os.environ[key] = val
        break


def now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string with offset."""
    return dt.datetime.now(dt.timezone.utc).isoformat()


# ---------------------------------------------------------------------------
# URL / sitemap helpers
# ---------------------------------------------------------------------------

def fetch_bytes(url: str, timeout: int = 30) -> bytes:
    """Fetch ``url`` and return the raw response body.

    Raises whatever ``urllib.request.urlopen`` raises (``URLError`` /
    ``HTTPError`` are ``OSError`` subclasses; malformed URLs raise
    ``ValueError``).
    """
    req = urllib.request.Request(
        url,
        headers={
            "User-Agent": "BlogCrawler-RAG/1.0",
            "Accept": "application/xml,text/xml,text/html,*/*",
        },
    )
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return resp.read()


def _local_name(tag: Any) -> str:
    """Return an XML tag name without its ``{namespace}`` prefix."""
    return tag.rsplit("}", 1)[-1] if isinstance(tag, str) else ""


def _child_text(element: ET.Element, name: str) -> str | None:
    """Return the stripped text of the first direct child called ``name``."""
    for child in element:
        if _local_name(child.tag) == name and child.text and child.text.strip():
            return child.text.strip()
    return None


def _matches_domain(url: str, domain_filter: str) -> bool:
    """Tell whether ``url`` belongs to the site named by ``domain_filter``.

    The filter may be a bare host (``example.com``), a URL, or a host plus
    path prefix (``example.com/blog``). Hosts are compared case-insensitively,
    ignoring a leading ``www.``; subdomains of the wanted host also match.
    """
    wanted = domain_filter.strip().lower()
    if not wanted:
        return True
    parsed_filter = urllib.parse.urlparse(wanted if "//" in wanted else "//" + wanted)
    wanted_host = (parsed_filter.hostname or "").removeprefix("www.")
    if not wanted_host:
        # Not something we can read as a host: keep the old substring test.
        return wanted in url.lower()
    target = urllib.parse.urlparse(url)
    host = (target.hostname or "").lower().removeprefix("www.")
    if host != wanted_host and not host.endswith("." + wanted_host):
        return False
    wanted_path = parsed_filter.path.rstrip("/")
    return not wanted_path or target.path.lower().startswith(wanted_path)


def parse_sitemap_document(
    data: bytes,
    domain_filter: str | None = None,
) -> tuple[list[str], list[dict[str, str | None]]]:
    """Parse one sitemap XML document without doing any network I/O.

    Returns ``(child_sitemap_urls, entries)``. For a sitemap index the first
    list holds the child sitemap locations and ``entries`` is empty; for a
    regular sitemap the first list is empty and ``entries`` holds
    ``{'url': ..., 'lastmod': ...}`` dicts (filtered by ``domain_filter`` when
    given). Element names are matched by local name, so documents with a
    missing or non-standard namespace still work.

    Raises ``xml.etree.ElementTree.ParseError`` on malformed XML.
    """
    # NOTE: xml.etree is not hardened against maliciously constructed XML
    # (e.g. entity expansion); only point the crawler at sitemaps you trust.
    root = ET.fromstring(data)

    child_urls: list[str] = []
    for element in root.iter():
        if _local_name(element.tag) == "sitemap":
            loc = _child_text(element, "loc")
            if loc:
                child_urls.append(loc)
    if child_urls:
        return child_urls, []

    entries: list[dict[str, str | None]] = []
    for element in root.iter():
        if _local_name(element.tag) != "url":
            continue
        # Direct children only, so nested image/video locations are ignored.
        loc = _child_text(element, "loc")
        if not loc:
            continue
        if domain_filter and not _matches_domain(loc, domain_filter):
            continue
        entries.append({"url": loc, "lastmod": _child_text(element, "lastmod")})
    return [], entries


def _crawl_sitemap(
    url: str,
    domain_filter: str | None,
    visited: set[str],
    depth: int,
) -> list[dict[str, str | None]]:
    """Fetch ``url`` and, for sitemap indexes, its children (depth-limited)."""
    visited.add(url)
    child_urls, entries = parse_sitemap_document(fetch_bytes(url), domain_filter)
    for child_url in child_urls:
        if child_url in visited:
            continue  # guards against indexes that reference themselves
        if depth >= _SITEMAP_MAX_DEPTH:
            print(f"  WARN: sitemap nesting too deep, skipping {child_url}", file=sys.stderr)
            continue
        try:
            entries.extend(_crawl_sitemap(child_url, domain_filter, visited, depth + 1))
        except (OSError, ET.ParseError, ValueError) as e:
            print(f"  WARN: failed to fetch child sitemap {child_url}: {e}", file=sys.stderr)
    return entries


def parse_sitemap(url: str, domain_filter: str | None = None) -> list[dict[str, str | None]]:
    """Return [{'url': ..., 'lastmod': ...}] from a sitemap URL.

    Sitemap indexes are followed recursively; a child sitemap that fails to
    download or parse is reported on stderr and skipped. If ``domain_filter``
    is provided, only URLs on that domain are returned. Duplicate URLs are
    collapsed, keeping the first occurrence.

    Raises ``OSError`` / ``ParseError`` / ``ValueError`` only when the
    top-level sitemap itself cannot be fetched or parsed.
    """
    unique: dict[str, dict[str, str | None]] = {}
    for entry in _crawl_sitemap(url, domain_filter, visited=set(), depth=0):
        unique.setdefault(str(entry["url"]), entry)
    return list(unique.values())


def slug_from_url(url: str) -> str:
    """Derive a filesystem-safe, lower-case slug (at most 160 chars) from a URL.

    The slug is built from the URL path (minus a trailing .html/.htm/.php)
    plus the query string when there is one, so ``?p=1`` and ``?p=2`` do not
    collide. Over-long slugs are truncated and suffixed with a short hash of
    the full URL to stay unique.
    """
    parsed = urllib.parse.urlparse(url)
    path = parsed.path.strip("/") or "index"
    slug = re.sub(r"\.(html?|php)$", "", path)
    if parsed.query:
        slug = f"{slug}-{parsed.query}"
    slug = re.sub(r"[^a-zA-Z0-9_-]+", "-", slug).strip("-").lower()
    if not slug:
        slug = hashlib.sha1(url.encode()).hexdigest()[:12]
    if len(slug) > 160:
        digest = hashlib.sha1(url.encode()).hexdigest()[:10]
        slug = f"{slug[:149].rstrip('-')}-{digest}"
    return slug


def article_id_from_url(url: str, prefix: str = "blog") -> str:
    """Return ``{prefix}_{slug}`` with hyphens in the slug turned into underscores."""
    return f"{prefix}_" + slug_from_url(url).replace("-", "_")


def source_from_url(url: str) -> str:
    """Extract domain name from URL as source identifier (leading ``www.`` removed)."""
    parsed = urllib.parse.urlparse(url)
    domain = parsed.netloc
    if domain.startswith("www."):
        domain = domain[4:]
    return domain


# ---------------------------------------------------------------------------
# Firecrawl API
# ---------------------------------------------------------------------------

def firecrawl_scrape(url: str, api_key: str, timeout: int = 120) -> tuple[int, dict[str, Any]]:
    """Scrape one page through the Firecrawl API.

    Returns ``(http_status, payload)``. This function does not raise for
    transport problems: HTTP errors return their status code and decoded
    body, network failures return status ``0``, and a body that is not a JSON
    object is wrapped as ``{"success": False, "error": ...}``. Callers decide
    success from the status and the payload's ``success`` flag.
    """
    payload = {
        "url": url,
        "formats": ["markdown"],
        "onlyMainContent": True,
        "waitFor": 1000,
        "timeout": timeout * 1000,
    }
    req = urllib.request.Request(
        FIRECRAWL_SCRAPE_URL,
        data=json.dumps(payload).encode("utf-8"),
        method="POST",
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
            "User-Agent": "BlogCrawler-RAG/1.0",
        },
    )
    try:
        # Give the HTTP layer more time than the server-side scrape timeout.
        with urllib.request.urlopen(req, timeout=timeout + 20) as resp:
            status = resp.status
            raw = resp.read().decode("utf-8", errors="replace")
    except urllib.error.HTTPError as e:
        status = e.code
        raw = e.read().decode("utf-8", errors="replace")
    except OSError as e:  # URLError, timeouts, connection resets
        return 0, {"success": False, "error": f"network error: {e}"}

    try:
        data = json.loads(raw)
    except ValueError:
        data = {"success": False, "error": raw}
    if not isinstance(data, dict):
        data = {"success": False, "error": data}
    return status, data


# ---------------------------------------------------------------------------
# Markdown / text processing
# ---------------------------------------------------------------------------

def strip_markdown_to_text(markdown: str) -> str:
    """Reduce markdown to plain text.

    Removes images, replaces links by their label, drops heading markers,
    emphasis/code characters (``* _ ` ~``) and HTML tags, unescapes HTML
    entities and collapses runs of spaces and blank lines.
    """
    text = markdown.replace("\r\n", "\n")
    text = MD_IMAGE_RE.sub("", text)
    text = MD_LINK_RE.sub(r"\1", text)
    text = re.sub(r"^#{1,6}\s+", "", text, flags=re.M)
    text = re.sub(r"[*_`~]", "", text)
    text = HTML_TAG_RE.sub(" ", text)
    text = html.unescape(text)
    text = MULTI_SPACE_RE.sub(" ", text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()


def _is_boilerplate_line(line: str) -> bool:
    """Tell whether a markdown line is navigation/accessibility chrome."""
    lowered = line.strip().lower()
    if not lowered:
        return False
    if any(phrase in lowered for phrase in _BOILERPLATE_PHRASES):
        return True
    # Reduce "[Close](#)" / "**Close**" to the bare label before comparing.
    bare = MD_LINK_RE.sub(r"\1", lowered).strip(" \t*_#>[]()-:.!")
    return bare in _BOILERPLATE_LABELS


def clean_markdown(markdown: str) -> str:
    """Light cleanup for common boilerplate while preserving content.

    Drops lines that contain a known boilerplate phrase or that consist only
    of a short UI label such as "close", strips trailing whitespace and caps
    consecutive newlines at three.
    """
    cleaned = [
        line.rstrip()
        for line in markdown.replace("\r\n", "\n").split("\n")
        if not _is_boilerplate_line(line)
    ]
    text = "\n".join(cleaned)
    text = re.sub(r"\n{4,}", "\n\n\n", text)
    return text.strip()


def extract_title(data: dict[str, Any], markdown: str, fallback_url: str) -> str:
    """Pick a title: page metadata, else the first markdown heading, else the URL slug."""
    meta = data.get("metadata")
    if not isinstance(meta, dict):
        meta = {}
    for key in ("title", "ogTitle"):
        val = meta.get(key)
        if isinstance(val, str) and val.strip():
            return html.unescape(val.strip())
    for line in markdown.splitlines():
        m = HEADING_RE.match(line.strip())
        if m:
            return m.group(2).strip()
    return slug_from_url(fallback_url).replace("-", " ").title()


def extract_description(data: dict[str, Any], text: str) -> str | None:
    """Pick a description: page metadata, else the first 300 characters of ``text``."""
    meta = data.get("metadata")
    if not isinstance(meta, dict):
        meta = {}
    for key in ("description", "ogDescription"):
        val = meta.get(key)
        if isinstance(val, str) and val.strip():
            return html.unescape(val.strip())
    return text[:300].strip() if text else None


# ---------------------------------------------------------------------------
# Keyword extraction (replaces product mentions from original)
# ---------------------------------------------------------------------------

def _normalize_keywords(raw_keywords: list[Any]) -> list[str]:
    """Strip, de-duplicate and order keywords longest-first (ties alphabetical)."""
    cleaned = {
        str(k).strip()
        for k in raw_keywords
        if isinstance(k, (str, int, float)) and not isinstance(k, bool)
    }
    cleaned.discard("")
    return sorted(cleaned, key=lambda k: (-len(k), k.casefold(), k))


def load_keywords(path: Path | None) -> list[dict[str, Any]]:
    """Load keyword dictionary from JSON.

    Expected JSON shape (list of categories):
      [
        {"category": "hardware", "keywords": ["Raspberry Pi", "Arduino"]},
        {"category": "software", "keywords": ["Docker", "Ubuntu"]}
      ]

    Or a flat list of keyword strings, which becomes a single "general"
    category:
      ["Raspberry Pi", "Docker", "Home Assistant"]

    Returns ``[]`` when ``path`` is None or is not an existing file.
    Categories without usable keywords are omitted. Keywords are stripped,
    de-duplicated and sorted longest-first in a deterministic order.

    Raises ``ValueError`` when the file is not valid JSON or its top level is
    not a list.
    """
    if path is None or not path.is_file():
        return []
    data = json.loads(path.read_text(encoding="utf-8"))
    if not isinstance(data, list):
        raise ValueError(f"keywords JSON must be a list: {path}")

    if data and all(isinstance(x, str) for x in data):
        # Flat list of strings -> wrap into single category
        data = [{"category": "general", "keywords": data}]

    categories: list[dict[str, Any]] = []
    for row in data:
        if not isinstance(row, dict):
            continue
        category = str(row.get("category") or "general").strip()
        kw_list = row.get("keywords") or []
        if not isinstance(kw_list, list):
            continue
        keywords = _normalize_keywords(kw_list)
        if keywords:
            categories.append({"category": category, "keywords": keywords})
    return categories


def _normalize_text(text: str) -> str:
    """Unescape HTML entities and unify NBSPs, dash variants and whitespace."""
    text = html.unescape(text or "")
    text = text.replace("\u00a0", " ")
    text = re.sub(r"[\u2010-\u2015]", "-", text)
    return re.sub(r"\s+", " ", text)


def _alias_to_regex(alias: str) -> re.Pattern[str]:
    """Compile a keyword regex with flexible whitespace and safe boundaries.

    The match is case-insensitive, any whitespace run in the keyword matches
    any whitespace run in the text, and the keyword must not touch a letter
    or digit on either side (so "SPI" does not match inside "spinning").
    Compiled patterns are cached per keyword.
    """
    cached = _KEYWORD_REGEX_CACHE.get(alias)
    if cached is not None:
        return cached
    tokens = _normalize_text(alias).split()
    pat = r"\s+".join(re.escape(tok) for tok in tokens)
    # [^\W_] is "any Unicode letter or digit"; underscores count as separators.
    rx = re.compile(rf"(?<![^\W_]){pat}(?![^\W_])", re.I)
    _KEYWORD_REGEX_CACHE[alias] = rx
    return rx


def spans_overlap(a: tuple[int, int], b: tuple[int, int]) -> bool:
    """Tell whether two half-open ``(start, end)`` spans share any position."""
    return a[0] < b[1] and b[0] < a[1]


def keyword_mentions_detail(text: str, categories: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Return keyword mentions grouped by category.

    Longer keywords are processed first and reserve their character spans.
    This prevents false double-counts such as "Orange Pi 5" and "Orange Pi"
    matching the same text span.

    Each result is ``{"category", "matched_keywords": {keyword: count},
    "total_count"}``; keywords are ordered by descending count and categories
    by descending total.
    """
    if not categories:
        return []

    hay = _normalize_text(text)

    # Flat list of (category, keyword, regex), longest keyword first.
    all_kw: list[tuple[str, str, re.Pattern[str]]] = []
    for cat in categories:
        for kw in cat["keywords"]:
            if kw:  # an empty keyword would match at every position
                all_kw.append((cat["category"], kw, _alias_to_regex(kw)))
    all_kw.sort(key=lambda x: len(x[1]), reverse=True)

    occupied: list[tuple[int, int]] = []
    cat_results: dict[str, dict[str, Any]] = {}

    for category, keyword, rx in all_kw:
        for m in rx.finditer(hay):
            span = (m.start(), m.end())
            if any(spans_overlap(span, used) for used in occupied):
                continue
            occupied.append(span)
            entry = cat_results.setdefault(
                category,
                {"category": category, "matched_keywords": {}, "total_count": 0},
            )
            entry["matched_keywords"][keyword] = entry["matched_keywords"].get(keyword, 0) + 1
            entry["total_count"] += 1

    details: list[dict[str, Any]] = []
    for cat_data in cat_results.values():
        cat_data["matched_keywords"] = dict(
            sorted(cat_data["matched_keywords"].items(), key=lambda x: -x[1])
        )
        details.append(cat_data)

    return sorted(details, key=lambda d: -d["total_count"])


def keyword_mentions(text: str, categories: list[dict[str, Any]]) -> list[str]:
    """Return flat list of all matched keywords, in keyword_mentions_detail order."""
    if not categories:
        return []
    all_matched: list[str] = []
    for detail in keyword_mentions_detail(text, categories):
        all_matched.extend(detail["matched_keywords"])
    return all_matched


# ---------------------------------------------------------------------------
# Topic inference
# ---------------------------------------------------------------------------

def infer_topic(title: str, text: str, categories: list[dict[str, Any]] | None = None) -> str | None:
    """Infer topic from content. Uses keyword categories if available.

    Looks at the title plus the first 2000 characters of ``text``. With
    categories, the category with the most distinct keywords present wins
    (first one on ties). Otherwise, or when no category keyword occurs, a
    small built-in rule list is tried in order. Keywords and rule needles
    must occur as whole words. Returns None when nothing matches.
    """
    hay = _normalize_text(title + "\n" + text[:2000])

    if categories:
        best_category = None
        best_count = 0
        for cat in categories:
            count = sum(1 for kw in cat["keywords"] if kw and _alias_to_regex(kw).search(hay))
            if count > best_count:
                best_count = count
                best_category = cat["category"]
        if best_category and best_count > 0:
            return best_category

    # Fallback: common topic rules
    for needle, topic in _TOPIC_RULES:
        if _alias_to_regex(needle).search(hay):
            return topic
    return None


# ---------------------------------------------------------------------------
# Chunking
# ---------------------------------------------------------------------------

def _count_words(text: str) -> int:
    """Count whitespace-separated tokens."""
    return len(WORD_RE.findall(text))


def _overlap_tail(blocks: list[str], overlap_words: int) -> tuple[list[str], int]:
    """Pick the trailing context carried from one chunk into the next.

    Whole trailing blocks are kept while they fit into ``overlap_words``. If
    even the last block is too large, only its last ``overlap_words`` words
    are kept. Returns ``(blocks, word_count)``.
    """
    if overlap_words <= 0 or not blocks:
        return [], 0
    tail: list[str] = []
    count = 0
    for block in reversed(blocks):
        n = _count_words(block)
        if count + n > overlap_words:
            break
        tail.insert(0, block)
        count += n
    if not tail:
        words = WORD_RE.findall(blocks[-1])[-overlap_words:]
        tail = [" ".join(words)]
        count = len(words)
    return tail, count


def _build_chunk(
    article: dict[str, Any],
    index: int,
    section: str | None,
    content: str,
    categories: list[dict[str, Any]] | None,
) -> dict[str, Any]:
    """Assemble one chunk record; keyword detail is computed once and reused."""
    detail = keyword_mentions_detail(content, categories) if categories else []
    mentions = [kw for cat in detail for kw in cat["matched_keywords"]]
    return {
        "chunk_id": f"{article['id']}__chunk_{index:04d}",
        "article_id": article["id"],
        "url": article["url"],
        "title": article["title"],
        "section": section,
        "language": article.get("language", "en"),
        "content": content,
        "metadata": {
            "source": article.get("source"),
            "type": article.get("type"),
            "keyword_mentions": mentions,
            "keyword_mentions_detail": detail,
            "article_keyword_mentions": article.get("keywords", []),
            "topic": article.get("topic"),
            "modified_at": article.get("modified_at"),
        },
    }


def chunk_markdown(
    markdown: str,
    article: dict[str, Any],
    categories: list[dict[str, Any]] | None = None,
    max_words: int = 650,
    overlap_words: int = 100,
) -> list[dict[str, Any]]:
    """Chunk markdown by paragraphs/headings with approximate word limits.

    Blank-line separated blocks are packed into chunks of about ``max_words``
    words. Each new chunk starts with up to ``overlap_words`` words of
    trailing context from the previous one. A single block larger than 1.5x
    ``max_words`` is cut into sliding word windows instead. ``section`` is
    the most recent heading seen when the chunk's first new block was added.

    ``max_words`` is clamped to at least 1 and ``overlap_words`` to the range
    ``0 .. max_words - 1``, so the window step is always positive. A chunk is
    only emitted when it contains content that no earlier chunk fully covers.
    """
    max_words = max(1, max_words)
    overlap_words = max(0, min(overlap_words, max_words - 1))

    stripped = markdown.strip()
    blocks = re.split(r"\n\s*\n", stripped) if stripped else []
    chunks: list[dict[str, Any]] = []
    pending: list[str] = []        # blocks of the chunk being built (may start with overlap)
    pending_words = 0
    has_new = False                # True once `pending` holds not-yet-emitted content
    section: str | None = None     # most recent heading seen
    chunk_section: str | None = None

    def emit() -> None:
        nonlocal pending, pending_words, has_new
        content = "\n\n".join(pending).strip()
        if has_new and content:
            chunks.append(_build_chunk(article, len(chunks), chunk_section, content, categories))
            pending, pending_words = _overlap_tail(pending, overlap_words)
        else:
            # Only carried-over overlap is left: emitting it would duplicate text.
            pending, pending_words = [], 0
        has_new = False

    def add(part: str) -> None:
        nonlocal pending_words, has_new, chunk_section
        if not has_new:
            chunk_section = section
            has_new = True
        pending.append(part)
        pending_words += _count_words(part)

    for block in blocks:
        b = block.strip()
        if not b:
            continue
        heading = HEADING_RE.match(b.splitlines()[0].strip())
        if heading:
            section = heading.group(2).strip()
        bw = _count_words(b)

        if bw > max_words * 1.5:
            emit()
            words = WORD_RE.findall(b)
            step = max_words - overlap_words
            start = 0
            while start < len(words):
                # Windows overlap by construction; drop any carried tail.
                pending, pending_words = [], 0
                add(" ".join(words[start:start + max_words]))
                emit()
                if start + max_words >= len(words):
                    break  # this window already reached the end of the block
                start += step
            continue

        if has_new and pending_words + bw > max_words:
            emit()
        add(b)

    emit()
    return chunks


# ---------------------------------------------------------------------------
# JSONL helpers
# ---------------------------------------------------------------------------

def append_jsonl(path: Path, record: dict[str, Any]) -> None:
    """Append ``record`` to ``path`` as one JSON line (UTF-8, keys sorted)."""
    with path.open("a", encoding="utf-8") as f:
        f.write(json.dumps(record, ensure_ascii=False, sort_keys=True) + "\n")


def _drop_records_for_urls(path: Path, urls: set[str]) -> None:
    """Remove JSONL records whose ``url`` is in ``urls``; keep everything else.

    Used before re-processing so that repeated runs replace records instead
    of appending duplicates. Lines that are not valid JSON are preserved.
    """
    if not urls or not path.is_file():
        return
    kept: list[str] = []
    dropped = False
    with path.open(encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                continue
            try:
                record = json.loads(line)
            except ValueError:
                kept.append(line)
                continue
            if isinstance(record, dict) and record.get("url") in urls:
                dropped = True
                continue
            kept.append(line)
    if dropped:
        tmp_path = path.with_name(path.name + ".tmp")
        tmp_path.write_text(
            "".join(line if line.endswith("\n") else line + "\n" for line in kept),
            encoding="utf-8",
        )
        tmp_path.replace(path)


def _load_cached_raw(raw_path: Path, url: str) -> dict[str, Any] | None:
    """Return a cached successful scrape for ``url``, or None to force a re-scrape.

    A cache file is ignored when it is unreadable, is not a JSON object,
    records a failed scrape, or was stored for a different URL (slug
    collision).
    """
    if not raw_path.is_file():
        return None
    try:
        data = json.loads(raw_path.read_text(encoding="utf-8"))
        status = int(data.get("_http_status", 200)) if isinstance(data, dict) else 0
    except (OSError, ValueError, TypeError):
        return None
    if not isinstance(data, dict) or not data.get("success") or status >= 400:
        return None
    if data.get("_source_url") not in (None, url):
        return None
    return data


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def main(argv: list[str] | None = None) -> int:
    """Run the crawl from the command line.

    Returns 0 when every selected article was processed, 1 when at least one
    failed, and 2 for setup errors (missing API key, unreadable sitemap).
    """
    parser = argparse.ArgumentParser(
        description="Generic blog crawler for RAG (Firecrawl + sitemap)",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Crawl 5 articles from a blog
  python3 crawl_blog.py --sitemap https://example.com/post-sitemap.xml --limit 5

  # Crawl all articles
  python3 crawl_blog.py --sitemap https://example.com/post-sitemap.xml --all

  # Use custom keywords for extraction
  python3 crawl_blog.py --sitemap https://example.com/post-sitemap.xml --all --keywords keywords.json

  # Output to custom directory
  python3 crawl_blog.py --sitemap https://example.com/post-sitemap.xml --all --out-dir ./my_data
        """,
    )
    parser.add_argument("--sitemap", required=True, help="Sitemap URL (e.g. https://example.com/post-sitemap.xml)")
    parser.add_argument("--out-dir", type=Path, default=Path("./blog_data"), help="Output directory")
    parser.add_argument("--keywords", type=Path, default=None, help="Keywords JSON path; defaults to OUT_DIR/keywords.json")
    parser.add_argument("--limit", type=int, default=None, help="Only process first N article URLs")
    parser.add_argument("--all", action="store_true", help="Process all discovered article URLs")
    parser.add_argument("--sleep", type=float, default=1.0, help="Delay between Firecrawl calls (seconds)")
    parser.add_argument("--force", action="store_true", help="Re-scrape even if raw file exists")
    parser.add_argument("--max-words", type=int, default=650, help="Target words per chunk")
    parser.add_argument("--overlap-words", type=int, default=100, help="Overlap words between chunks")
    parser.add_argument("--language", default="en", help="Default language code for articles")
    args = parser.parse_args(argv)

    if args.limit is not None and args.limit < 0:
        # A negative slice bound would silently mean "all but the last N".
        parser.error("--limit must be >= 0")
    if not args.all and args.limit is None:
        args.limit = 5

    load_dotenv()
    api_key = os.environ.get("FIRECRAWL_API_KEY")
    if not api_key:
        print("ERROR: FIRECRAWL_API_KEY is not set in environment or .env file", file=sys.stderr)
        return 2

    out_dir: Path = args.out_dir
    out_dir.mkdir(parents=True, exist_ok=True)

    # Load keywords
    keywords_path: Path = args.keywords or (out_dir / "keywords.json")
    categories = load_keywords(keywords_path)
    keyword_count = sum(len(c["keywords"]) for c in categories)

    # Create output directories
    raw_dir = out_dir / "raw"
    md_dir = out_dir / "markdown"
    raw_dir.mkdir(parents=True, exist_ok=True)
    md_dir.mkdir(parents=True, exist_ok=True)

    # Discover URLs from sitemap
    source_domain = source_from_url(args.sitemap)
    try:
        urls = parse_sitemap(args.sitemap)
    except (OSError, ET.ParseError, ValueError) as e:
        print(f"ERROR: failed to read sitemap {args.sitemap}: {e}", file=sys.stderr)
        return 2
    urls_path = out_dir / "urls.json"
    urls_path.write_text(
        json.dumps({"sitemap": args.sitemap, "source": source_domain, "count": len(urls), "urls": urls}, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )

    selected = urls if args.all else urls[: args.limit]
    articles_path = out_dir / "articles.jsonl"
    chunks_path = out_dir / "chunks.jsonl"
    errors_path = out_dir / "errors.jsonl"

    if args.force:
        articles_path.unlink(missing_ok=True)
        chunks_path.unlink(missing_ok=True)
        errors_path.unlink(missing_ok=True)
    else:
        # Re-processing appends fresh records, so remove the stale ones first;
        # records for URLs outside this run are left untouched.
        selected_urls = {str(item["url"]) for item in selected}
        for jsonl_path in (articles_path, chunks_path, errors_path):
            _drop_records_for_urls(jsonl_path, selected_urls)

    print(f"Source: {source_domain}")
    print(f"Discovered article URLs: {len(urls)}")
    print(f"Processing: {len(selected)}")
    print(f"Output: {out_dir}")
    print(f"Keywords: {keywords_path} ({keyword_count} keywords in {len(categories)} categories)")

    ok = 0
    failed = 0
    total_chunks = 0

    for idx, item in enumerate(selected, 1):
        url = str(item["url"])
        lastmod = item.get("lastmod")
        slug = slug_from_url(url)
        raw_path = raw_dir / f"{slug}.json"
        md_path = md_dir / f"{slug}.md"
        print(f"[{idx}/{len(selected)}] {url}")

        # Use cached raw if available (only successful scrapes are cached)
        raw_data = None if args.force else _load_cached_raw(raw_path, url)
        if raw_data is not None:
            status = int(raw_data.get("_http_status", 200))
        else:
            status, raw_data = firecrawl_scrape(url, api_key)
            raw_data["_http_status"] = status
            raw_data["_source_url"] = url
            raw_data["_scraped_at"] = now_iso()
            if status < 400 and raw_data.get("success"):
                # Never cache failures: they must be retried on the next run.
                raw_path.write_text(json.dumps(raw_data, ensure_ascii=False, indent=2), encoding="utf-8")
            if args.sleep > 0:
                time.sleep(args.sleep)

        if status >= 400 or not raw_data.get("success"):
            append_jsonl(errors_path, {"url": url, "http_status": status, "error": raw_data, "at": now_iso()})
            print(f"  ERROR status={status} success={raw_data.get('success')}")
            failed += 1
            continue

        data = raw_data.get("data") or {}
        markdown = data.get("markdown") or ""
        markdown = clean_markdown(markdown)
        text = strip_markdown_to_text(markdown)
        if len(text) < 100:
            append_jsonl(errors_path, {"url": url, "http_status": status, "error": "too little text", "text_chars": len(text), "at": now_iso()})
            print(f"  ERROR too little text chars={len(text)}")
            failed += 1
            continue

        title = extract_title(data, markdown, url)
        desc = extract_description(data, text)
        kw_detail = keyword_mentions_detail(title + "\n" + text, categories)
        kw_list = [kw for cat in kw_detail for kw in cat["matched_keywords"]]

        article = {
            "id": article_id_from_url(url),
            "url": url,
            "source": source_domain,
            "type": "blog_article",
            "title": title,
            "description": desc,
            "published_at": None,
            "modified_at": lastmod,
            "language": args.language,
            "markdown": markdown,
            "text": text,
            "tags": [],
            "keywords": kw_list,
            "keyword_mentions_detail": kw_detail,
            "topic": infer_topic(title, text, categories),
            "scraped_at": raw_data.get("_scraped_at") or now_iso(),
            "metadata": data.get("metadata") or {},
        }
        chunks = chunk_markdown(markdown, article, categories=categories, max_words=args.max_words, overlap_words=args.overlap_words)

        md_path.write_text(markdown, encoding="utf-8")
        append_jsonl(articles_path, article)
        for chunk in chunks:
            append_jsonl(chunks_path, chunk)

        ok += 1
        total_chunks += len(chunks)
        print(f"  OK title={title!r} markdown_chars={len(markdown)} chunks={len(chunks)} keywords={kw_list[:5]}")

    summary = {
        "sitemap": args.sitemap,
        "source": source_domain,
        "discovered": len(urls),
        "processed": len(selected),
        "ok": ok,
        "failed": failed,
        "chunks": total_chunks,
        "out_dir": str(out_dir),
        "keywords_path": str(keywords_path),
        "keywords_loaded": keyword_count,
        "finished_at": now_iso(),
    }
    (out_dir / "summary.json").write_text(json.dumps(summary, ensure_ascii=False, indent=2), encoding="utf-8")
    print("SUMMARY", json.dumps(summary, ensure_ascii=False))
    return 0 if failed == 0 else 1


if __name__ == "__main__":
    raise SystemExit(main())
"LoRa" + ] + } +] \ No newline at end of file