# -*- coding: utf-8 -*-
"""
Scrape a handful of public posts (Xiaohongshu / WeChat Official Account / CSDN),
normalize them to the schema used by procedure-dsl/input/eval_case-*.json, and
score each one with SourceQualityEvaluator.

Each source only needs a URL (the platform is detected from the domain); no
login and no backend are required:
  - Xiaohongshu explore page: parse window.__INITIAL_STATE__ embedded in the
    page (share links carrying an xsec_token are the most reliable) to get
    title / desc / likes / images / time.
  - WeChat Official Account (mp.weixin.qq.com/s/...): parse og:title, the
    #js_content body, the `var ct` publish time and the body images. Read and
    like counts need a logged-in cookie and are not available, so like_count
    is set to None.
  - CSDN article page: parse the #content_views body, publish time and body
    images.

Output schema (same fields, same order as eval_case-1.json):
  channel_content_id / title / content_type / body_text / like_count /
  publish_timestamp (string "YYYY-MM-DD HH:MM:SS") / images / videos /
  channel / link / _quality_score / _quality_grade

Usage:
  python scrape_selection_to_eval.py      # scrape every link in SOURCES below

Output:
  ./scraped_selection/<channel>_<channel_content_id>.json   # one file per post
  ./scraped_selection/_all.json                             # all posts as one list
"""
from __future__ import annotations

import json
import re
import sys
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence
from urllib.parse import urlparse

import httpx
from bs4 import BeautifulSoup

# Make the quality evaluator (one directory up, in script/) importable.
SCRIPT_DIR = Path(__file__).resolve().parent  # .../search_eval
sys.path.insert(0, str(SCRIPT_DIR.parent))  # .../script

from evaluate_source_quality import SourceQualityEvaluator  # noqa: E402

OUT_DIR = SCRIPT_DIR / "scraped_selection"

UA = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)

# Links to scrape (the 4 links supplied by the user).
SOURCES: List[str] = [
    "https://blog.csdn.net/2402_86571652/article/details/153465368",
    "https://mp.weixin.qq.com/s/3pZ3BAET1wPGwPV92zEu9A",
    "https://mp.weixin.qq.com/s/LWBNsgbwFl1NfDNOYEom-Q",
    "https://www.xiaohongshu.com/explore/6981939f000000000e03da6d?xsec_token=ABKp_PMFXV9IuxRL9a_48ovmBpnetn8v2aUefL9JW_E_U=&xsec_source=pc_search&source=web_explore_feed",
]

# Field order of the eval_case schema.
FIELD_ORDER = [
    "channel_content_id",
    "title",
    "content_type",
    "body_text",
    "like_count",
    "publish_timestamp",
    "images",
    "videos",
    "channel",
    "link",
    "_quality_score",
    "_quality_grade",
]

# (domain, platform key) pairs used by detect_platform, checked in order.
_PLATFORM_DOMAINS = (
    ("xiaohongshu.com", "xhs"),
    ("mp.weixin.qq.com", "gzh"),
    ("csdn.net", "csdn"),
)

# Start of the `window.__INITIAL_STATE__ = {` assignment; the match ends right
# before the opening brace so JSON decoding can start there.
_INITIAL_STATE_RE = re.compile(r"window\.__INITIAL_STATE__\s*=\s*(?=\{)")

# A bare JavaScript `undefined` in value position (after ':', '[' or ',' and
# before ',', '}' or ']'). Heuristic: it cannot tell whether it is inside a
# JSON string, but it is far narrower than replacing every occurrence.
_BARE_UNDEFINED_RE = re.compile(r"(?<=[:\[,])\s*undefined(?=\s*[,}\]])")

# Abbreviated counters such as "1.2万", "3k", "1千+" or "10+".
_ABBREVIATED_COUNT_RE = re.compile(r"(\d+(?:\.\d+)?)\s*([万千wWkK]?)(\+?)")
_COUNT_UNITS = {"万": 10_000, "w": 10_000, "千": 1_000, "k": 1_000}


# ── Generic helpers ──
def _get(url: str) -> str:
    """Fetch *url* with a desktop browser User-Agent and return the HTML text.

    Redirects are followed and the timeout is 40 seconds.

    Raises:
        httpx.HTTPStatusError: if the final response has a 4xx/5xx status.
    """
    with httpx.Client(timeout=40, follow_redirects=True, headers={"User-Agent": UA}) as c:
        r = c.get(url)
        r.raise_for_status()
        return r.text


def _fmt_ts(ms: Optional[int]) -> Optional[str]:
    """Convert epoch milliseconds to a local-time 'YYYY-MM-DD HH:MM:SS' string.

    Returns None when *ms* is missing/zero or cannot be converted.
    """
    if not ms:
        return None
    try:
        return datetime.fromtimestamp(int(ms) / 1000).strftime("%Y-%m-%d %H:%M:%S")
    except (TypeError, ValueError, OverflowError, OSError):
        return None


def _parse_count(value: Any) -> Optional[int]:
    """Best-effort conversion of a like/interaction counter to an int.

    Anything ``int()`` accepts is returned unchanged. Strings that ``int()``
    rejects are additionally tried as abbreviated counters ("1.2万" -> 12000,
    "3k" -> 3000, "10+" -> 10); such results are approximations of the real
    count. Returns None when the value cannot be interpreted.

    NOTE(review): assumes large counts may arrive abbreviated in this form —
    confirm against live payloads.
    """
    try:
        return int(value)
    except (TypeError, ValueError):
        pass
    if not isinstance(value, str):
        return None
    m = _ABBREVIATED_COUNT_RE.fullmatch(value.strip())
    if not m:
        return None
    number, unit, plus = m.groups()
    if not unit and not plus:
        # A plain decimal such as "1.5" is not a recognisable counter.
        return None
    return int(round(float(number) * _COUNT_UNITS.get(unit.lower(), 1)))


def _meta_content(soup: BeautifulSoup, prop: str) -> Optional[str]:
    """Return the stripped ``content`` of ``<meta property=prop>``, or None."""
    tag = soup.find("meta", attrs={"property": prop})
    return tag["content"].strip() if tag and tag.get("content") else None


def _collect_image_urls(container: Any, attr_order: Sequence[str]) -> List[str]:
    """Collect absolute (http/https) image URLs from ``<img>`` tags in *container*.

    For each image the first non-empty attribute in *attr_order* is used.
    """
    urls: List[str] = []
    for img in container.find_all("img"):
        src = next((img.get(attr) for attr in attr_order if img.get(attr)), None)
        if src and src.startswith("http"):
            urls.append(src)
    return urls


def detect_platform(url: str) -> str:
    """Return the platform key ('xhs', 'gzh' or 'csdn') for *url*.

    The decision is made on the URL's hostname, so a supported domain that
    merely appears in the path or query string of another site is not
    mistaken for that platform. Input without a parseable hostname (e.g. a
    scheme-less URL) falls back to a substring check.

    Raises:
        ValueError: if the URL does not belong to a supported platform.
    """
    host = urlparse(url).hostname or ""
    for domain, platform in _PLATFORM_DOMAINS:
        if host:
            matched = host == domain or host.endswith("." + domain)
        else:
            matched = domain in url
        if matched:
            return platform
    raise ValueError(f"不支持的链接(无法识别平台):{url}")


# ── Xiaohongshu: parse window.__INITIAL_STATE__ ──
def _xhs_note_id(url: str) -> str:
    """Extract the hexadecimal note id from an explore / discovery-item URL.

    Raises:
        ValueError: if the URL contains no note id.
    """
    m = re.search(r"/(?:explore|discovery/item)/([0-9a-fA-F]+)", url)
    if not m:
        raise ValueError(f"无法从小红书链接解析 note id:{url}")
    return m.group(1)


def _xhs_img_url(item: Dict[str, Any]) -> Optional[str]:
    """Pick a usable image URL from one entry of a note's ``imageList``.

    Prefers ``urlDefault`` and otherwise the first ``infoList`` entry that has
    a ``url``. Returns None when neither is present.
    """
    if item.get("urlDefault"):
        return item["urlDefault"]
    for info in item.get("infoList") or []:
        if info.get("url"):
            return info["url"]
    return None


def _xhs_initial_state(html: str) -> Optional[Dict[str, Any]]:
    """Decode the ``window.__INITIAL_STATE__`` object embedded in *html*.

    Returns None when the assignment is not present.

    Raises:
        json.JSONDecodeError: if the assignment is present but its value is
            not decodable even after replacing bare ``undefined`` values.
    """
    m = _INITIAL_STATE_RE.search(html)
    if not m:
        return None
    # Only look at the enclosing <script> element when its end can be found.
    end = html.find("</script>", m.end())
    blob = html[m.end():] if end == -1 else html[m.end():end]
    # The page state contains bare `undefined`, which is not valid JSON.
    blob = _BARE_UNDEFINED_RE.sub("null", blob)
    # raw_decode parses exactly one JSON value and ignores what follows, so
    # nested objects are handled correctly and trailing script code is fine.
    state, _ = json.JSONDecoder().raw_decode(blob)
    return state


def _xhs_video_urls(nd: Dict[str, Any]) -> List[str]:
    """Return a one-element list with the first ``masterUrl`` found, else [].

    The video address is nested deep inside the note detail; this is
    deliberately best-effort and never fails the scrape.
    """
    try:
        streams = nd["video"]["media"]["stream"]
        for codec_streams in streams.values():
            for stream in codec_streams or []:
                if stream.get("masterUrl"):
                    return [stream["masterUrl"]]
    except (KeyError, TypeError, AttributeError):
        # Unexpected payload shape: leave the video list empty.
        return []
    return []


def scrape_xhs(url: str) -> Dict[str, Any]:
    """Scrape one Xiaohongshu note and return it as a raw post dict.

    The result carries the publish time as epoch milliseconds under the
    private key ``_ts_ms``.

    Raises:
        ValueError: if no note id can be parsed from *url*.
        RuntimeError: if the page has no ``__INITIAL_STATE__`` or the state
            has no detail for the note.
        json.JSONDecodeError: if the embedded state cannot be decoded.
        httpx.HTTPError: on network / HTTP failures.
    """
    note_id = _xhs_note_id(url)
    html = _get(url)

    data = _xhs_initial_state(html)
    if data is None:
        raise RuntimeError("小红书页面里没找到 __INITIAL_STATE__(可能被风控/需要登录)")

    detail_map = (data.get("note") or {}).get("noteDetailMap") or {}
    nd = (detail_map.get(note_id) or {}).get("note") or {}
    if not nd:
        raise RuntimeError(f"__INITIAL_STATE__ 里没有 note {note_id} 的详情")

    is_video = (nd.get("type") == "video") or bool(nd.get("video"))

    images: List[str] = []
    videos: List[str] = []
    if is_video:
        videos = _xhs_video_urls(nd)
    else:
        for it in nd.get("imageList") or []:
            u = _xhs_img_url(it)
            if u:
                images.append(u)

    interact = nd.get("interactInfo") or {}
    like = _parse_count(interact.get("likedCount"))

    return {
        "channel_content_id": note_id,
        "title": (nd.get("title") or "").strip(),
        "content_type": "video" if is_video else "note",
        "body_text": (nd.get("desc") or "").strip(),
        "like_count": like,
        "_ts_ms": nd.get("time"),
        "images": images,
        "videos": videos,
        "channel": "xhs",
        "link": f"https://www.xiaohongshu.com/explore/{note_id}",
    }


# ── WeChat Official Account: parse the article HTML ──
def scrape_gzh(url: str) -> Dict[str, Any]:
    """Scrape one WeChat Official Account article and return a raw post dict.

    ``like_count`` is always None (read/like counts need a logged-in session).
    The account name is returned under ``channel_account_name`` for
    traceability, and the publish time as epoch milliseconds under ``_ts_ms``.

    Raises:
        RuntimeError: if WeChat served its risk-control page instead of the
            article.
        httpx.HTTPError: on network / HTTP failures.
    """
    html = _get(url)
    if "环境异常" in html and "js_content" not in html:
        raise RuntimeError("公众号返回风控页(环境异常)——换个网络/加 cookie 再试")

    soup = BeautifulSoup(html, "lxml")

    title = _meta_content(soup, "og:title") or ""
    if not title:
        h = soup.select_one("#activity-name")
        title = h.get_text(strip=True) if h else ""

    # Account name: `var nickname` in the page script, else the #js_name element.
    acct = None
    mnick = re.search(r'var nickname\s*=\s*["\']([^"\']+)["\']', html)
    if mnick:
        acct = mnick.group(1).strip()
    if not acct:
        jn = soup.select_one("#js_name")
        acct = jn.get_text(strip=True) if jn else None

    body_el = soup.select_one("#js_content")
    body = body_el.get_text("\n", strip=True) if body_el else ""

    # data-src is tried before src for article images.
    images: List[str] = []
    if body_el is not None:
        images = _collect_image_urls(body_el, ("data-src", "src"))

    # Publish time: var ct = "<epoch seconds>"
    ts_ms = None
    mct = re.search(r'var ct\s*=\s*["\'](\d+)["\']', html)
    if mct:
        ts_ms = int(mct.group(1)) * 1000

    # Use the short token from the link as a stable id.
    mtok = re.search(r"/s/([A-Za-z0-9_-]+)", url)
    cid = mtok.group(1) if mtok else url

    return {
        "channel_content_id": cid,
        "title": title,
        "content_type": "article",
        "body_text": body,
        "like_count": None,  # needs a logged-in session; not available
        "_ts_ms": ts_ms,
        "images": images,
        "videos": [],
        "channel": "gzh",
        "channel_account_name": acct,  # extra info, written to the output for traceability
        "link": url.split("#")[0],
    }


# ── CSDN: parse the article HTML ──
def scrape_csdn(url: str) -> Dict[str, Any]:
    """Scrape one CSDN article and return it as a raw post dict.

    The publish time is returned as epoch milliseconds under ``_ts_ms`` (None
    when it cannot be determined); ``like_count`` is None when no numeric
    counter is found.

    Raises:
        httpx.HTTPError: on network / HTTP failures.
    """
    html = _get(url)
    soup = BeautifulSoup(html, "lxml")

    title = _meta_content(soup, "og:title") or ""
    h = soup.select_one("h1.title-article, #articleContentId")
    if h:
        title = h.get_text(strip=True) or title
    # Drop a trailing "-CSDN..." / "_CSDN..." site suffix.
    title = re.sub(r"[-_]\s*CSDN.*$", "", title).strip()

    body_el = soup.select_one("#content_views")
    body = body_el.get_text("\n", strip=True) if body_el else ""

    images: List[str] = []
    if body_el is not None:
        images = _collect_image_urls(body_el, ("src", "data-src"))

    # Publish time: meta article:published_time (ISO), else a .time element.
    ts_ms = None
    pub = _meta_content(soup, "article:published_time")
    if pub:
        try:
            ts_ms = int(datetime.fromisoformat(pub.replace("Z", "+00:00")).timestamp() * 1000)
        except (ValueError, OverflowError, OSError):
            ts_ms = None
    if ts_ms is None:
        t = soup.select_one(".time, .article-bar-top .time")
        if t:
            mdt = re.search(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}", t.get_text())
            if mdt:
                try:
                    ts_ms = int(
                        datetime.strptime(mdt.group(0), "%Y-%m-%d %H:%M:%S").timestamp() * 1000
                    )
                except (ValueError, OverflowError, OSError):
                    ts_ms = None

    # Like count: take the first candidate element whose text is purely numeric.
    like = None
    for sel in ["#spanCount", "#is-like-imgactive + span", ".count"]:
        el = soup.select_one(sel)
        if el is None:
            continue
        text = el.get_text(strip=True)
        # isdecimal() (unlike isdigit()) only accepts characters int() can parse.
        if text.isdecimal():
            like = int(text)
            break

    mid = re.search(r"/article/details/(\d+)", url)
    cid = mid.group(1) if mid else url

    return {
        "channel_content_id": cid,
        "title": title,
        "content_type": "article",
        "body_text": body,
        "like_count": like,
        "_ts_ms": ts_ms,
        "images": images,
        "videos": [],
        "channel": "csdn",
        "link": url.split("#")[0].split("?")[0],
    }


# Platform key (as returned by detect_platform) -> scraper function.
SCRAPERS = {"xhs": scrape_xhs, "gzh": scrape_gzh, "csdn": scrape_csdn}
def normalize_and_score(raw: Dict[str, Any], evaluator: SourceQualityEvaluator) -> Dict[str, Any]:
    """Score a scraped post and return it in eval_case field order.

    The evaluator is given the numeric publish timestamp (epoch milliseconds,
    0 when unknown); the returned dict carries the timestamp as a formatted
    string instead. *raw* itself is left untouched.

    Args:
        raw: Post dict produced by one of the scrapers. The private keys
            ``_ts_ms`` and ``channel_account_name`` are consumed here.
        evaluator: Object whose ``evaluate_post(post)`` returns a mapping with
            ``total_score`` and ``grade``.

    Returns:
        A new dict with the keys of FIELD_ORDER in that order (missing keys
        become None), followed by ``channel_account_name`` when one is set.
    """
    # Work on a copy so the caller's dict is not mutated.
    post = dict(raw)
    ts_ms = post.pop("_ts_ms", None)
    acct = post.pop("channel_account_name", None)

    # The evaluator reads a numeric publish_timestamp (epoch milliseconds).
    scoring_post = dict(post)
    scoring_post["publish_timestamp"] = ts_ms or 0
    res = evaluator.evaluate_post(scoring_post)

    post["_quality_score"] = res["total_score"]
    post["_quality_grade"] = res["grade"]
    # The output uses the string form of the timestamp.
    post["publish_timestamp"] = _fmt_ts(ts_ms)

    ordered = {k: post.get(k) for k in FIELD_ORDER}
    if acct:
        # The account name goes after the schema fields, as a traceability note.
        ordered["channel_account_name"] = acct
    return ordered


def main() -> None:
    """Scrape every URL in SOURCES, score it, and write the JSON outputs.

    Each successfully scraped post is written to its own file in OUT_DIR and
    all of them are written together to ``_all.json``. A failure on one URL
    (including an unsupported URL) is reported and skipped so the remaining
    URLs are still processed.
    """
    OUT_DIR.mkdir(parents=True, exist_ok=True)
    evaluator = SourceQualityEvaluator()
    results: List[Dict[str, Any]] = []

    for url in SOURCES:
        try:
            # Platform detection is inside the try so that one unsupported
            # URL cannot abort the whole run.
            plat = detect_platform(url)
            print(f"[{plat}] {url}")
            raw = SCRAPERS[plat](url)
            item = normalize_and_score(raw, evaluator)
        except Exception as e:  # top-level boundary: report and move on
            print(f" !! 失败: {type(e).__name__}: {e}")
            continue

        fname = f"{item['channel']}_{item['channel_content_id']}.json"
        (OUT_DIR / fname).write_text(
            json.dumps(item, ensure_ascii=False, indent=2), encoding="utf-8"
        )
        results.append(item)
        print(
            f" -> {fname} | {item['_quality_grade']} {item['_quality_score']} "
            f"| 正文 {len(item.get('body_text') or '')} 字 | 图 {len(item.get('images') or [])} "
            f"| 赞 {item.get('like_count')}"
        )

    (OUT_DIR / "_all.json").write_text(
        json.dumps(results, ensure_ascii=False, indent=2), encoding="utf-8"
    )
    print(f"\n完成:{len(results)}/{len(SOURCES)} 条 -> {OUT_DIR}")


if __name__ == "__main__":
    main()