extract_sources.py

  1. """
  2. 从 raw_cases/case_*.json 中提取 source_url / 帖子链接,
  3. 解析 channel_content_id,再从 .cache/content_search 中查找对应的原始帖子数据。
  4. 主函数:extract_sources_to_json(raw_cases_dir)
  5. - 扫描该目录下所有 case_{platform}.json
  6. - 解析每个 "工序发现[].帖子链接"(新格式)或 "cases[].source_url"(旧格式)
  7. - 从项目根的 .cache/content_search/*.json 中匹配 channel_content_id
  8. - 把匹配到的完整 post 写入 {raw_cases_dir}/source.json
  9. """
import json
import re
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

# ── URL → (platform, content_id) parsing ────────────────────────────────
_URL_PATTERNS = [
    # Bilibili: https://www.bilibili.com/video/BV1xxx
    ("bili", re.compile(r"bilibili\.com/video/(BV[\w]+)")),
    # Xiaohongshu: https://www.xiaohongshu.com/explore/{id} or /discovery/item/{id}
    ("xhs", re.compile(r"xiaohongshu\.com/(?:explore|discovery/item)/([a-f0-9]+)")),
    # YouTube: https://www.youtube.com/watch?v={id} or https://youtu.be/{id}
    ("youtube", re.compile(r"(?:youtube\.com/watch\?v=|youtu\.be/)([\w-]+)")),
    # X/Twitter: https://x.com/{user}/status/{id} or twitter.com
    ("x", re.compile(r"(?:x\.com|twitter\.com)/[^/]+/status/(\d+)")),
    # Zhihu: https://zhuanlan.zhihu.com/p/{id} or zhihu.com/question/{qid}/answer/{aid}
    ("zhihu", re.compile(r"zhuanlan\.zhihu\.com/p/(\d+)")),
    ("zhihu", re.compile(r"zhihu\.com/question/\d+/answer/(\d+)")),
    # WeChat Official Account (gzh): use __biz or the whole URL tail as the id (fallback)
    ("gzh", re.compile(r"mp\.weixin\.qq\.com/s[/?]([^\s\"']+)")),
]

def parse_url(url: str) -> Optional[Tuple[str, str]]:
    """Parse (platform, content_id) out of a URL. Returns None if nothing matches."""
    if not url or not isinstance(url, str):
        return None
    for platform, pat in _URL_PATTERNS:
        m = pat.search(url)
        if m:
            return platform, m.group(1)
    return None
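
# Illustrative calls (the IDs below are made up, purely for documentation):
#   parse_url("https://www.bilibili.com/video/BV1GJ411x7h7")  -> ("bili", "BV1GJ411x7h7")
#   parse_url("https://x.com/someone/status/1234567890")      -> ("x", "1234567890")
#   parse_url("https://example.com/post/1")                   -> None  (no pattern matches)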

# ── Collect all links from a case file ────────────────────────────────
def extract_urls_from_case(case_data: Any) -> List[str]:
    """Return every URL found in a case file, accepting both the new and old formats."""
    urls: List[str] = []
    if not isinstance(case_data, dict):
        return urls
    # New format: 工序发现[].帖子链接
    for item in case_data.get("工序发现", []) or []:
        if isinstance(item, dict):
            link = item.get("帖子链接") or item.get("source_url")
            if link:
                urls.append(link)
    # Old format: cases[].source_url
    for item in case_data.get("cases", []) or []:
        if isinstance(item, dict):
            link = item.get("source_url") or item.get("帖子链接")
            if link:
                urls.append(link)
    return urls
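
# Illustrative input shapes (field values are placeholders, not real data):
#   New format: {"工序发现": [{"帖子链接": "https://..."}]}
#   Old format: {"cases": [{"source_url": "https://..."}]}
# Each list accepts either key, so mixed or partially migrated files still resolve.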

# ── Build a (platform, content_id) → post index from the cache ────────────────────────────────
def build_cache_index(cache_dir: Path, trace_ids: Optional[List[str]] = None) -> Dict[Tuple[str, str], Dict[str, Any]]:
    """
    Build a (platform, channel_content_id) -> post mapping.

    Args:
        cache_dir: path to the cache directory
        trace_ids: optional list of trace_ids. If given, only those specific
            cache files are loaded; otherwise every cache file is scanned.

    Returns:
        dict mapping (platform, content_id) -> post
    """
    index: Dict[Tuple[str, str], Dict[str, Any]] = {}
    if not cache_dir.exists():
        return index
    # If trace_ids were given, load only those specific files
    if trace_ids:
        cache_files = [cache_dir / f"{tid}.json" for tid in trace_ids if tid]
        cache_files = [f for f in cache_files if f.exists()]
    else:
        # Otherwise scan every cache file
        cache_files = list(cache_dir.glob("*.json"))
    for cache_file in cache_files:
        try:
            with open(cache_file, "r", encoding="utf-8") as f:
                data = json.load(f)
        except Exception:
            continue
        if not isinstance(data, dict):
            continue  # a cache file is expected to hold a dict of search entries
        for key, entry in data.items():
            if not key.startswith("search:"):
                continue
            platform = key.split(":", 1)[1]
            # New format: entry = {"history": [...], "latest_index": n}
            # Old format: entry = {"keyword": ..., "posts": [...]}
            if isinstance(entry, dict) and "history" in entry:
                post_lists = [h.get("posts", []) for h in entry.get("history", [])]
            elif isinstance(entry, dict) and "posts" in entry:
                post_lists = [entry.get("posts", [])]
            else:
                continue
            for posts in post_lists:
                for post in posts or []:
                    if not isinstance(post, dict):
                        continue
                    cid = post.get("channel_content_id")
                    if cid:
                        # Key the index directly on (platform, content_id)
                        index[(platform, str(cid))] = post
    return index
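
# Illustrative cache entry shapes under a "search:{platform}" key
# (field values are placeholders):
#   New: {"search:xhs": {"history": [{"posts": [{"channel_content_id": "..."}]}], "latest_index": 0}}
#   Old: {"search:xhs": {"keyword": "...", "posts": [{"channel_content_id": "..."}]}}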

# ── Main entry point ────────────────────────────────
def extract_sources_to_json(
    raw_cases_dir: Path,
    cache_dir: Optional[Path] = None,
    output_name: str = "source.json",
    trace_ids: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """
    Scan the case_*.json files under raw_cases_dir, look up the original
    posts in the cache, and write them to {raw_cases_dir}/{output_name}.
    Returns a dict of statistics.
    """
    raw_cases_dir = Path(raw_cases_dir)
    if cache_dir is None:
        # Project root: three levels above this script's directory
        project_root = Path(__file__).resolve().parent.parent.parent.parent
        cache_dir = project_root / ".cache" / "content_search"
    cache_dir = Path(cache_dir)
    # 1. Build the cache index
    cache_index = build_cache_index(cache_dir, trace_ids=trace_ids)
    # 2. Load the existing source.json, if any
    output_file = raw_cases_dir / output_name
    existing_sources = []
    existing_ids = set()  # set of (platform, channel_content_id) used for dedup
    if output_file.exists():
        try:
            with open(output_file, "r", encoding="utf-8") as f:
                existing_data = json.load(f)
            existing_sources = existing_data.get("sources", [])
            # Collect the IDs already present
            for src in existing_sources:
                key = (src.get("platform"), src.get("channel_content_id"))
                existing_ids.add(key)
        except Exception as e:
            print(f"Warning: Failed to load existing source.json: {e}")
    # 3. Scan every case file
    matched: List[Dict[str, Any]] = []
    unmatched: List[Dict[str, Any]] = []
    seen_keys: set = set(existing_ids)  # start from the IDs already on disk
    for case_file in sorted(raw_cases_dir.glob("case_*.json")):
        # Skip the output file itself (in case source.json was misnamed case_*)
        if case_file.name == output_name:
            continue
        try:
            with open(case_file, "r", encoding="utf-8") as f:
                case_data = json.load(f)
        except Exception as e:
            unmatched.append({"case_file": case_file.name, "error": str(e)})
            continue
        urls = extract_urls_from_case(case_data)
        for url in urls:
            # Parse the URL into platform and content_id
            parsed = parse_url(url)
            if not parsed:
                unmatched.append({
                    "case_file": case_file.name,
                    "url": url,
                    "reason": "url_parse_failed",
                })
                continue
            platform, cid = parsed
            key = (platform, cid)
            if key in seen_keys:
                continue
            seen_keys.add(key)
            # Look up (platform, content_id) directly in the cache index
            post = cache_index.get(key)
            if post:
                matched.append({
                    "case_file": case_file.name,
                    "platform": platform,
                    "channel_content_id": cid,
                    "source_url": url,
                    "post": post,
                })
            else:
                unmatched.append({
                    "case_file": case_file.name,
                    "platform": platform,
                    "channel_content_id": cid,
                    "source_url": url,
                    "reason": "not_in_cache",
                })
    # 4. Merge the existing data with the newly matched posts
    all_sources = existing_sources + matched
    # 5. Write the output
    output = {
        "total": len(all_sources),
        "cache_dir": str(cache_dir),
        "sources": all_sources,
    }
    output_file.parent.mkdir(parents=True, exist_ok=True)
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(output, f, ensure_ascii=False, indent=2)
    # Return summary statistics (matched/existing/unmatched counts) for logging
    return {
        "total_matched": len(matched),
        "total_existing": len(existing_sources),
        "total_unmatched": len(unmatched),
        "output_file": str(output_file),
    }
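
# Illustrative layout of the source.json written above (values are placeholders):
#   {
#     "total": 2,
#     "cache_dir": "/path/to/.cache/content_search",
#     "sources": [
#       {"case_file": "case_xhs.json", "platform": "xhs",
#        "channel_content_id": "abc123", "source_url": "https://...",
#        "post": {"channel_content_id": "abc123", ...}}
#     ]
#   }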

if __name__ == "__main__":
    # CLI: python extract_sources.py <raw_cases_dir> [cache_dir]
    import sys
    if len(sys.argv) < 2:
        print("Usage: python extract_sources.py <raw_cases_dir> [cache_dir]")
        sys.exit(1)
    raw_cases_dir = Path(sys.argv[1])
    cache_dir = Path(sys.argv[2]) if len(sys.argv) > 2 else None
    result = extract_sources_to_json(raw_cases_dir, cache_dir=cache_dir)
    print(f"[OK] Matched: {result['total_matched']}, Unmatched: {result['total_unmatched']}")
    print(f"     Output: {result['output_file']}")
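
# Example invocation (paths and counts below are hypothetical):
#   $ python extract_sources.py ./raw_cases ./.cache/content_search
#   [OK] Matched: 3, Unmatched: 1
#        Output: raw_cases/source.json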