""" 从 raw_cases/case_*.json 中提取 source_url / 帖子链接, 解析 channel_content_id,再从 .cache/content_search 中查找对应的原始帖子数据。 主函数:extract_sources_to_json(raw_cases_dir) - 扫描该目录下所有 case_{platform}.json - 解析每个 "工序发现[].帖子链接"(新格式)或 "cases[].source_url"(旧格式) - 从项目根的 .cache/content_search/*.json 中匹配 channel_content_id - 把匹配到的完整 post 写入 {raw_cases_dir}/source.json - 同时下载图片到 {raw_cases_dir}/images/{case_id}/ """ import json import re from pathlib import Path from typing import Any, Dict, List, Optional, Tuple import asyncio import aiohttp from urllib.parse import urlparse, parse_qs, urlencode # ── URL → (platform, content_id) 解析 ──────────────────────────────── _URL_PATTERNS = [ # B站: https://www.bilibili.com/video/BV1xxx ("bili", re.compile(r"bilibili\.com/video/(BV[\w]+)")), # 小红书: https://www.xiaohongshu.com/explore/{id} 或 /discovery/item/{id} ("xhs", re.compile(r"xiaohongshu\.com/(?:explore|discovery/item)/([a-f0-9]+)")), # YouTube: https://www.youtube.com/watch?v={id} 或 https://youtu.be/{id} ("youtube", re.compile(r"(?:youtube\.com/watch\?v=|youtu\.be/)([\w-]+)")), # X/Twitter: https://x.com/{user}/status/{id} 或 twitter.com ("x", re.compile(r"(?:x\.com|twitter\.com)/[^/]+/status/(\d+)")), # 知乎: https://zhuanlan.zhihu.com/p/{id} 或 zhihu.com/question/{qid}/answer/{aid} ("zhihu", re.compile(r"zhuanlan\.zhihu\.com/p/(\d+)")), ("zhihu", re.compile(r"zhihu\.com/question/\d+/answer/(\d+)")), # 公众号: 通过 __biz 或整个 URL 作为 id(后备) ("gzh", re.compile(r"mp\.weixin\.qq\.com/s[/?]([^\s\"']+)")), ] def parse_url(url: str) -> Optional[Tuple[str, str]]: """从 URL 解析出 (platform, content_id)。返回 None 表示无法解析。""" if not url or not isinstance(url, str): return None for platform, pat in _URL_PATTERNS: m = pat.search(url) if m: return platform, m.group(1) return None # ── 从 case 文件中抽取所有链接 ──────────────────────────────── def extract_urls_from_case(case_data: Any) -> List[str]: """兼容新旧两种格式,返回 case 文件里出现的所有 URL。""" urls: List[str] = [] if not isinstance(case_data, dict): return urls # 新格式:工序发现[].帖子链接 for item in case_data.get("工序发现", []) or []: if isinstance(item, dict): link = item.get("帖子链接") or item.get("source_url") if link: urls.append(link) # 旧格式:cases[].source_url for item in case_data.get("cases", []) or []: if isinstance(item, dict): link = item.get("source_url") or item.get("帖子链接") if link: urls.append(link) return urls # ── 从 cache 中构建 (platform, content_id) → post 索引 ──────────────────────────────── def _normalize_url(url: str) -> Optional[str]: """规范化 URL:排序 query 参数,去掉尾斜杠""" try: parsed = urlparse(url) if parsed.query: # 排序 query 参数 params = parse_qs(parsed.query, keep_blank_values=True) sorted_query = urlencode(sorted(params.items()), doseq=True) normalized = f"{parsed.scheme}://{parsed.netloc}{parsed.path}?{sorted_query}" else: normalized = f"{parsed.scheme}://{parsed.netloc}{parsed.path}" return normalized.rstrip('/') except: return None def build_cache_index(cache_dir: Path, trace_ids: Optional[List[str]] = None) -> Dict[Tuple[str, str], Dict[str, Any]]: """ 构建 (platform, channel_content_id) -> post 映射。 Args: cache_dir: cache 目录路径 trace_ids: 可选的 trace_id 列表。如果提供,只加载这些特定的 cache 文件; 否则扫描所有 cache 文件 Returns: (platform, content_id) -> post 的映射字典 """ index: Dict[Tuple[str, str], Dict[str, Any]] = {} if not cache_dir.exists(): return index # 如果提供了 trace_ids,只加载这些特定文件 if trace_ids: cache_files = [cache_dir / f"{tid}.json" for tid in trace_ids if tid] cache_files = [f for f in cache_files if f.exists()] else: # 否则扫描所有 cache 文件 cache_files = list(cache_dir.glob("*.json")) for cache_file in cache_files: try: with open(cache_file, "r", encoding="utf-8") as f: data = json.load(f) except Exception: continue for key, entry in data.items(): if not key.startswith("search:"): continue platform = key.split(":", 1)[1] # 新格式:entry = {"history": [...], "latest_index": n} # 旧格式:entry = {"keyword": ..., "posts": [...]} if isinstance(entry, dict) and "history" in entry: post_lists = [h.get("posts", []) for h in entry.get("history", [])] elif isinstance(entry, dict) and "posts" in entry: post_lists = [entry.get("posts", [])] else: continue for posts in post_lists: for post in posts or []: if not isinstance(post, dict): continue cid = post.get("channel_content_id") # YouTube 平台用 video_id 而非 channel_content_id if not cid and post.get("video_id"): cid = post.get("video_id") post["channel_content_id"] = cid # 补全字段 if cid: # 用 (platform, content_id) 作为索引键 index[(platform, str(cid))] = post # 额外用 link / url 字段建立索引(用于 GZH 等平台) link = post.get("link") or post.get("url") or "" if link: index[("url", link)] = post # 规范化 URL 索引(排序 query 参数,去掉尾斜杠) norm = _normalize_url(link) if norm: index[("norm_url", norm)] = post return index # ── 主入口 ──────────────────────────────── def extract_sources_to_json( raw_cases_dir: Path, cache_dir: Optional[Path] = None, output_name: str = "source.json", trace_ids: Optional[List[str]] = None, ) -> Dict[str, Any]: """ 扫描 raw_cases_dir 下的 case_*.json, 从 cache 中找出原始帖子,输出到 {raw_cases_dir}/{output_name}。 返回统计信息 dict。 """ raw_cases_dir = Path(raw_cases_dir) if cache_dir is None: # 项目根目录:script 文件往上三级 project_root = Path(__file__).resolve().parent.parent.parent.parent cache_dir = project_root / ".cache" / "content_search" cache_dir = Path(cache_dir) # 1. 构建 cache 索引 cache_index = build_cache_index(cache_dir, trace_ids=trace_ids) # 2. 加载已有的 source.json(如果存在) output_file = raw_cases_dir / output_name existing_sources = [] existing_ids = set() # (platform, channel_content_id) 集合用于去重 if output_file.exists(): try: with open(output_file, "r", encoding="utf-8") as f: existing_data = json.load(f) existing_sources = existing_data.get("sources", []) # 构建已有的 ID 集合 for src in existing_sources: key = (src.get("platform"), src.get("channel_content_id")) existing_ids.add(key) except Exception as e: print(f"Warning: Failed to load existing source.json: {e}") # 2.1 加载已有的 filtered_cases.json(如果存在) filtered_output_file = raw_cases_dir / "filtered_cases.json" existing_filtered_sources = [] existing_filtered_ids = set() if filtered_output_file.exists(): try: with open(filtered_output_file, "r", encoding="utf-8") as f: filtered_data = json.load(f) existing_filtered_sources = filtered_data.get("sources", []) for src in existing_filtered_sources: key = (src.get("platform"), src.get("channel_content_id")) existing_filtered_ids.add(key) except Exception as e: print(f"Warning: Failed to load existing filtered_cases.json: {e}") # 3. 扫描所有 case 文件 matched: List[Dict[str, Any]] = [] unmatched: List[Dict[str, Any]] = [] seen_keys: set = set(existing_ids) # 从已有的 ID 开始 for case_file in sorted(raw_cases_dir.glob("case_*.json")): # 跳过自己(如果 source.json 误被命名成 case_*) if case_file.name == output_name: continue try: with open(case_file, "r", encoding="utf-8") as f: case_data = json.load(f) except Exception as e: # 尝试自动修复 JSON 格式错误 try: from examples.process_pipeline.script.fix_json_quotes import try_fix_and_parse with open(case_file, "r", encoding="utf-8") as f: raw_content = f.read() success, case_data, fix_desc = try_fix_and_parse(raw_content) if success: # 修复成功,写回文件 with open(case_file, "w", encoding="utf-8") as f: json.dump(case_data, f, ensure_ascii=False, indent=2) print(f" 🔧 [Auto-Fix] Fixed {case_file.name}: {fix_desc}") else: unmatched.append({"case_file": case_file.name, "error": str(e)}) continue except Exception: unmatched.append({"case_file": case_file.name, "error": str(e)}) continue urls = extract_urls_from_case(case_data) for url in urls: # 解析 URL 得到 platform 和 content_id parsed = parse_url(url) if not parsed: unmatched.append({ "case_file": case_file.name, "url": url, "reason": "url_parse_failed", }) continue platform, cid = parsed key = (platform, cid) if key in seen_keys: continue seen_keys.add(key) # 多级匹配: # 1. (platform, content_id) 精确匹配 # 2. 完整 URL 匹配 # 3. 规范化 URL 匹配 post = cache_index.get(key) if not post: # 2. 完整 URL post = cache_index.get(("url", url)) if not post: # 3. 规范化 URL norm = _normalize_url(url) if norm: post = cache_index.get(("norm_url", norm)) if post: # 统一用 cache 中的 channel_content_id 生成 case_id # 这样保证 case_id 和 cache 中的 ID 一致 actual_cid = post.get("channel_content_id") or post.get("video_id") or cid actual_case_id = f"{platform}_{actual_cid}" matched.append({ "case_id": actual_case_id, "case_file": case_file.name, "platform": platform, "channel_content_id": str(actual_cid), "source_url": url, "post": post, }) else: unmatched.append({ "case_id": f"{platform}_{cid}", # 统一格式的 ID "case_file": case_file.name, "platform": platform, "channel_content_id": cid, "source_url": url, "reason": "not_in_cache", }) # 4. 合并已有数据和新匹配的数据 all_sources = existing_sources + matched # 5. 过滤掉 2025-10 之前的过时帖子 from datetime import datetime as _dt cutoff_ts = int(_dt(2025, 10, 1).timestamp()) # 本地时区的 2025-10-01 before_filter = len(all_sources) filtered_sources = [s for s in all_sources if _is_before_cutoff(s, cutoff_ts)] all_sources = [s for s in all_sources if not _is_before_cutoff(s, cutoff_ts)] filtered_count = before_filter - len(all_sources) # 6. 转换 timestamp 为可读格式 _convert_timestamps(all_sources) _convert_timestamps(filtered_sources) # 7. 写 source.json output = { "total": len(all_sources), "cache_dir": str(cache_dir), "sources": all_sources, } output_file.parent.mkdir(parents=True, exist_ok=True) with open(output_file, "w", encoding="utf-8") as f: json.dump(output, f, ensure_ascii=False, indent=2) # 8. 写 filtered_cases.json(被过滤掉的帖子,去重后追加) if filtered_sources: for fs in filtered_sources: key = (fs.get("platform"), fs.get("channel_content_id")) if key not in existing_filtered_ids: existing_filtered_sources.append(fs) existing_filtered_ids.add(key) filtered_output = { "total": len(existing_filtered_sources), "reason": "publish_timestamp before 2025-10-01", "sources": existing_filtered_sources, } with open(filtered_output_file, "w", encoding="utf-8") as f: json.dump(filtered_output, f, ensure_ascii=False, indent=2) # 9. 下载图片到 raw_cases/images/{case_id}/ images_downloaded = download_images_for_sources(matched, raw_cases_dir) # 返回统计信息(包含 unmatched 用于日志输出) return { "total_matched": len(matched), "total_existing": len(existing_sources), "total_unmatched": len(unmatched), "filtered_outdated": filtered_count, "images_downloaded": images_downloaded, "output_file": str(output_file), } # ── 图片下载 ──────────────────────────────── def _get_image_urls_from_post(post: Dict[str, Any]) -> List[str]: """从 post 中提取所有图片 URL""" urls = [] images = post.get("images", []) if isinstance(images, list): for img in images: if isinstance(img, str) and img.startswith("http"): urls.append(img) elif isinstance(img, dict) and "url" in img: urls.append(img["url"]) image_url_list = post.get("image_url_list", []) if isinstance(image_url_list, list): for img_obj in image_url_list: if isinstance(img_obj, dict) and "image_url" in img_obj: urls.append(img_obj["image_url"]) return urls def _is_before_cutoff(source: Dict[str, Any], cutoff_ts: int) -> bool: """判断帖子是否早于截止时间戳(秒级) 如果 timestamp 为 0 或不存在,返回 False(保留) """ post = source.get("post", {}) if not isinstance(post, dict): return False ts = post.get("publish_timestamp") # 没有 timestamp 或为 0,保留 if not ts or ts == 0: return False try: ts = int(ts) # 毫秒级转秒级 if ts > 1000000000000: ts = ts / 1000 return ts < cutoff_ts except (ValueError, TypeError): pass # 尝试解析字符串格式 "2025-05-02 19:25:30" try: from datetime import datetime dt = datetime.strptime(str(ts), "%Y-%m-%d %H:%M:%S") return dt.timestamp() < cutoff_ts except Exception: # 解析失败,保留 return False def _format_timestamp(ts: Any) -> Optional[str]: """将时间戳(秒/毫秒)转换为可读格式""" from datetime import datetime if ts is None or ts == 0 or ts == "": return None try: ts = int(ts) # 毫秒级时间戳 if ts > 1000000000000: ts = ts / 1000 dt = datetime.fromtimestamp(ts) return dt.strftime("%Y-%m-%d %H:%M:%S") except Exception: return None def _convert_timestamps(sources: List[Dict[str, Any]]) -> None: """将 source 列表中 post 的时间戳字段替换为可读格式""" timestamp_fields = ["publish_timestamp", "modify_timestamp", "update_timestamp"] for src in sources: post = src.get("post", {}) if not isinstance(post, dict): continue for field in timestamp_fields: if field in post: readable = _format_timestamp(post.get(field)) if readable: post[field] = readable def download_images_for_sources(sources: List[Dict[str, Any]], raw_cases_dir: Path) -> int: """ 为新匹配的 sources 下载图片到 raw_cases/images/{case_id}/ Returns: 下载成功的图片总数 """ import urllib.request import urllib.error images_base = raw_cases_dir / "images" total_downloaded = 0 # 设置 headers 避免被拒(X/Twitter 需要 User-Agent) opener = urllib.request.build_opener() opener.addheaders = [("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36")] urllib.request.install_opener(opener) for src in sources: case_id = src.get("case_id", "") post = src.get("post", {}) image_urls = _get_image_urls_from_post(post) if not image_urls: continue case_dir = images_base / case_id case_dir.mkdir(parents=True, exist_ok=True) for idx, url in enumerate(image_urls): # 确定文件扩展名 ext = ".jpg" if ".png" in url.lower(): ext = ".png" elif ".webp" in url.lower(): ext = ".webp" save_path = case_dir / f"{idx:02d}{ext}" if save_path.exists(): total_downloaded += 1 continue try: urllib.request.urlretrieve(url, str(save_path)) total_downloaded += 1 except Exception: # 下载失败就跳过,不中断流程 pass return total_downloaded if __name__ == "__main__": # CLI:python extract_sources.py [cache_dir] import sys if len(sys.argv) < 2: print("Usage: python extract_sources.py [cache_dir]") sys.exit(1) raw_cases_dir = Path(sys.argv[1]) cache_dir = Path(sys.argv[2]) if len(sys.argv) > 2 else None result = extract_sources_to_json(raw_cases_dir, cache_dir=cache_dir) print(f"[OK] Matched: {result['total_matched']}, Unmatched: {result['total_unmatched']}") print(f" Output: {raw_cases_dir / 'source.json'}")