update crawl blog

This commit is contained in:
2026-06-12 11:37:39 +07:00
parent 3ebf6f450d
commit 65d2cae6ca
5 changed files with 982 additions and 71 deletions
+5 -1
View File
@@ -7,4 +7,8 @@ OPENAI_API_KEY=your_api_key_here
# LLM_BASE_URL=https://api.openai.com/v1 # LLM_BASE_URL=https://api.openai.com/v1
# Optional: Model name (default: gpt-4o-mini) # Optional: Model name (default: gpt-4o-mini)
# LLM_MODEL=gpt-4o-mini # LLM_MODEL=gpt-4o-mini
# Firecrawl API Configuration
# Get your API key from https://www.firecrawl.dev
FIRECRAWL_API_KEY=fc-...
+2
View File
@@ -0,0 +1,2 @@
.env
__pycache__/
+179 -70
View File
@@ -1,106 +1,215 @@
# OrangePi RAG Dataset # Blog RAG Toolkit
A **Vietnamese-language** RAG (Retrieval-Augmented Generation) data pipeline that crawls, extracts, and chunks blog articles from [orangepi.vn](https://orangepi.vn) — the official Orange Pi distributor in Vietnam. A complete RAG (Retrieval-Augmented Generation) pipeline: **crawl** any blog, **extract** keywords, **chunk** content, and **query** with an LLM.
## Dataset Summary ## Components
| Metric | Value | | File | Purpose |
|-------------|-------| |------|---------|
| Articles | 199 | | `crawl_blog.py` | Generic blog crawler (sitemap + Firecrawl) |
| Chunks | 472 | | `crawl_orangepi_blog.py` | OrangePi.vn-specific crawler |
| Models | 36 | | `rag_app.py` | RAG query application (FAISS + LLM) |
| Language | vi | | `keywords_example.json` | Sample keyword dictionary |
| Last crawl | 2026-06-11 |
## Output Files ## Quick Start
| File | Description | ### 1. Install
|------|-------------|
| `articles.jsonl` | Full article records (title, description, markdown, text, product mentions, topic, metadata) |
| `chunks.jsonl` | Overlapping text chunks (~650 words, ~100 overlap) with metadata for embedding |
| `urls.json` | Discovered sitemap URLs with `lastmod` timestamps |
| `raw/<slug>.json` | Raw Firecrawl API scrape response per article |
| `markdown/<slug>.md` | Cleaned markdown per article |
| `orangepi_models.json` | Canonical Orange Pi model dictionary with aliases |
| `errors.jsonl` | Failed URLs and error details |
| `summary.json` | Crawl summary statistics |
### Chunk metadata
Each chunk in `chunks.jsonl` includes:
- `chunk_id` — unique ID (`{article_id}__chunk_{seq}`)
- `article_id` — source article reference
- `content` — chunk text (markdown)
- `section` — nearest heading context
- `metadata.product_mentions` — canonical Orange Pi models mentioned
- `metadata.topic` — inferred topic (e.g., "home assistant", "linux", "docker")
## Usage
### Prerequisites
- Python 3.10+
- A [Firecrawl](https://www.firecrawl.dev) API key
### Install
```bash ```bash
git clone <repo-url> pip install -r requirements.txt
cd orangepi-rag
# No external dependencies beyond Python stdlib
``` ```
### Set API key ### 2. Set API key
```bash ```bash
export FIRECRAWL_API_KEY="fc-..." export FIRECRAWL_API_KEY="fc-..."
# or put in .env file:
echo "FIRECRAWL_API_KEY=fc-..." > .env
``` ```
Or place it in `/home/admin/.hermes/.env`: ### 3. Crawl a blog
```
FIRECRAWL_API_KEY=fc-...
```
### Run crawl
```bash ```bash
# Quick test — process first 5 articles # Crawl 5 articles from any WordPress blog
python3 crawl_orangepi_blog.py --limit 5 python crawl_blog.py --sitemap https://example.com/post-sitemap.xml --limit 5
# Full crawl all discovered articles # Crawl all articles with custom keywords
python3 crawl_orangepi_blog.py --all python crawl_blog.py --sitemap https://example.com/post-sitemap.xml --all --keywords keywords.json
# Re-scrape everything (overwrites existing raw files) # Output to custom directory
python3 crawl_orangepi_blog.py --all --force python crawl_blog.py --sitemap https://example.com/post-sitemap.xml --all --out-dir ./my_blog_data
```
### 4. Build index & query
```bash
# Build FAISS index
python rag_app.py --build --data-dir ./my_blog_data --index-dir ./my_index
# Query (requires OPENAI_API_KEY)
export OPENAI_API_KEY="sk-..."
python rag_app.py --query "How to install Docker?" --data-dir ./my_blog_data --index-dir ./my_index
# Interactive chat
python rag_app.py --interactive --data-dir ./my_blog_data --index-dir ./my_index
```
---
## crawl_blog.py — Generic Blog Crawler
Crawls any blog that exposes a sitemap (WordPress, Yoast, etc.).
### Usage
```bash
python crawl_blog.py --sitemap <SITEMAP_URL> [options]
``` ```
### Options ### Options
| Argument | Default | Description | | Argument | Default | Description |
|----------|---------|-------------| |----------|---------|-------------|
| `--sitemap` | (required) | Sitemap URL |
| `--out-dir` | `./blog_data` | Output directory |
| `--keywords` | `<out-dir>/keywords.json` | Keywords JSON path |
| `--limit N` | 5 | Process first N articles | | `--limit N` | 5 | Process first N articles |
| `--all` | — | Process all discovered articles | | `--all` | — | Process all articles |
| `--out-dir PATH` | `/mnt/ssd/orangepi-rag` | Output directory |
| `--models PATH` | `<out-dir>/orangepi_models.json` | Model dictionary path |
| `--sitemap URL` | `https://orangepi.vn/post-sitemap.xml` | Sitemap URL |
| `--sleep SEC` | 1.0 | Delay between Firecrawl calls | | `--sleep SEC` | 1.0 | Delay between Firecrawl calls |
| `--force` | — | Re-scrape cached articles | | `--force` | — | Re-scrape cached articles |
| `--max-words N` | 650 | Target words per chunk | | `--max-words N` | 650 | Target words per chunk |
| `--overlap-words N` | 100 | Overlap words between chunks | | `--overlap-words N` | 100 | Overlap words between chunks |
| `--language` | `en` | Default language code |
## Model Detection ### Output files
The pipeline uses `orangepi_models.json` to detect canonical Orange Pi product names in article text. The dictionary supports aliases per model (e.g., `"Orange Pi 5"`, `"OrangePi 5"`, `"OPi 5"`) and longest-match-first resolution to prevent false double-counts. | File | Description |
|------|-------------|
| `articles.jsonl` | Article records with keyword mentions |
| `chunks.jsonl` | Chunked content for embedding |
| `keywords.json` | Keyword dictionary used |
| `urls.json` | Discovered URLs |
| `raw/<slug>.json` | Raw Firecrawl responses |
| `markdown/<slug>.md` | Cleaned markdown |
| `errors.jsonl` | Failed URLs |
| `summary.json` | Crawl summary |
## Use Cases ---
- **Semantic search** over Vietnamese Orange Pi knowledge ## keywords.json — Keyword Dictionary
- **Q&A bots** for Orange Pi tutorials, OS installs, hardware guides
- **Product recommendation** based on article content Defines keywords to extract from crawled content. Supports categorized or flat format.
- **Fine-tuning** Vietnamese embedding models on SBC/embedded computing content
### Categorized format (recommended)
```json
[
{
"category": "hardware",
"keywords": ["Raspberry Pi", "Arduino", "ESP32"]
},
{
"category": "software",
"keywords": ["Docker", "Ubuntu", "Home Assistant"]
}
]
```
### Flat format
```json
["Raspberry Pi", "Docker", "Home Assistant", "MQTT"]
```
See `keywords_example.json` for a complete template.
---
## rag_app.py — RAG Query Application
FAISS-based vector search + LLM generation.
### Usage
```bash
# Build index (one-time)
python rag_app.py --build --data-dir ./blog_data --index-dir ./index
# Single query
python rag_app.py --query "Câu hỏi của bạn" --data-dir ./blog_data --index-dir ./index
# Interactive chat
python rag_app.py --interactive --data-dir ./blog_data --index-dir ./index
# Test retrieval only (no LLM needed)
python rag_app.py --query "test" --retrieve-only --data-dir ./blog_data --index-dir ./index
```
### Options
| Argument | Default | Description |
|----------|---------|-------------|
| `--data-dir` | `.` | Directory with chunks.jsonl |
| `--index-dir` | `./rag_index` | FAISS index directory |
| `--build` | — | Build index from chunks |
| `--query` | — | Query to answer |
| `--interactive` | — | Interactive chat mode |
| `--retrieve-only` | — | Test retrieval without LLM |
| `--top-k` | 5 | Number of chunks to retrieve |
| `--embed-model` | `paraphrase-multilingual-MiniLM-L12-v2` | Embedding model |
| `--llm-model` | `gpt-4o-mini` | LLM model name |
| `--llm-base-url` | `https://api.openai.com/v1` | LLM API base URL |
### LLM API configuration
Set in `.env`:
```bash
OPENAI_API_KEY=sk-...
# Or for other providers:
# LLM_BASE_URL=https://api.together.xyz/v1
# LLM_MODEL=meta-llama/Llama-3-70b-chat-hf
```
Compatible with any OpenAI-format API: OpenAI, Together.ai, Groq, Ollama, etc.
---
## crawl_orangepi_blog.py — OrangePi-specific Crawler
Specialized crawler for orangepi.vn with Orange Pi model detection.
```bash
python crawl_orangepi_blog.py --limit 5
python crawl_orangepi_blog.py --all
```
Uses `orangepi_models.json` for product mention detection (36 Orange Pi models with aliases).
---
## Architecture
```
Blog (sitemap)
crawl_blog.py ──► Firecrawl API ──► articles.jsonl
│ chunks.jsonl
│ keywords.json
│ raw/*.json
│ markdown/*.md
rag_app.py
├──► SentenceTransformer (embeddings)
├──► FAISS (vector index)
└──► LLM API (generation)
Answer + sources
```
## License ## License
Data sourced from [orangepi.vn](https://orangepi.vn). Check their site for content usage terms. Data sourced from respective blogs. Check each site for content usage terms.
+710
View File
@@ -0,0 +1,710 @@
#!/usr/bin/env python3
"""Generic blog crawler for RAG using Firecrawl.
Discovery is sitemap-first (Yoast/WordPress), extraction is Firecrawl
single-page scrape. Outputs:
articles.jsonl article-level structured records
chunks.jsonl chunk-level records for embedding/RAG
keywords.json keyword dictionary for extraction
urls.json discovered URL list with sitemap lastmod
raw/<slug>.json raw Firecrawl response per article
markdown/<slug>.md extracted markdown per article
errors.jsonl failed URLs/errors
summary.json crawl summary
Usage:
python3 crawl_blog.py --sitemap https://example.com/post-sitemap.xml --limit 5
python3 crawl_blog.py --sitemap https://example.com/post-sitemap.xml --all
python3 crawl_blog.py --sitemap https://example.com/post-sitemap.xml --all --keywords my_keywords.json
Requires FIRECRAWL_API_KEY in environment or .env file.
"""
from __future__ import annotations
import argparse
import datetime as dt
import hashlib
import html
import json
import os
import re
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Any
FIRECRAWL_SCRAPE_URL = "https://api.firecrawl.dev/v1/scrape"
WORD_RE = re.compile(r"\S+", re.UNICODE)
HEADING_RE = re.compile(r"^(#{1,6})\s+(.+?)\s*$")
MD_LINK_RE = re.compile(r"\[([^\]]+)\]\(([^)]+)\)")
MD_IMAGE_RE = re.compile(r"!\[[^\]]*\]\([^)]+\)")
HTML_TAG_RE = re.compile(r"<[^>]+>")
MULTI_SPACE_RE = re.compile(r"[ \t]+")
# ---------------------------------------------------------------------------
# Environment / dotenv
# ---------------------------------------------------------------------------
def load_dotenv(path: Path | None = None) -> None:
"""Load .env from project root or given path."""
candidates = [Path(__file__).parent / ".env"]
if path:
candidates.insert(0, path)
for env_path in candidates:
if not env_path.exists():
continue
for raw in env_path.read_text(encoding="utf-8", errors="ignore").splitlines():
line = raw.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, val = line.split("=", 1)
key = key.strip()
val = val.strip().strip('"').strip("'")
if key and key not in os.environ:
os.environ[key] = val
break
def now_iso() -> str:
return dt.datetime.now(dt.timezone.utc).isoformat()
# ---------------------------------------------------------------------------
# URL / sitemap helpers
# ---------------------------------------------------------------------------
def fetch_bytes(url: str, timeout: int = 30) -> bytes:
req = urllib.request.Request(
url,
headers={
"User-Agent": "BlogCrawler-RAG/1.0",
"Accept": "application/xml,text/xml,text/html,*/*",
},
)
with urllib.request.urlopen(req, timeout=timeout) as resp:
return resp.read()
def parse_sitemap(url: str, domain_filter: str | None = None) -> list[dict[str, str | None]]:
"""Return [{'url': ..., 'lastmod': ...}] from a sitemap URL.
If domain_filter is provided, only include URLs matching that domain.
Works with Yoast (post-sitemap.xml) and generic WordPress sitemaps.
"""
data = fetch_bytes(url)
root = ET.fromstring(data)
ns = {"sm": "http://www.sitemaps.org/schemas/sitemap/0.9"}
# Check if this is a sitemap index (contains other sitemaps)
sitemaps = root.findall(".//sm:sitemap/sm:loc", ns)
if sitemaps:
# This is a sitemap index - recursively fetch child sitemaps
out: list[dict[str, str | None]] = []
for loc_el in sitemaps:
child_url = loc_el.text
if child_url:
try:
child_results = parse_sitemap(child_url, domain_filter)
out.extend(child_results)
except Exception as e:
print(f" WARN: failed to fetch child sitemap {child_url}: {e}", file=sys.stderr)
return out
# Regular sitemap - extract URLs
out: list[dict[str, str | None]] = []
for url_el in root.findall(".//sm:url", ns):
loc_el = url_el.find("sm:loc", ns)
if loc_el is None or not loc_el.text:
continue
lastmod_el = url_el.find("sm:lastmod", ns)
loc = loc_el.text.strip()
lastmod = lastmod_el.text.strip() if lastmod_el is not None and lastmod_el.text else None
if domain_filter and domain_filter not in loc:
continue
out.append({"url": loc, "lastmod": lastmod})
return out
def slug_from_url(url: str) -> str:
path = urllib.parse.urlparse(url).path.strip("/")
if not path:
path = "index"
slug = re.sub(r"\.(html?|php)$", "", path)
slug = re.sub(r"[^a-zA-Z0-9_-]+", "-", slug).strip("-").lower()
if not slug:
slug = hashlib.sha1(url.encode()).hexdigest()[:12]
return slug[:160]
def article_id_from_url(url: str, prefix: str = "blog") -> str:
return f"{prefix}_" + slug_from_url(url).replace("-", "_")
def source_from_url(url: str) -> str:
"""Extract domain name from URL as source identifier."""
parsed = urllib.parse.urlparse(url)
domain = parsed.netloc
if domain.startswith("www."):
domain = domain[4:]
return domain
# ---------------------------------------------------------------------------
# Firecrawl API
# ---------------------------------------------------------------------------
def firecrawl_scrape(url: str, api_key: str, timeout: int = 120) -> tuple[int, dict[str, Any]]:
payload = {
"url": url,
"formats": ["markdown"],
"onlyMainContent": True,
"waitFor": 1000,
"timeout": timeout * 1000,
}
body = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(
FIRECRAWL_SCRAPE_URL,
data=body,
method="POST",
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
"User-Agent": "BlogCrawler-RAG/1.0",
},
)
try:
with urllib.request.urlopen(req, timeout=timeout + 20) as resp:
raw = resp.read().decode("utf-8", errors="replace")
return resp.status, json.loads(raw)
except urllib.error.HTTPError as e:
raw = e.read().decode("utf-8", errors="replace")
try:
data = json.loads(raw)
except Exception:
data = {"error": raw}
return e.code, data
# ---------------------------------------------------------------------------
# Markdown / text processing
# ---------------------------------------------------------------------------
def strip_markdown_to_text(markdown: str) -> str:
text = markdown.replace("\r\n", "\n")
text = MD_IMAGE_RE.sub("", text)
text = MD_LINK_RE.sub(r"\1", text)
text = re.sub(r"```.*?```", lambda m: m.group(0), text, flags=re.S)
text = re.sub(r"^#{1,6}\s+", "", text, flags=re.M)
text = re.sub(r"[*_`~]", "", text)
text = HTML_TAG_RE.sub(" ", text)
text = html.unescape(text)
text = MULTI_SPACE_RE.sub(" ", text)
text = re.sub(r"\n{3,}", "\n\n", text)
return text.strip()
def clean_markdown(markdown: str) -> str:
"""Light cleanup for common boilerplate while preserving content."""
lines = markdown.replace("\r\n", "\n").split("\n")
drop_contains = [
"Press enter for Accessibility",
"Accessibility menu",
"Popup heading",
"Skip to main",
"Bỏ qua nội dung",
"close",
]
cleaned: list[str] = []
for line in lines:
s = line.strip()
if any(x.lower() in s.lower() for x in drop_contains):
continue
cleaned.append(line.rstrip())
text = "\n".join(cleaned)
text = re.sub(r"\n{4,}", "\n\n\n", text)
return text.strip()
def extract_title(data: dict[str, Any], markdown: str, fallback_url: str) -> str:
meta = data.get("metadata") or {}
for key in ("title", "ogTitle"):
val = meta.get(key)
if isinstance(val, str) and val.strip():
return html.unescape(val.strip())
for line in markdown.splitlines():
m = HEADING_RE.match(line.strip())
if m:
return m.group(2).strip()
return slug_from_url(fallback_url).replace("-", " ").title()
def extract_description(data: dict[str, Any], text: str) -> str | None:
meta = data.get("metadata") or {}
for key in ("description", "ogDescription"):
val = meta.get(key)
if isinstance(val, str) and val.strip():
return html.unescape(val.strip())
return text[:300].strip() if text else None
# ---------------------------------------------------------------------------
# Keyword extraction (replaces product mentions from original)
# ---------------------------------------------------------------------------
def load_keywords(path: Path | None) -> list[dict[str, Any]]:
"""Load keyword dictionary from JSON.
Expected JSON shape (list of categories):
[
{
"category": "hardware",
"keywords": ["Raspberry Pi", "Arduino", "ESP32"]
},
{
"category": "software",
"keywords": ["Docker", "Ubuntu", "Debian"]
}
]
Or a flat list of keyword strings:
["Raspberry Pi", "Docker", "Home Assistant"]
"""
if path is None or not path.exists():
return []
data = json.loads(path.read_text(encoding="utf-8"))
if isinstance(data, list) and all(isinstance(x, str) for x in data):
# Flat list of strings -> wrap into single category
return [{"category": "general", "keywords": data}]
if not isinstance(data, list):
raise ValueError(f"keywords JSON must be a list: {path}")
categories: list[dict[str, Any]] = []
for row in data:
if not isinstance(row, dict):
continue
category = str(row.get("category") or "general").strip()
kw_list = row.get("keywords") or []
if not isinstance(kw_list, list):
continue
keywords = sorted({str(k).strip() for k in kw_list if str(k).strip()}, key=len, reverse=True)
if keywords:
categories.append({"category": category, "keywords": keywords})
return categories
def _alias_to_regex(alias: str) -> re.Pattern[str]:
"""Compile a keyword regex with flexible whitespace and safe boundaries."""
alias = html.unescape(alias or "").strip()
alias = alias.replace("\u00a0", " ")
alias = re.sub(r"[\u2010-\u2015]", "-", alias)
pat = re.escape(alias).replace(r"\ ", r"\s+")
return re.compile(rf"(?<![A-Za-z0-9]){pat}(?![A-Za-z0-9])", re.I | re.U)
def spans_overlap(a: tuple[int, int], b: tuple[int, int]) -> bool:
return a[0] < b[1] and b[0] < a[1]
def keyword_mentions_detail(text: str, categories: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""Return keyword mentions grouped by category.
Longer keywords are processed first and reserve their character spans.
This prevents false double-counts such as "Orange Pi 5" and "Orange Pi"
matching the same text span.
"""
if not categories:
return []
hay = text
hay = html.unescape(hay or "")
hay = hay.replace("\u00a0", " ")
hay = re.sub(r"[\u2010-\u2015]", "-", hay)
hay = re.sub(r"\s+", " ", hay)
# Build flat list of (category, keyword, regex) sorted by keyword length desc
all_kw: list[tuple[str, str, re.Pattern[str]]] = []
for cat in categories:
for kw in cat["keywords"]:
all_kw.append((cat["category"], kw, _alias_to_regex(kw)))
all_kw.sort(key=lambda x: len(x[1]), reverse=True)
details: list[dict[str, Any]] = []
occupied: list[tuple[int, int]] = []
# Group results by category
cat_results: dict[str, dict[str, Any]] = {}
for category, keyword, rx in all_kw:
for m in rx.finditer(hay):
span = (m.start(), m.end())
if any(spans_overlap(span, used) for used in occupied):
continue
occupied.append(span)
if category not in cat_results:
cat_results[category] = {
"category": category,
"matched_keywords": {},
"total_count": 0,
}
entry = cat_results[category]
entry["matched_keywords"].setdefault(keyword, 0)
entry["matched_keywords"][keyword] += 1
entry["total_count"] += 1
for cat_data in cat_results.values():
cat_data["matched_keywords"] = dict(
sorted(cat_data["matched_keywords"].items(), key=lambda x: -x[1])
)
details.append(cat_data)
return sorted(details, key=lambda d: -d["total_count"])
def keyword_mentions(text: str, categories: list[dict[str, Any]]) -> list[str]:
"""Return flat list of all matched keywords."""
if not categories:
return []
all_matched = []
for detail in keyword_mentions_detail(text, categories):
all_matched.extend(detail["matched_keywords"].keys())
return all_matched
# ---------------------------------------------------------------------------
# Topic inference
# ---------------------------------------------------------------------------
def infer_topic(title: str, text: str, categories: list[dict[str, Any]] | None = None) -> str | None:
"""Infer topic from content. Uses keyword categories if available."""
hay = (title + "\n" + text[:2000]).lower()
# If categories are provided, use them for topic inference
if categories:
best_category = None
best_count = 0
for cat in categories:
count = sum(1 for kw in cat["keywords"] if kw.lower() in hay)
if count > best_count:
best_count = count
best_category = cat["category"]
if best_category and best_count > 0:
return best_category
# Fallback: common topic rules
rules = [
("docker", "docker"),
("kubernetes", "kubernetes"),
("linux", "linux"),
("ubuntu", "linux"),
("debian", "linux"),
("python", "programming"),
("javascript", "programming"),
("home assistant", "home assistant"),
("iot", "iot"),
("ai", "ai"),
("machine learning", "ai"),
]
for needle, topic in rules:
if needle in hay:
return topic
return None
# ---------------------------------------------------------------------------
# Chunking
# ---------------------------------------------------------------------------
def chunk_markdown(
markdown: str,
article: dict[str, Any],
categories: list[dict[str, Any]] | None = None,
max_words: int = 650,
overlap_words: int = 100,
) -> list[dict[str, Any]]:
"""Chunk markdown by paragraphs/headings with approximate word limits."""
blocks = re.split(r"\n\s*\n", markdown.strip()) if markdown.strip() else []
chunks: list[dict[str, Any]] = []
current: list[str] = []
current_words = 0
section = None
current_section = None
def words_of(s: str) -> list[str]:
return WORD_RE.findall(s)
def flush() -> None:
nonlocal current, current_words, current_section
content = "\n\n".join(current).strip()
if not content:
current = []
current_words = 0
return
idx = len(chunks)
chunk_kw = keyword_mentions(content, categories or []) if categories else []
chunk_kw_detail = keyword_mentions_detail(content, categories or []) if categories else []
chunks.append({
"chunk_id": f"{article['id']}__chunk_{idx:04d}",
"article_id": article["id"],
"url": article["url"],
"title": article["title"],
"section": current_section,
"language": article.get("language", "en"),
"content": content,
"metadata": {
"source": article.get("source"),
"type": article.get("type"),
"keyword_mentions": chunk_kw,
"keyword_mentions_detail": chunk_kw_detail,
"article_keyword_mentions": article.get("keywords", []),
"topic": article.get("topic"),
"modified_at": article.get("modified_at"),
},
})
if overlap_words > 0:
tail: list[str] = []
count = 0
for b in reversed(current):
bw = len(words_of(b))
if tail and count + bw > overlap_words:
break
tail.insert(0, b)
count += bw
current = tail
current_words = count
else:
current = []
current_words = 0
for block in blocks:
b = block.strip()
if not b:
continue
m = HEADING_RE.match(b.splitlines()[0].strip())
if m:
section = m.group(2).strip()
bw = len(words_of(b))
if current and current_words + bw > max_words:
flush()
if not current:
current_section = section
if bw > max_words * 1.5:
words = words_of(b)
start = 0
while start < len(words):
part = " ".join(words[start:start + max_words])
if current and current_words + len(words_of(part)) > max_words:
flush()
current.append(part)
current_words += len(words_of(part))
flush()
start += max_words - overlap_words
continue
current.append(b)
current_words += bw
if current:
flush()
return chunks
# ---------------------------------------------------------------------------
# JSONL helpers
# ---------------------------------------------------------------------------
def append_jsonl(path: Path, record: dict[str, Any]) -> None:
with path.open("a", encoding="utf-8") as f:
f.write(json.dumps(record, ensure_ascii=False, sort_keys=True) + "\n")
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(
description="Generic blog crawler for RAG (Firecrawl + sitemap)",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Crawl 5 articles from a blog
python3 crawl_blog.py --sitemap https://example.com/post-sitemap.xml --limit 5
# Crawl all articles
python3 crawl_blog.py --sitemap https://example.com/post-sitemap.xml --all
# Use custom keywords for extraction
python3 crawl_blog.py --sitemap https://example.com/post-sitemap.xml --all --keywords keywords.json
# Output to custom directory
python3 crawl_blog.py --sitemap https://example.com/post-sitemap.xml --all --out-dir ./my_data
""",
)
parser.add_argument("--sitemap", required=True, help="Sitemap URL (e.g. https://example.com/post-sitemap.xml)")
parser.add_argument("--out-dir", type=Path, default=Path("./blog_data"), help="Output directory")
parser.add_argument("--keywords", type=Path, default=None, help="Keywords JSON path; defaults to <out-dir>/keywords.json")
parser.add_argument("--limit", type=int, default=None, help="Only process first N article URLs")
parser.add_argument("--all", action="store_true", help="Process all discovered article URLs")
parser.add_argument("--sleep", type=float, default=1.0, help="Delay between Firecrawl calls (seconds)")
parser.add_argument("--force", action="store_true", help="Re-scrape even if raw file exists")
parser.add_argument("--max-words", type=int, default=650, help="Target words per chunk")
parser.add_argument("--overlap-words", type=int, default=100, help="Overlap words between chunks")
parser.add_argument("--language", default="en", help="Default language code for articles")
args = parser.parse_args(argv)
if not args.all and args.limit is None:
args.limit = 5
load_dotenv()
api_key = os.environ.get("FIRECRAWL_API_KEY")
if not api_key:
print("ERROR: FIRECRAWL_API_KEY is not set in environment or .env file", file=sys.stderr)
return 2
out_dir: Path = args.out_dir
out_dir.mkdir(parents=True, exist_ok=True)
# Load keywords
keywords_path: Path = args.keywords or (out_dir / "keywords.json")
categories = load_keywords(keywords_path)
# Create output directories
raw_dir = out_dir / "raw"
md_dir = out_dir / "markdown"
raw_dir.mkdir(parents=True, exist_ok=True)
md_dir.mkdir(parents=True, exist_ok=True)
# Discover URLs from sitemap
source_domain = source_from_url(args.sitemap)
urls = parse_sitemap(args.sitemap)
urls_path = out_dir / "urls.json"
urls_path.write_text(
json.dumps({"sitemap": args.sitemap, "source": source_domain, "count": len(urls), "urls": urls}, ensure_ascii=False, indent=2),
encoding="utf-8",
)
selected = urls if args.all else urls[: args.limit]
articles_path = out_dir / "articles.jsonl"
chunks_path = out_dir / "chunks.jsonl"
errors_path = out_dir / "errors.jsonl"
if args.force:
articles_path.unlink(missing_ok=True)
chunks_path.unlink(missing_ok=True)
errors_path.unlink(missing_ok=True)
print(f"Source: {source_domain}")
print(f"Discovered article URLs: {len(urls)}")
print(f"Processing: {len(selected)}")
print(f"Output: {out_dir}")
print(f"Keywords: {keywords_path} ({sum(len(c['keywords']) for c in categories)} keywords in {len(categories)} categories)")
ok = 0
failed = 0
total_chunks = 0
for idx, item in enumerate(selected, 1):
url = str(item["url"])
lastmod = item.get("lastmod")
slug = slug_from_url(url)
raw_path = raw_dir / f"{slug}.json"
md_path = md_dir / f"{slug}.md"
print(f"[{idx}/{len(selected)}] {url}")
# Use cached raw if available
if raw_path.exists() and not args.force:
try:
raw_data = json.loads(raw_path.read_text(encoding="utf-8"))
status = int(raw_data.get("_http_status", 200))
except Exception as e:
append_jsonl(errors_path, {"url": url, "error": f"read cached raw failed: {e}", "at": now_iso()})
failed += 1
continue
else:
status, raw_data = firecrawl_scrape(url, api_key)
raw_data["_http_status"] = status
raw_data["_source_url"] = url
raw_data["_scraped_at"] = now_iso()
raw_path.write_text(json.dumps(raw_data, ensure_ascii=False, indent=2), encoding="utf-8")
time.sleep(args.sleep)
if status >= 400 or not raw_data.get("success"):
append_jsonl(errors_path, {"url": url, "http_status": status, "error": raw_data, "at": now_iso()})
print(f" ERROR status={status} success={raw_data.get('success')}")
failed += 1
continue
data = raw_data.get("data") or {}
markdown = data.get("markdown") or ""
markdown = clean_markdown(markdown)
text = strip_markdown_to_text(markdown)
if len(text) < 100:
append_jsonl(errors_path, {"url": url, "http_status": status, "error": "too little text", "text_chars": len(text), "at": now_iso()})
print(f" ERROR too little text chars={len(text)}")
failed += 1
continue
title = extract_title(data, markdown, url)
desc = extract_description(data, text)
kw_detail = keyword_mentions_detail(title + "\n" + text, categories)
kw_list = [kw for cat in kw_detail for kw in cat["matched_keywords"].keys()]
article = {
"id": article_id_from_url(url),
"url": url,
"source": source_domain,
"type": "blog_article",
"title": title,
"description": desc,
"published_at": None,
"modified_at": lastmod,
"language": args.language,
"markdown": markdown,
"text": text,
"tags": [],
"keywords": kw_list,
"keyword_mentions_detail": kw_detail,
"topic": infer_topic(title, text, categories),
"scraped_at": raw_data.get("_scraped_at") or now_iso(),
"metadata": data.get("metadata") or {},
}
chunks = chunk_markdown(markdown, article, categories=categories, max_words=args.max_words, overlap_words=args.overlap_words)
md_path.write_text(markdown, encoding="utf-8")
append_jsonl(articles_path, article)
for chunk in chunks:
append_jsonl(chunks_path, chunk)
ok += 1
total_chunks += len(chunks)
print(f" OK title={title!r} markdown_chars={len(markdown)} chunks={len(chunks)} keywords={kw_list[:5]}")
summary = {
"sitemap": args.sitemap,
"source": source_domain,
"discovered": len(urls),
"processed": len(selected),
"ok": ok,
"failed": failed,
"chunks": total_chunks,
"out_dir": str(out_dir),
"keywords_path": str(keywords_path),
"keywords_loaded": sum(len(c["keywords"]) for c in categories),
"finished_at": now_iso(),
}
(out_dir / "summary.json").write_text(json.dumps(summary, ensure_ascii=False, indent=2), encoding="utf-8")
print("SUMMARY", json.dumps(summary, ensure_ascii=False))
return 0 if failed == 0 else 1
if __name__ == "__main__":
raise SystemExit(main())
+86
View File
@@ -0,0 +1,86 @@
[
{
"category": "hardware",
"keywords": [
"Raspberry Pi",
"Orange Pi",
"Arduino",
"ESP32",
"ESP8266",
"BeagleBone",
"NVIDIA Jetson",
"STM32",
"GPIO",
"SPI",
"I2C",
"UART"
]
},
{
"category": "operating_system",
"keywords": [
"Linux",
"Ubuntu",
"Debian",
"Raspberry Pi OS",
"Armbian",
"DietPi",
"Manjaro",
"Fedora",
"CentOS",
"FreeBSD",
"OpenWrt",
"Home Assistant OS"
]
},
{
"category": "software",
"keywords": [
"Docker",
"Kubernetes",
"Home Assistant",
"MQTT",
"Node-RED",
"Jellyfin",
"Plex",
"Pi-hole",
"AdGuard",
"Nginx",
"Apache",
"Samba",
"Kodi",
"OctoPrint"
]
},
{
"category": "ai_ml",
"keywords": [
"TensorFlow",
"PyTorch",
"OpenCV",
"YOLO",
"LLM",
"GPT",
"machine learning",
"deep learning",
"neural network",
"NPU",
"inference"
]
},
{
"category": "networking",
"keywords": [
"VPN",
"WireGuard",
"ZeroTier",
"Tailscale",
"firewall",
"router",
"WiFi",
"Bluetooth",
"Zigbee",
"LoRa"
]
}
]