""" 从 raw_cases/case_*.json 中提取 source_url / 帖子链接, 解析 channel_content_id,再从 .cache/content_search 中查找对应的原始帖子数据。 主函数:extract_sources_to_json(raw_cases_dir) - 扫描该目录下所有 case_{platform}.json - 解析每个 "工序发现[].帖子链接"(新格式)或 "cases[].source_url"(旧格式) - 从项目根的 .cache/content_search/*.json 中匹配 channel_content_id - 把匹配到的完整 post 写入 {raw_cases_dir}/source.json """ import json import re from pathlib import Path from typing import Any, Dict, List, Optional, Tuple # ── URL → (platform, content_id) 解析 ──────────────────────────────── _URL_PATTERNS = [ # B站: https://www.bilibili.com/video/BV1xxx ("bili", re.compile(r"bilibili\.com/video/(BV[\w]+)")), # 小红书: https://www.xiaohongshu.com/explore/{id} 或 /discovery/item/{id} ("xhs", re.compile(r"xiaohongshu\.com/(?:explore|discovery/item)/([a-f0-9]+)")), # YouTube: https://www.youtube.com/watch?v={id} 或 https://youtu.be/{id} ("youtube", re.compile(r"(?:youtube\.com/watch\?v=|youtu\.be/)([\w-]+)")), # X/Twitter: https://x.com/{user}/status/{id} 或 twitter.com ("x", re.compile(r"(?:x\.com|twitter\.com)/[^/]+/status/(\d+)")), # 知乎: https://zhuanlan.zhihu.com/p/{id} 或 zhihu.com/question/{qid}/answer/{aid} ("zhihu", re.compile(r"zhuanlan\.zhihu\.com/p/(\d+)")), ("zhihu", re.compile(r"zhihu\.com/question/\d+/answer/(\d+)")), # 公众号: 通过 __biz 或整个 URL 作为 id(后备) ("gzh", re.compile(r"mp\.weixin\.qq\.com/s[/?]([^\s\"']+)")), ] def parse_url(url: str) -> Optional[Tuple[str, str]]: """从 URL 解析出 (platform, content_id)。返回 None 表示无法解析。""" if not url or not isinstance(url, str): return None for platform, pat in _URL_PATTERNS: m = pat.search(url) if m: return platform, m.group(1) return None # ── 从 case 文件中抽取所有链接 ──────────────────────────────── def extract_urls_from_case(case_data: Any) -> List[str]: """兼容新旧两种格式,返回 case 文件里出现的所有 URL。""" urls: List[str] = [] if not isinstance(case_data, dict): return urls # 新格式:工序发现[].帖子链接 for item in case_data.get("工序发现", []) or []: if isinstance(item, dict): link = item.get("帖子链接") or item.get("source_url") if link: urls.append(link) # 旧格式:cases[].source_url for item in case_data.get("cases", []) or []: if isinstance(item, dict): link = item.get("source_url") or item.get("帖子链接") if link: urls.append(link) return urls # ── 从 cache 中构建 (platform, content_id) → post 索引 ──────────────────────────────── def build_cache_index(cache_dir: Path, trace_ids: Optional[List[str]] = None) -> Dict[Tuple[str, str], Dict[str, Any]]: """ 构建 (platform, channel_content_id) -> post 映射。 Args: cache_dir: cache 目录路径 trace_ids: 可选的 trace_id 列表。如果提供,只加载这些特定的 cache 文件; 否则扫描所有 cache 文件 Returns: (platform, content_id) -> post 的映射字典 """ index: Dict[Tuple[str, str], Dict[str, Any]] = {} if not cache_dir.exists(): return index # 如果提供了 trace_ids,只加载这些特定文件 if trace_ids: cache_files = [cache_dir / f"{tid}.json" for tid in trace_ids if tid] cache_files = [f for f in cache_files if f.exists()] else: # 否则扫描所有 cache 文件 cache_files = list(cache_dir.glob("*.json")) for cache_file in cache_files: try: with open(cache_file, "r", encoding="utf-8") as f: data = json.load(f) except Exception: continue for key, entry in data.items(): if not key.startswith("search:"): continue platform = key.split(":", 1)[1] # 新格式:entry = {"history": [...], "latest_index": n} # 旧格式:entry = {"keyword": ..., "posts": [...]} if isinstance(entry, dict) and "history" in entry: post_lists = [h.get("posts", []) for h in entry.get("history", [])] elif isinstance(entry, dict) and "posts" in entry: post_lists = [entry.get("posts", [])] else: continue for posts in post_lists: for post in posts or []: if not isinstance(post, dict): continue cid = post.get("channel_content_id") if cid: # 直接用 (platform, content_id) 作为索引键 index[(platform, str(cid))] = post return index # ── 主入口 ──────────────────────────────── def extract_sources_to_json( raw_cases_dir: Path, cache_dir: Optional[Path] = None, output_name: str = "source.json", trace_ids: Optional[List[str]] = None, ) -> Dict[str, Any]: """ 扫描 raw_cases_dir 下的 case_*.json, 从 cache 中找出原始帖子,输出到 {raw_cases_dir}/{output_name}。 返回统计信息 dict。 """ raw_cases_dir = Path(raw_cases_dir) if cache_dir is None: # 项目根目录:script 文件往上三级 project_root = Path(__file__).resolve().parent.parent.parent.parent cache_dir = project_root / ".cache" / "content_search" cache_dir = Path(cache_dir) # 1. 构建 cache 索引 cache_index = build_cache_index(cache_dir, trace_ids=trace_ids) # 2. 加载已有的 source.json(如果存在) output_file = raw_cases_dir / output_name existing_sources = [] existing_ids = set() # (platform, channel_content_id) 集合用于去重 if output_file.exists(): try: with open(output_file, "r", encoding="utf-8") as f: existing_data = json.load(f) existing_sources = existing_data.get("sources", []) # 构建已有的 ID 集合 for src in existing_sources: key = (src.get("platform"), src.get("channel_content_id")) existing_ids.add(key) except Exception as e: print(f"Warning: Failed to load existing source.json: {e}") # 3. 扫描所有 case 文件 matched: List[Dict[str, Any]] = [] unmatched: List[Dict[str, Any]] = [] seen_keys: set = set(existing_ids) # 从已有的 ID 开始 for case_file in sorted(raw_cases_dir.glob("case_*.json")): # 跳过自己(如果 source.json 误被命名成 case_*) if case_file.name == output_name: continue try: with open(case_file, "r", encoding="utf-8") as f: case_data = json.load(f) except Exception as e: unmatched.append({"case_file": case_file.name, "error": str(e)}) continue urls = extract_urls_from_case(case_data) for url in urls: # 解析 URL 得到 platform 和 content_id parsed = parse_url(url) if not parsed: unmatched.append({ "case_file": case_file.name, "url": url, "reason": "url_parse_failed", }) continue platform, cid = parsed key = (platform, cid) if key in seen_keys: continue seen_keys.add(key) # 直接用 (platform, content_id) 在 cache 索引中查找 post = cache_index.get(key) if post: matched.append({ "case_file": case_file.name, "platform": platform, "channel_content_id": cid, "source_url": url, "post": post, }) else: unmatched.append({ "case_file": case_file.name, "platform": platform, "channel_content_id": cid, "source_url": url, "reason": "not_in_cache", }) # 4. 合并已有数据和新匹配的数据 all_sources = existing_sources + matched # 5. 写输出 output = { "total": len(all_sources), "cache_dir": str(cache_dir), "sources": all_sources, } output_file.parent.mkdir(parents=True, exist_ok=True) with open(output_file, "w", encoding="utf-8") as f: json.dump(output, f, ensure_ascii=False, indent=2) # 返回统计信息(包含 unmatched 用于日志输出) return { "total_matched": len(matched), "total_existing": len(existing_sources), "total_unmatched": len(unmatched), "output_file": str(output_file), } if __name__ == "__main__": # CLI:python extract_sources.py [cache_dir] import sys if len(sys.argv) < 2: print("Usage: python extract_sources.py [cache_dir]") sys.exit(1) raw_cases_dir = Path(sys.argv[1]) cache_dir = Path(sys.argv[2]) if len(sys.argv) > 2 else None result = extract_sources_to_json(raw_cases_dir, cache_dir=cache_dir) print(f"[OK] Matched: {result['total_matched']}, Unmatched: {result['total_unmatched']}") print(f" Output: {raw_cases_dir / 'source.json'}")