#!/usr/bin/env python3
"""Generic blog crawler for RAG using Firecrawl.
|
|
|
|
Discovery is sitemap-first (Yoast/WordPress), extraction is Firecrawl
|
|
single-page scrape. Outputs:
|
|
|
|
articles.jsonl article-level structured records
|
|
chunks.jsonl chunk-level records for embedding/RAG
|
|
keywords.json keyword dictionary for extraction
|
|
urls.json discovered URL list with sitemap lastmod
|
|
raw/<slug>.json raw Firecrawl response per article
|
|
markdown/<slug>.md extracted markdown per article
|
|
errors.jsonl failed URLs/errors
|
|
summary.json crawl summary
|
|
|
|
Usage:
|
|
python3 crawl_blog.py --sitemap https://example.com/post-sitemap.xml --limit 5
|
|
python3 crawl_blog.py --sitemap https://example.com/post-sitemap.xml --all
|
|
python3 crawl_blog.py --sitemap https://example.com/post-sitemap.xml --all --keywords my_keywords.json
|
|
|
|
Requires FIRECRAWL_API_KEY in environment or .env file.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import datetime as dt
|
|
import hashlib
|
|
import html
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
import time
|
|
import urllib.error
|
|
import urllib.parse
|
|
import urllib.request
|
|
import xml.etree.ElementTree as ET
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
# Endpoint that firecrawl_scrape() POSTs its JSON payload to.
FIRECRAWL_SCRAPE_URL = "https://api.firecrawl.dev/v1/scrape"

# A "word" is any run of non-whitespace characters; used for chunk sizing.
WORD_RE = re.compile(r"\S+", flags=re.UNICODE)
# Markdown ATX heading: group(1) is the run of 1-6 '#', group(2) the heading text.
HEADING_RE = re.compile(r"^(#{1,6})\s+(.+?)\s*$")
# Inline link "[label](target)": group(1) is the label, group(2) the target.
MD_LINK_RE = re.compile(r"\[([^\]]+)\]\(([^)]+)\)")
# Inline image "![alt](src)".
MD_IMAGE_RE = re.compile(r"!\[[^\]]*\]\([^)]+\)")
# Any "<...>" span.
HTML_TAG_RE = re.compile(r"<[^>]+>")
# Runs of spaces and tabs; newlines are not part of the class.
MULTI_SPACE_RE = re.compile(r"[ \t]+")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Environment / dotenv
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def load_dotenv(path: Path | None = None) -> None:
|
|
"""Load .env from project root or given path."""
|
|
candidates = [Path(__file__).parent / ".env"]
|
|
if path:
|
|
candidates.insert(0, path)
|
|
for env_path in candidates:
|
|
if not env_path.exists():
|
|
continue
|
|
for raw in env_path.read_text(encoding="utf-8", errors="ignore").splitlines():
|
|
line = raw.strip()
|
|
if not line or line.startswith("#") or "=" not in line:
|
|
continue
|
|
key, val = line.split("=", 1)
|
|
key = key.strip()
|
|
val = val.strip().strip('"').strip("'")
|
|
if key and key not in os.environ:
|
|
os.environ[key] = val
|
|
break
|
|
|
|
|
|
def now_iso() -> str:
    """Return the current time in UTC as a timezone-aware ISO 8601 string."""
    utc_now = dt.datetime.now(tz=dt.timezone.utc)
    return utc_now.isoformat()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# URL / sitemap helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def fetch_bytes(url: str, timeout: int = 30) -> bytes:
    """Fetch *url* and return the raw response body.

    The request identifies itself as ``BlogCrawler-RAG/1.0`` and advertises
    XML/HTML in its Accept header. Errors raised by ``urlopen`` propagate to
    the caller.
    """
    request_headers = {
        "User-Agent": "BlogCrawler-RAG/1.0",
        "Accept": "application/xml,text/xml,text/html,*/*",
    }
    request = urllib.request.Request(url, headers=request_headers)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        payload = response.read()
    return payload
|
|
|
|
|
|
def parse_sitemap(
    url: str,
    domain_filter: str | None = None,
    _seen: set[str] | None = None,
) -> list[dict[str, str | None]]:
    """Return [{'url': ..., 'lastmod': ...}] from a sitemap URL.

    A sitemap index (one that contains ``<sitemap>`` entries) is expanded
    recursively; a child sitemap that fails to load is reported on stderr and
    skipped so one bad child does not lose the rest.

    Args:
        url: Sitemap or sitemap-index URL.
        domain_filter: When provided, only URLs that contain this string are
            kept (plain substring test against the full URL).
        _seen: Internal. Sitemap URLs already visited in this traversal, used
            to stop an index that references itself from recursing forever.

    Raises:
        Whatever ``fetch_bytes`` or ``ET.fromstring`` raise for the top-level
        *url* (network errors, malformed XML).
    """
    seen = set() if _seen is None else _seen
    seen.add(url)

    root = ET.fromstring(fetch_bytes(url))
    ns = {"sm": "http://www.sitemaps.org/schemas/sitemap/0.9"}

    # A sitemap index lists other sitemaps instead of pages.
    child_locs = root.findall(".//sm:sitemap/sm:loc", ns)
    if child_locs:
        collected: list[dict[str, str | None]] = []
        for loc_el in child_locs:
            # Pretty-printed sitemaps wrap the URL in whitespace/newlines.
            child_url = (loc_el.text or "").strip()
            if not child_url or child_url in seen:
                continue
            try:
                collected.extend(parse_sitemap(child_url, domain_filter, seen))
            except Exception as e:  # deliberate best-effort: skip the broken child
                print(f" WARN: failed to fetch child sitemap {child_url}: {e}", file=sys.stderr)
        return collected

    # Regular sitemap: one <url> element per page.
    out: list[dict[str, str | None]] = []
    for url_el in root.findall(".//sm:url", ns):
        loc_el = url_el.find("sm:loc", ns)
        if loc_el is None or not loc_el.text:
            continue
        loc = loc_el.text.strip()
        if domain_filter and domain_filter not in loc:
            continue
        lastmod_el = url_el.find("sm:lastmod", ns)
        lastmod = lastmod_el.text.strip() if lastmod_el is not None and lastmod_el.text else None
        out.append({"url": loc, "lastmod": lastmod})
    return out
|
|
|
|
|
|
def slug_from_url(url: str) -> str:
    """Derive a lowercase, filesystem-friendly slug from the path of *url*.

    A trailing ``.html``/``.htm``/``.php`` is dropped, every run of characters
    outside ``[a-zA-Z0-9_-]`` becomes a single ``-``, and the result is capped
    at 160 characters. An empty path maps to ``index``; a path that reduces to
    nothing maps to the first 12 hex digits of the URL's SHA-1.
    """
    trimmed_path = urllib.parse.urlparse(url).path.strip("/") or "index"
    without_ext = re.sub(r"\.(html?|php)$", "", trimmed_path)
    candidate = re.sub(r"[^a-zA-Z0-9_-]+", "-", without_ext).strip("-").lower()
    if candidate:
        return candidate[:160]
    return hashlib.sha1(url.encode()).hexdigest()[:12]
|
|
|
|
|
|
def article_id_from_url(url: str, prefix: str = "blog") -> str:
    """Build an article id: ``<prefix>_`` plus the URL slug with ``-`` turned into ``_``."""
    slug = slug_from_url(url)
    return "{}_{}".format(prefix, slug.replace("-", "_"))
|
|
|
|
|
|
def source_from_url(url: str) -> str:
    """Return the network location of *url*, without a leading ``www.``."""
    netloc = urllib.parse.urlparse(url).netloc
    return netloc[len("www."):] if netloc.startswith("www.") else netloc
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Firecrawl API
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def firecrawl_scrape(url: str, api_key: str, timeout: int = 120) -> tuple[int, dict[str, Any]]:
    """Scrape one page through the Firecrawl scrape endpoint.

    Args:
        url: Page to scrape.
        api_key: Firecrawl API key, sent as a Bearer token.
        timeout: Timeout in seconds; sent to Firecrawl in milliseconds.

    Returns:
        ``(http_status, body)`` where *body* is always a dict:
          * the decoded JSON response, for 2xx and HTTP error responses alike;
          * ``{"error": <raw text or value>}`` when the body is not a JSON object;
          * status ``0`` with ``{"success": False, "error": "..."}`` when the
            request never produced an HTTP response (DNS failure, refused
            connection, socket timeout).

    The function does not raise for network or decoding problems, so one bad
    URL cannot abort a whole crawl.
    """
    payload = {
        "url": url,
        "formats": ["markdown"],
        "onlyMainContent": True,
        "waitFor": 1000,
        "timeout": timeout * 1000,
    }
    req = urllib.request.Request(
        FIRECRAWL_SCRAPE_URL,
        data=json.dumps(payload).encode("utf-8"),
        method="POST",
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
            "User-Agent": "BlogCrawler-RAG/1.0",
        },
    )
    try:
        # The socket timeout is 20 s longer than the timeout sent in the payload.
        with urllib.request.urlopen(req, timeout=timeout + 20) as resp:
            status = resp.status
            raw = resp.read().decode("utf-8", errors="replace")
    except urllib.error.HTTPError as e:
        # HTTP error responses still carry a body worth returning.
        status = e.code
        raw = e.read().decode("utf-8", errors="replace")
    except OSError as e:
        # URLError and socket timeouts are OSError subclasses: no HTTP response.
        return 0, {"success": False, "error": f"request failed: {e}"}

    try:
        data = json.loads(raw)
    except ValueError:
        data = {"error": raw}
    if not isinstance(data, dict):
        # Callers index into the result, so never hand back a list or scalar.
        data = {"error": data}
    return status, data
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Markdown / text processing
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def strip_markdown_to_text(markdown: str) -> str:
    """Reduce markdown to plain text.

    Steps, in order: normalise CRLF, drop images, replace links with their
    label, drop heading markers, drop the emphasis/code characters
    ``* _ ` ~``, replace ``<...>`` tags with a space, unescape HTML entities,
    collapse runs of spaces/tabs, and collapse 3+ newlines to a blank line.

    The content of fenced code blocks is kept; only the backticks go. The
    earlier substitution over fenced blocks replaced each match with itself,
    so it has been removed as a no-op.
    """
    text = markdown.replace("\r\n", "\n")
    # Images must go before links: "![alt](src)" contains a link-shaped tail.
    text = MD_IMAGE_RE.sub("", text)
    text = MD_LINK_RE.sub(r"\1", text)
    text = re.sub(r"^#{1,6}\s+", "", text, flags=re.M)
    text = re.sub(r"[*_`~]", "", text)
    # Tags are stripped before unescaping so "&lt;b&gt;" survives as literal text.
    text = HTML_TAG_RE.sub(" ", text)
    text = html.unescape(text)
    text = MULTI_SPACE_RE.sub(" ", text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()
|
|
|
|
|
|
def clean_markdown(markdown: str) -> str:
    """Light cleanup for common boilerplate while preserving content.

    A line is dropped when it contains one of the known boilerplate phrases
    (case-insensitive), or when its visible text is nothing but "close", as
    with a popup's close button. The word "close" inside a normal sentence is
    kept; matching it as a substring would delete real content such as
    "close the lid" or "disclose". Trailing whitespace is removed from kept
    lines and runs of 4+ newlines are collapsed to three.
    """
    drop_contains = tuple(
        phrase.lower()
        for phrase in (
            "Press enter for Accessibility",
            "Accessibility menu",
            "Popup heading",
            "Skip to main",
            "Bỏ qua nội dung",
        )
    )
    # Short UI labels that are only boilerplate when they are the whole line.
    drop_exact = {"close"}

    cleaned: list[str] = []
    for line in markdown.replace("\r\n", "\n").split("\n"):
        lowered = line.strip().lower()
        if any(phrase in lowered for phrase in drop_contains):
            continue
        # Visible label of the line: link targets and punctuation removed, so
        # "[Close](#)" and "**Close**" both reduce to "close".
        label = re.sub(r"\]\([^)]*\)", "]", lowered)
        label = re.sub(r"[^\w\s]", "", label).strip()
        if label in drop_exact:
            continue
        cleaned.append(line.rstrip())

    text = "\n".join(cleaned)
    text = re.sub(r"\n{4,}", "\n\n\n", text)
    return text.strip()
|
|
|
|
|
|
def extract_title(data: dict[str, Any], markdown: str, fallback_url: str) -> str:
    """Pick a title for the article.

    Preference order: ``metadata.title``, ``metadata.ogTitle`` (HTML-unescaped),
    the first markdown heading, and finally a title-cased form of the URL slug.
    """
    meta = data.get("metadata") or {}
    for candidate in (meta.get(field) for field in ("title", "ogTitle")):
        if isinstance(candidate, str) and candidate.strip():
            return html.unescape(candidate.strip())

    heading_hits = (HEADING_RE.match(row.strip()) for row in markdown.splitlines())
    first_heading = next((hit for hit in heading_hits if hit), None)
    if first_heading is not None:
        return first_heading.group(2).strip()

    return slug_from_url(fallback_url).replace("-", " ").title()
|
|
|
|
|
|
def extract_description(data: dict[str, Any], text: str) -> str | None:
|
|
meta = data.get("metadata") or {}
|
|
for key in ("description", "ogDescription"):
|
|
val = meta.get(key)
|
|
if isinstance(val, str) and val.strip():
|
|
return html.unescape(val.strip())
|
|
return text[:300].strip() if text else None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Keyword extraction (replaces product mentions from original)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def load_keywords(path: Path | None) -> list[dict[str, Any]]:
|
|
"""Load keyword dictionary from JSON.
|
|
|
|
Expected JSON shape (list of categories):
|
|
[
|
|
{
|
|
"category": "hardware",
|
|
"keywords": ["Raspberry Pi", "Arduino", "ESP32"]
|
|
},
|
|
{
|
|
"category": "software",
|
|
"keywords": ["Docker", "Ubuntu", "Debian"]
|
|
}
|
|
]
|
|
|
|
Or a flat list of keyword strings:
|
|
["Raspberry Pi", "Docker", "Home Assistant"]
|
|
"""
|
|
if path is None or not path.exists():
|
|
return []
|
|
data = json.loads(path.read_text(encoding="utf-8"))
|
|
|
|
if isinstance(data, list) and all(isinstance(x, str) for x in data):
|
|
# Flat list of strings -> wrap into single category
|
|
return [{"category": "general", "keywords": data}]
|
|
|
|
if not isinstance(data, list):
|
|
raise ValueError(f"keywords JSON must be a list: {path}")
|
|
|
|
categories: list[dict[str, Any]] = []
|
|
for row in data:
|
|
if not isinstance(row, dict):
|
|
continue
|
|
category = str(row.get("category") or "general").strip()
|
|
kw_list = row.get("keywords") or []
|
|
if not isinstance(kw_list, list):
|
|
continue
|
|
keywords = sorted({str(k).strip() for k in kw_list if str(k).strip()}, key=len, reverse=True)
|
|
if keywords:
|
|
categories.append({"category": category, "keywords": keywords})
|
|
return categories
|
|
|
|
|
|
def _alias_to_regex(alias: str) -> re.Pattern[str]:
|
|
"""Compile a keyword regex with flexible whitespace and safe boundaries."""
|
|
alias = html.unescape(alias or "").strip()
|
|
alias = alias.replace("\u00a0", " ")
|
|
alias = re.sub(r"[\u2010-\u2015]", "-", alias)
|
|
pat = re.escape(alias).replace(r"\ ", r"\s+")
|
|
return re.compile(rf"(?<![A-Za-z0-9]){pat}(?![A-Za-z0-9])", re.I | re.U)
|
|
|
|
|
|
def spans_overlap(a: tuple[int, int], b: tuple[int, int]) -> bool:
    """Return True when half-open spans ``[start, end)`` *a* and *b* intersect."""
    a_start, a_end = a[0], a[1]
    b_start, b_end = b[0], b[1]
    return a_start < b_end and b_start < a_end
|
|
|
|
|
|
def keyword_mentions_detail(text: str, categories: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Count keyword mentions in *text*, grouped by category.

    Keywords from all categories are tried longest first, and every accepted
    match reserves its character span. A shorter keyword therefore cannot be
    counted on text a longer one already claimed ("Orange Pi" inside
    "Orange Pi 5").

    Returns:
        One dict per category with at least one match, ordered by
        ``total_count`` descending:
        ``{"category": str, "matched_keywords": {keyword: count}, "total_count": int}``
        with ``matched_keywords`` ordered by count descending.
    """
    if not categories:
        return []

    # Normalise the haystack the same way the keyword patterns are normalised.
    hay = html.unescape(text or "").replace("\u00a0", " ")
    hay = re.sub(r"[\u2010-\u2015]", "-", hay)
    hay = re.sub(r"\s+", " ", hay)

    candidates: list[tuple[str, str, re.Pattern[str]]] = [
        (cat["category"], kw, _alias_to_regex(kw))
        for cat in categories
        for kw in cat["keywords"]
    ]
    candidates.sort(key=lambda item: len(item[1]), reverse=True)

    claimed: list[tuple[int, int]] = []
    per_category: dict[str, dict[str, Any]] = {}
    for category, keyword, rx in candidates:
        for hit in rx.finditer(hay):
            span = hit.span()
            if any(spans_overlap(span, taken) for taken in claimed):
                continue
            claimed.append(span)
            bucket = per_category.setdefault(
                category,
                {"category": category, "matched_keywords": {}, "total_count": 0},
            )
            counts = bucket["matched_keywords"]
            counts[keyword] = counts.get(keyword, 0) + 1
            bucket["total_count"] += 1

    results: list[dict[str, Any]] = []
    for bucket in per_category.values():
        ranked = sorted(bucket["matched_keywords"].items(), key=lambda pair: -pair[1])
        bucket["matched_keywords"] = dict(ranked)
        results.append(bucket)
    return sorted(results, key=lambda entry: -entry["total_count"])
|
|
|
|
|
|
def keyword_mentions(text: str, categories: list[dict[str, Any]]) -> list[str]:
    """Return the matched keywords as a flat list, in category-result order."""
    if not categories:
        return []
    return [
        keyword
        for detail in keyword_mentions_detail(text, categories)
        for keyword in detail["matched_keywords"]
    ]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Topic inference
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def infer_topic(title: str, text: str, categories: list[dict[str, Any]] | None = None) -> str | None:
|
|
"""Infer topic from content. Uses keyword categories if available."""
|
|
hay = (title + "\n" + text[:2000]).lower()
|
|
|
|
# If categories are provided, use them for topic inference
|
|
if categories:
|
|
best_category = None
|
|
best_count = 0
|
|
for cat in categories:
|
|
count = sum(1 for kw in cat["keywords"] if kw.lower() in hay)
|
|
if count > best_count:
|
|
best_count = count
|
|
best_category = cat["category"]
|
|
if best_category and best_count > 0:
|
|
return best_category
|
|
|
|
# Fallback: common topic rules
|
|
rules = [
|
|
("docker", "docker"),
|
|
("kubernetes", "kubernetes"),
|
|
("linux", "linux"),
|
|
("ubuntu", "linux"),
|
|
("debian", "linux"),
|
|
("python", "programming"),
|
|
("javascript", "programming"),
|
|
("home assistant", "home assistant"),
|
|
("iot", "iot"),
|
|
("ai", "ai"),
|
|
("machine learning", "ai"),
|
|
]
|
|
for needle, topic in rules:
|
|
if needle in hay:
|
|
return topic
|
|
return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Chunking
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def chunk_markdown(
    markdown: str,
    article: dict[str, Any],
    categories: list[dict[str, Any]] | None = None,
    max_words: int = 650,
    overlap_words: int = 100,
) -> list[dict[str, Any]]:
    """Chunk markdown by paragraphs/headings with approximate word limits.

    The text is split into blocks at blank lines and blocks are packed into a
    chunk until the next one would push it past *max_words*. Each new chunk
    starts with an overlap carried over from the previous one: as many whole
    trailing blocks as fit in *overlap_words*, or, when even the last block is
    too big, its last *overlap_words* words joined by single spaces.

    A block longer than 1.5 x *max_words* is cut into word slices (line
    structure inside it is lost) that go through the same packing, so such a
    block yields chunks of about *max_words* with *overlap_words* overlap.
    Blocks between 1 x and 1.5 x *max_words* stay whole, so a chunk can exceed
    the limit; the limit is approximate by design.

    Guarantees:
      * a chunk is only emitted when it holds content not emitted before, so
        there are no duplicate or overlap-only chunks;
      * ``section`` is the most recent heading at the point where the chunk's
        new content begins (``None`` before the first heading);
      * *max_words* is clamped to >= 1 and *overlap_words* to
        ``0..max_words - 1``, so splitting always makes progress.

    Args:
        markdown: Cleaned article markdown.
        article: Article record; ``id``, ``url`` and ``title`` are required,
            ``language``, ``source``, ``type``, ``keywords``, ``topic`` and
            ``modified_at`` are copied when present.
        categories: Keyword categories for per-chunk keyword extraction.
        max_words: Target chunk size in whitespace-separated words.
        overlap_words: Target overlap between consecutive chunks.

    Returns:
        Chunk records in document order, with ids ``<article id>__chunk_NNNN``.
    """
    max_words = max(1, max_words)
    overlap_words = min(max(0, overlap_words), max_words - 1)

    blocks = re.split(r"\n\s*\n", markdown.strip()) if markdown.strip() else []
    chunks: list[dict[str, Any]] = []
    current: list[str] = []  # overlap tail followed by not-yet-emitted pieces
    current_words = 0
    has_new = False  # True while `current` holds content not yet emitted
    section: str | None = None  # most recent heading seen
    current_section: str | None = None  # heading where the chunk's new content starts

    def words_of(s: str) -> list[str]:
        return WORD_RE.findall(s)

    def overlap_tail() -> tuple[list[str], int]:
        """Return the pieces, and their word count, to carry into the next chunk."""
        if overlap_words <= 0 or not current:
            return [], 0
        tail: list[str] = []
        count = 0
        for piece in reversed(current):
            n = len(words_of(piece))
            if count + n > overlap_words:
                break
            tail.insert(0, piece)
            count += n
        if not tail:
            # Even the last piece is over budget: carry only its final words
            # instead of repeating the whole piece in the next chunk.
            trailing = words_of(current[-1])[-overlap_words:]
            tail = [" ".join(trailing)]
            count = len(trailing)
        return tail, count

    def flush() -> None:
        nonlocal current, current_words, has_new
        if not has_new:
            return
        content = "\n\n".join(current).strip()
        if content:
            # One extraction pass; the flat keyword list is derived from it.
            kw_detail = keyword_mentions_detail(content, categories) if categories else []
            kw_flat = [kw for detail in kw_detail for kw in detail["matched_keywords"]]
            chunks.append({
                "chunk_id": f"{article['id']}__chunk_{len(chunks):04d}",
                "article_id": article["id"],
                "url": article["url"],
                "title": article["title"],
                "section": current_section,
                "language": article.get("language", "en"),
                "content": content,
                "metadata": {
                    "source": article.get("source"),
                    "type": article.get("type"),
                    "keyword_mentions": kw_flat,
                    "keyword_mentions_detail": kw_detail,
                    "article_keyword_mentions": article.get("keywords", []),
                    "topic": article.get("topic"),
                    "modified_at": article.get("modified_at"),
                },
            })
        current, current_words = overlap_tail()
        has_new = False

    def add(piece: str, n_words: int) -> None:
        nonlocal current_words, has_new, current_section
        if has_new and current_words + n_words > max_words:
            flush()
        if not has_new:
            current_section = section
        current.append(piece)
        current_words += n_words
        has_new = True

    for block in blocks:
        b = block.strip()
        if not b:
            continue
        m = HEADING_RE.match(b.splitlines()[0].strip())
        if m:
            section = m.group(2).strip()
        words = words_of(b)
        if len(words) <= max_words * 1.5:
            add(b, len(words))
            continue
        # Oversized block: slices of (max_words - overlap_words) words leave
        # room for the overlap carried in from the previous chunk.
        step = max_words - overlap_words
        for start in range(0, len(words), step):
            piece_words = words[start:start + step]
            add(" ".join(piece_words), len(piece_words))

    flush()
    return chunks
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# JSONL helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def append_jsonl(path: Path, record: dict[str, Any]) -> None:
    """Append *record* to *path* as one JSON line (UTF-8, keys sorted, non-ASCII kept)."""
    with path.open(mode="a", encoding="utf-8") as handle:
        serialized = json.dumps(record, ensure_ascii=False, sort_keys=True)
        handle.writelines((serialized, "\n"))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def main(argv: list[str] | None = None) -> int:
    """Run the crawl from the command line.

    Discovers article URLs from the sitemap, scrapes each one through
    Firecrawl (or reuses a cached successful raw response), and writes
    articles, chunks, markdown, errors and a summary into the output
    directory.

    Re-run behaviour without ``--force``:
      * articles already recorded in ``articles.jsonl`` are skipped, so the
        JSONL files do not accumulate duplicates (delete them to rebuild from
        the raw cache);
      * a cached raw response that was a failure is scraped again rather than
        reported as the same error on every run.

    Args:
        argv: Argument list; ``None`` means ``sys.argv[1:]``.

    Returns:
        ``0`` when no URL failed, ``1`` when at least one failed, ``2`` when
        the API key is missing or the sitemap cannot be read.
    """
    parser = argparse.ArgumentParser(
        description="Generic blog crawler for RAG (Firecrawl + sitemap)",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
# Crawl 5 articles from a blog
python3 crawl_blog.py --sitemap https://example.com/post-sitemap.xml --limit 5

# Crawl all articles
python3 crawl_blog.py --sitemap https://example.com/post-sitemap.xml --all

# Use custom keywords for extraction
python3 crawl_blog.py --sitemap https://example.com/post-sitemap.xml --all --keywords keywords.json

# Output to custom directory
python3 crawl_blog.py --sitemap https://example.com/post-sitemap.xml --all --out-dir ./my_data
""",
    )
    parser.add_argument("--sitemap", required=True, help="Sitemap URL (e.g. https://example.com/post-sitemap.xml)")
    parser.add_argument("--out-dir", type=Path, default=Path("./blog_data"), help="Output directory")
    parser.add_argument("--keywords", type=Path, default=None, help="Keywords JSON path; defaults to <out-dir>/keywords.json")
    parser.add_argument("--limit", type=int, default=None, help="Only process first N article URLs")
    parser.add_argument("--all", action="store_true", help="Process all discovered article URLs")
    parser.add_argument("--sleep", type=float, default=1.0, help="Delay between Firecrawl calls (seconds)")
    parser.add_argument("--force", action="store_true", help="Re-scrape even if raw file exists")
    parser.add_argument("--max-words", type=int, default=650, help="Target words per chunk")
    parser.add_argument("--overlap-words", type=int, default=100, help="Overlap words between chunks")
    parser.add_argument("--language", default="en", help="Default language code for articles")
    args = parser.parse_args(argv)

    # Without --all or --limit, default to a small trial run.
    if not args.all and args.limit is None:
        args.limit = 5

    load_dotenv()
    api_key = os.environ.get("FIRECRAWL_API_KEY")
    if not api_key:
        print("ERROR: FIRECRAWL_API_KEY is not set in environment or .env file", file=sys.stderr)
        return 2

    out_dir: Path = args.out_dir
    out_dir.mkdir(parents=True, exist_ok=True)

    # Load keywords
    keywords_path: Path = args.keywords or (out_dir / "keywords.json")
    categories = load_keywords(keywords_path)

    # Create output directories
    raw_dir = out_dir / "raw"
    md_dir = out_dir / "markdown"
    raw_dir.mkdir(parents=True, exist_ok=True)
    md_dir.mkdir(parents=True, exist_ok=True)

    # Discover URLs from sitemap
    source_domain = source_from_url(args.sitemap)
    try:
        urls = parse_sitemap(args.sitemap)
    except (OSError, ValueError, ET.ParseError) as e:
        print(f"ERROR: failed to read sitemap {args.sitemap}: {e}", file=sys.stderr)
        return 2
    urls_path = out_dir / "urls.json"
    urls_path.write_text(
        json.dumps({"sitemap": args.sitemap, "source": source_domain, "count": len(urls), "urls": urls}, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )

    # max(0, ...) keeps a negative --limit from slicing from the end.
    selected = urls if args.all else urls[: max(0, args.limit)]
    articles_path = out_dir / "articles.jsonl"
    chunks_path = out_dir / "chunks.jsonl"
    errors_path = out_dir / "errors.jsonl"

    if args.force:
        articles_path.unlink(missing_ok=True)
        chunks_path.unlink(missing_ok=True)
        errors_path.unlink(missing_ok=True)

    # Ids already written by earlier runs; appending them again would
    # duplicate articles and chunks in the JSONL outputs.
    recorded_ids: set[str] = set()
    if articles_path.exists():
        with articles_path.open(encoding="utf-8") as existing:
            for line in existing:
                try:
                    rec = json.loads(line)
                except ValueError:
                    continue
                if isinstance(rec, dict) and "id" in rec:
                    recorded_ids.add(str(rec["id"]))

    print(f"Source: {source_domain}")
    print(f"Discovered article URLs: {len(urls)}")
    print(f"Processing: {len(selected)}")
    print(f"Output: {out_dir}")
    print(f"Keywords: {keywords_path} ({sum(len(c['keywords']) for c in categories)} keywords in {len(categories)} categories)")

    ok = 0
    failed = 0
    skipped = 0
    total_chunks = 0

    for idx, item in enumerate(selected, 1):
        url = str(item["url"])
        lastmod = item.get("lastmod")
        slug = slug_from_url(url)
        article_id = article_id_from_url(url)
        raw_path = raw_dir / f"{slug}.json"
        md_path = md_dir / f"{slug}.md"
        print(f"[{idx}/{len(selected)}] {url}")

        if article_id in recorded_ids:
            print(" SKIP already recorded in articles.jsonl")
            skipped += 1
            continue

        raw_data: dict[str, Any] | None = None
        status = 0

        # Use cached raw if available, but only when it was a successful scrape.
        if raw_path.exists() and not args.force:
            try:
                cached = json.loads(raw_path.read_text(encoding="utf-8"))
                cached_status = int(cached.get("_http_status", 200))
            except (OSError, ValueError, TypeError, AttributeError) as e:
                append_jsonl(errors_path, {"url": url, "error": f"read cached raw failed: {e}", "at": now_iso()})
                failed += 1
                continue
            if cached_status < 400 and cached.get("success"):
                raw_data, status = cached, cached_status

        if raw_data is None:
            try:
                status, raw_data = firecrawl_scrape(url, api_key)
            except (OSError, ValueError) as e:
                # Network failure or undecodable response: record and move on.
                append_jsonl(errors_path, {"url": url, "error": f"scrape request failed: {e}", "at": now_iso()})
                print(f" ERROR scrape request failed: {e}")
                failed += 1
                time.sleep(args.sleep)
                continue
            if not isinstance(raw_data, dict):
                raw_data = {"success": False, "error": raw_data}
            raw_data["_http_status"] = status
            raw_data["_source_url"] = url
            raw_data["_scraped_at"] = now_iso()
            # Failures are written too, for inspection; they are not reused.
            raw_path.write_text(json.dumps(raw_data, ensure_ascii=False, indent=2), encoding="utf-8")
            time.sleep(args.sleep)

        if status >= 400 or not raw_data.get("success"):
            append_jsonl(errors_path, {"url": url, "http_status": status, "error": raw_data, "at": now_iso()})
            print(f" ERROR status={status} success={raw_data.get('success')}")
            failed += 1
            continue

        data = raw_data.get("data") or {}
        markdown = clean_markdown(data.get("markdown") or "")
        text = strip_markdown_to_text(markdown)
        if len(text) < 100:
            append_jsonl(errors_path, {"url": url, "http_status": status, "error": "too little text", "text_chars": len(text), "at": now_iso()})
            print(f" ERROR too little text chars={len(text)}")
            failed += 1
            continue

        title = extract_title(data, markdown, url)
        desc = extract_description(data, text)
        kw_detail = keyword_mentions_detail(title + "\n" + text, categories)
        kw_list = [kw for cat in kw_detail for kw in cat["matched_keywords"].keys()]

        article = {
            "id": article_id,
            "url": url,
            "source": source_domain,
            "type": "blog_article",
            "title": title,
            "description": desc,
            "published_at": None,
            "modified_at": lastmod,
            "language": args.language,
            "markdown": markdown,
            "text": text,
            "tags": [],
            "keywords": kw_list,
            "keyword_mentions_detail": kw_detail,
            "topic": infer_topic(title, text, categories),
            "scraped_at": raw_data.get("_scraped_at") or now_iso(),
            "metadata": data.get("metadata") or {},
        }
        chunks = chunk_markdown(markdown, article, categories=categories, max_words=args.max_words, overlap_words=args.overlap_words)

        md_path.write_text(markdown, encoding="utf-8")
        append_jsonl(articles_path, article)
        for chunk in chunks:
            append_jsonl(chunks_path, chunk)
        # Also guards against the same URL appearing twice in the sitemap.
        recorded_ids.add(article_id)

        ok += 1
        total_chunks += len(chunks)
        print(f" OK title={title!r} markdown_chars={len(markdown)} chunks={len(chunks)} keywords={kw_list[:5]}")

    summary = {
        "sitemap": args.sitemap,
        "source": source_domain,
        "discovered": len(urls),
        "processed": len(selected),
        "ok": ok,
        "failed": failed,
        "skipped": skipped,
        "chunks": total_chunks,
        "out_dir": str(out_dir),
        "keywords_path": str(keywords_path),
        "keywords_loaded": sum(len(c["keywords"]) for c in categories),
        "finished_at": now_iso(),
    }
    (out_dir / "summary.json").write_text(json.dumps(summary, ensure_ascii=False, indent=2), encoding="utf-8")
    print("SUMMARY", json.dumps(summary, ensure_ascii=False))
    return 0 if failed == 0 else 1
|
|
|
|
|
|
# Script entry point: the return value of main() becomes the exit status.
if __name__ == "__main__":
    sys.exit(main())
|