- """
- 从 raw_cases/case_*.json 中提取 source_url / 帖子链接,
- 解析 channel_content_id,再从 .cache/content_search 中查找对应的原始帖子数据。
- 主函数:extract_sources_to_json(raw_cases_dir)
- - 扫描该目录下所有 case_{platform}.json
- - 解析每个 "工序发现[].帖子链接"(新格式)或 "cases[].source_url"(旧格式)
- - 从项目根的 .cache/content_search/*.json 中匹配 channel_content_id
- - 把匹配到的完整 post 写入 {raw_cases_dir}/source.json
- """

import json
import re
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
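
# Quick programmatic use (path is illustrative; assumes the file is importable
# as extract_sources):
#   from extract_sources import extract_sources_to_json
#   stats = extract_sources_to_json(Path("data/raw_cases"))
#   print(stats["total_matched"], stats["total_unmatched"])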

# ── URL → (platform, content_id) parsing ────────────────────────────────

_URL_PATTERNS = [
    # Bilibili: https://www.bilibili.com/video/BV1xxx
    ("bili", re.compile(r"bilibili\.com/video/(BV[\w]+)")),
    # Xiaohongshu: https://www.xiaohongshu.com/explore/{id} or /discovery/item/{id}
    ("xhs", re.compile(r"xiaohongshu\.com/(?:explore|discovery/item)/([a-f0-9]+)")),
    # YouTube: https://www.youtube.com/watch?v={id} or https://youtu.be/{id}
    ("youtube", re.compile(r"(?:youtube\.com/watch\?v=|youtu\.be/)([\w-]+)")),
    # X/Twitter: https://x.com/{user}/status/{id}; twitter.com also accepted
    ("x", re.compile(r"(?:x\.com|twitter\.com)/[^/]+/status/(\d+)")),
    # Zhihu: https://zhuanlan.zhihu.com/p/{id} or zhihu.com/question/{qid}/answer/{aid}
    ("zhihu", re.compile(r"zhuanlan\.zhihu\.com/p/(\d+)")),
    ("zhihu", re.compile(r"zhihu\.com/question/\d+/answer/(\d+)")),
    # WeChat Official Accounts: fall back to whatever follows /s
    # (e.g. the __biz query string) as the id
    ("gzh", re.compile(r"mp\.weixin\.qq\.com/s[/?]([^\s\"']+)")),
]


def parse_url(url: str) -> Optional[Tuple[str, str]]:
    """Parse (platform, content_id) out of a URL; returns None if no pattern matches."""
    if not url or not isinstance(url, str):
        return None
    for platform, pat in _URL_PATTERNS:
        m = pat.search(url)
        if m:
            return platform, m.group(1)
    return None
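
# Illustrative results (the IDs below are made up, not real posts):
#   parse_url("https://www.bilibili.com/video/BV1GJ411x7h7")  -> ("bili", "BV1GJ411x7h7")
#   parse_url("https://x.com/someone/status/1234567890")      -> ("x", "1234567890")
#   parse_url("https://example.com/post/1")                   -> None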

# ── Extracting every link from a case file ────────────────────────────────


def extract_urls_from_case(case_data: Any) -> List[str]:
    """Return every URL found in a case file; handles both the new and old formats."""
    urls: List[str] = []
    if not isinstance(case_data, dict):
        return urls
    # New format: 工序发现[].帖子链接
    for item in case_data.get("工序发现", []) or []:
        if isinstance(item, dict):
            link = item.get("帖子链接") or item.get("source_url")
            if link:
                urls.append(link)
    # Old format: cases[].source_url
    for item in case_data.get("cases", []) or []:
        if isinstance(item, dict):
            link = item.get("source_url") or item.get("帖子链接")
            if link:
                urls.append(link)
    return urls
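
# Illustrative input shapes (field values made up):
#   new format: {"工序发现": [{"帖子链接": "https://www.bilibili.com/video/BV1..."}]}
#   old format: {"cases": [{"source_url": "https://zhuanlan.zhihu.com/p/123"}]}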

# ── Building a (platform, content_id) → post index from the cache ─────────────


def build_cache_index(
    cache_dir: Path, trace_ids: Optional[List[str]] = None
) -> Dict[Tuple[str, str], Dict[str, Any]]:
    """
    Build a (platform, channel_content_id) -> post mapping.

    Args:
        cache_dir: path to the cache directory
        trace_ids: optional list of trace ids. If given, only those specific
            cache files are loaded; otherwise every cache file is scanned.

    Returns:
        mapping from (platform, content_id) to the post dict
    """
    index: Dict[Tuple[str, str], Dict[str, Any]] = {}
    if not cache_dir.exists():
        return index
    # If trace_ids were given, load only those files
    if trace_ids:
        cache_files = [cache_dir / f"{tid}.json" for tid in trace_ids if tid]
        cache_files = [f for f in cache_files if f.exists()]
    else:
        # Otherwise scan every cache file
        cache_files = list(cache_dir.glob("*.json"))
    for cache_file in cache_files:
        try:
            with open(cache_file, "r", encoding="utf-8") as f:
                data = json.load(f)
        except Exception:
            continue
        if not isinstance(data, dict):
            # e.g. a cache file holding a top-level list has no search entries
            continue
        for key, entry in data.items():
            if not key.startswith("search:"):
                continue
            platform = key.split(":", 1)[1]
            # New format: entry = {"history": [...], "latest_index": n}
            # Old format: entry = {"keyword": ..., "posts": [...]}
            if isinstance(entry, dict) and "history" in entry:
                post_lists = [h.get("posts", []) for h in entry.get("history", [])]
            elif isinstance(entry, dict) and "posts" in entry:
                post_lists = [entry.get("posts", [])]
            else:
                continue
            for posts in post_lists:
                for post in posts or []:
                    if not isinstance(post, dict):
                        continue
                    cid = post.get("channel_content_id")
                    if cid:
                        # Key the index directly on (platform, content_id)
                        index[(platform, str(cid))] = post
    return index
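
# Illustrative cache file layout (trimmed; keys not starting with "search:" are skipped):
#   {"search:xhs": {"history": [{"posts": [{"channel_content_id": "abc123"}]}],
#                   "latest_index": 0}}
# would yield index[("xhs", "abc123")] == that post dict.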

# ── Main entry point ────────────────────────────────


def extract_sources_to_json(
    raw_cases_dir: Path,
    cache_dir: Optional[Path] = None,
    output_name: str = "source.json",
    trace_ids: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """
    Scan the case_*.json files under raw_cases_dir, look up their original
    posts in the cache, and write them to {raw_cases_dir}/{output_name}.
    Returns a dict of summary statistics.
    """
    raw_cases_dir = Path(raw_cases_dir)
    if cache_dir is None:
        # Project root: three directories above the folder containing this script
        project_root = Path(__file__).resolve().parent.parent.parent.parent
        cache_dir = project_root / ".cache" / "content_search"
    cache_dir = Path(cache_dir)

    # 1. Build the cache index
    cache_index = build_cache_index(cache_dir, trace_ids=trace_ids)

    # 2. Load the existing source.json, if any
    output_file = raw_cases_dir / output_name
    existing_sources = []
    existing_ids = set()  # set of (platform, channel_content_id), used for dedup
    if output_file.exists():
        try:
            with open(output_file, "r", encoding="utf-8") as f:
                existing_data = json.load(f)
            existing_sources = existing_data.get("sources", [])
            # Collect the IDs we already have
            for src in existing_sources:
                key = (src.get("platform"), src.get("channel_content_id"))
                existing_ids.add(key)
        except Exception as e:
            print(f"Warning: Failed to load existing source.json: {e}")

    # 3. Scan every case file
    matched: List[Dict[str, Any]] = []
    unmatched: List[Dict[str, Any]] = []
    seen_keys: set = set(existing_ids)  # start from the IDs we already have
    for case_file in sorted(raw_cases_dir.glob("case_*.json")):
        # Skip the output itself (in case source.json was misnamed case_*)
        if case_file.name == output_name:
            continue
        try:
            with open(case_file, "r", encoding="utf-8") as f:
                case_data = json.load(f)
        except Exception as e:
            unmatched.append({"case_file": case_file.name, "error": str(e)})
            continue
        urls = extract_urls_from_case(case_data)
        for url in urls:
            # Parse platform and content_id out of the URL
            parsed = parse_url(url)
            if not parsed:
                unmatched.append({
                    "case_file": case_file.name,
                    "url": url,
                    "reason": "url_parse_failed",
                })
                continue
            platform, cid = parsed
            key = (platform, cid)
            if key in seen_keys:
                continue
            seen_keys.add(key)
            # Look the post up in the cache index directly by (platform, content_id)
            post = cache_index.get(key)
            if post:
                matched.append({
                    "case_file": case_file.name,
                    "platform": platform,
                    "channel_content_id": cid,
                    "source_url": url,
                    "post": post,
                })
            else:
                unmatched.append({
                    "case_file": case_file.name,
                    "platform": platform,
                    "channel_content_id": cid,
                    "source_url": url,
                    "reason": "not_in_cache",
                })

    # 4. Merge the existing data with the newly matched posts
    all_sources = existing_sources + matched

    # 5. Write the output
    output = {
        "total": len(all_sources),
        "cache_dir": str(cache_dir),
        "sources": all_sources,
    }
    output_file.parent.mkdir(parents=True, exist_ok=True)
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(output, f, ensure_ascii=False, indent=2)

    # Return summary statistics (counts only; unmatched details stay local)
    return {
        "total_matched": len(matched),
        "total_existing": len(existing_sources),
        "total_unmatched": len(unmatched),
        "output_file": str(output_file),
    }
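
# Example return value (counts and path are illustrative):
#   {"total_matched": 3, "total_existing": 10, "total_unmatched": 1,
#    "output_file": "raw_cases/source.json"}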


if __name__ == "__main__":
    # CLI: python extract_sources.py <raw_cases_dir> [cache_dir]
    import sys

    if len(sys.argv) < 2:
        print("Usage: python extract_sources.py <raw_cases_dir> [cache_dir]")
        sys.exit(1)
    raw_cases_dir = Path(sys.argv[1])
    cache_dir = Path(sys.argv[2]) if len(sys.argv) > 2 else None
    result = extract_sources_to_json(raw_cases_dir, cache_dir=cache_dir)
    print(f"[OK] Matched: {result['total_matched']}, Unmatched: {result['total_unmatched']}")
    print(f"     Output: {result['output_file']}")