""" 从 raw_cases/case_*.json 中提取 source_url / 帖子链接, 解析 channel_content_id,再从 .cache/content_search 中查找对应的原始帖子数据。 主函数:extract_sources_to_json(raw_cases_dir) - 扫描该目录下所有 case_{platform}.json - 解析每个 "工序发现[].帖子链接"(新格式)或 "cases[].source_url"(旧格式) - 从项目根的 .cache/content_search/*.json 中匹配 channel_content_id - 把匹配到的完整 post 写入 {raw_cases_dir}/source.json - 同时下载图片到 {raw_cases_dir}/images/{case_id}/ """ import json import re from pathlib import Path from typing import Any, Dict, List, Optional, Tuple import asyncio import aiohttp from urllib.parse import urlparse, parse_qs, urlencode # ── URL → (platform, content_id) 解析 ──────────────────────────────── _URL_PATTERNS = [ # B站: https://www.bilibili.com/video/BV1xxx ("bili", re.compile(r"bilibili\.com/video/(BV[\w]+)")), # 小红书: https://www.xiaohongshu.com/explore/{id} 或 /discovery/item/{id} ("xhs", re.compile(r"xiaohongshu\.com/(?:explore|discovery/item)/([a-f0-9]+)")), # YouTube: https://www.youtube.com/watch?v={id} 或 https://youtu.be/{id} ("youtube", re.compile(r"(?:youtube\.com/watch\?v=|youtu\.be/)([\w-]+)")), # X/Twitter: https://x.com/{user}/status/{id} 或 twitter.com ("x", re.compile(r"(?:x\.com|twitter\.com)/[^/]+/status/(\d+)")), # 知乎: https://zhuanlan.zhihu.com/p/{id} 或 zhihu.com/question/{qid}/answer/{aid} ("zhihu", re.compile(r"zhuanlan\.zhihu\.com/p/(\d+)")), ("zhihu", re.compile(r"zhihu\.com/question/\d+/answer/(\d+)")), # 公众号: 通过 __biz 或整个 URL 作为 id(后备) ("gzh", re.compile(r"mp\.weixin\.qq\.com/s[/?]([^\s\"']+)")), ] def parse_url(url: str) -> Optional[Tuple[str, str]]: """从 URL 解析出 (platform, content_id)。返回 None 表示无法解析。""" if not url or not isinstance(url, str): return None for platform, pat in _URL_PATTERNS: m = pat.search(url) if m: return platform, m.group(1) return None # ── 从 case 文件中抽取所有条目(URL + evaluation) ──────────────────────────────── def extract_entries_from_case(case_data: Any) -> List[Dict[str, Any]]: """ 从 case 数据中抽取条目,每条包含 url 和 agent 的 evaluation。 返回: [{"url": str, "evaluation": dict | None, "title": str | None, "case_id": str | None}] """ entries: List[Dict[str, Any]] = [] if not isinstance(case_data, dict): return entries # 新格式:工序发现[].帖子链接 for item in case_data.get("工序发现", []) or []: if isinstance(item, dict): link = item.get("帖子链接") or item.get("source_url") if link: entries.append({ "url": link, "evaluation": item.get("evaluation"), "title": item.get("title"), "case_id": item.get("case_id"), }) # 主格式:cases[] for item in case_data.get("cases", []) or []: if isinstance(item, dict): link = item.get("source_url") or item.get("帖子链接") if link: entries.append({ "url": link, "evaluation": item.get("evaluation"), "title": item.get("title"), "case_id": item.get("case_id"), }) return entries def extract_urls_from_case(case_data: Any) -> List[str]: """[Legacy] 旧接口,保留供可能的外部调用。内部改用 extract_entries_from_case。""" return [e["url"] for e in extract_entries_from_case(case_data)] # ── 过滤规则(统一入口) ──────────────────────────────── # 默认阈值:body_text 最少字符数、agent 评分下限 DEFAULT_MIN_BODY_LEN = 30 DEFAULT_MIN_SCORE = 70.0 DEFAULT_CUTOFF_DATE = (2025, 10, 1) def _is_before_cutoff(source: Dict[str, Any], cutoff_ts: int) -> bool: """判断帖子是否早于截止时间戳(秒级) 如果 timestamp 为 0 或不存在,返回 False(保留) """ post = source.get("post", {}) if not isinstance(post, dict): return False ts = post.get("publish_timestamp") # 没有 timestamp 或为 0,保留 if not ts or ts == 0: return False try: ts = int(ts) # 毫秒级转秒级 if ts > 1000000000000: ts = ts / 1000 return ts < cutoff_ts except (ValueError, TypeError): pass # 尝试解析字符串格式 "2025-05-02 19:25:30" try: from datetime import datetime dt = datetime.strptime(str(ts), "%Y-%m-%d %H:%M:%S") return dt.timestamp() < cutoff_ts except Exception: # 解析失败,保留 return False def _check_filters( source: Dict[str, Any], cutoff_ts: int, min_body_len: int, min_score: float, ) -> Optional[str]: """ 对一条 source 逐条检查过滤规则。 返回: None —— 条目合格 非 None 字符串 —— 不合格的原因(写入 filter_reason) """ post = source.get("post", {}) or {} # 1. body_text 完整性 body = post.get("body_text") or post.get("desc") or "" if not isinstance(body, str) or len(body.strip()) < min_body_len: return f"missing_body_text:len={len(body.strip()) if isinstance(body, str) else 0}" # 2. agent 评分 evaluation = source.get("evaluation") if not isinstance(evaluation, dict): return "missing_evaluation" quality = evaluation.get("quality") if not isinstance(quality, dict): return "missing_evaluation" score = quality.get("overall_score") if not isinstance(score, (int, float)): return "invalid_score" if score < min_score: return f"low_score:{score}" # 3. 过时(复用原 _is_before_cutoff 对时间戳的解析) if _is_before_cutoff(source, cutoff_ts): return "outdated" return None # ── 从 cache 中构建 (platform, content_id) → post 索引 ──────────────────────────────── def _normalize_url(url: str) -> Optional[str]: """规范化 URL:排序 query 参数,去掉尾斜杠""" try: parsed = urlparse(url) if parsed.query: # 排序 query 参数 params = parse_qs(parsed.query, keep_blank_values=True) sorted_query = urlencode(sorted(params.items()), doseq=True) normalized = f"{parsed.scheme}://{parsed.netloc}{parsed.path}?{sorted_query}" else: normalized = f"{parsed.scheme}://{parsed.netloc}{parsed.path}" return normalized.rstrip('/') except: return None def build_cache_index(cache_dir: Path, trace_ids: Optional[List[str]] = None) -> Dict[Tuple[str, str], Dict[str, Any]]: """ 构建 (platform, channel_content_id) -> post 映射。 Args: cache_dir: cache 目录路径 trace_ids: 可选的 trace_id 列表。如果提供,只加载这些特定的 cache 文件; 否则扫描所有 cache 文件 Returns: (platform, content_id) -> post 的映射字典 """ index: Dict[Tuple[str, str], Dict[str, Any]] = {} if not cache_dir.exists(): return index # 如果提供了 trace_ids,只加载这些特定文件 if trace_ids: cache_files = [cache_dir / f"{tid}.json" for tid in trace_ids if tid] cache_files = [f for f in cache_files if f.exists()] else: # 否则扫描所有 cache 文件 cache_files = list(cache_dir.glob("*.json")) for cache_file in cache_files: try: with open(cache_file, "r", encoding="utf-8") as f: data = json.load(f) except Exception: continue for key, entry in data.items(): if not key.startswith("search:"): continue platform = key.split(":", 1)[1] # 新格式:entry = {"history": [...], "latest_index": n} # 旧格式:entry = {"keyword": ..., "posts": [...]} if isinstance(entry, dict) and "history" in entry: post_lists = [h.get("posts", []) for h in entry.get("history", [])] elif isinstance(entry, dict) and "posts" in entry: post_lists = [entry.get("posts", [])] else: continue for posts in post_lists: for post in posts or []: if not isinstance(post, dict): continue cid = post.get("channel_content_id") # YouTube 平台用 video_id 而非 channel_content_id if not cid and post.get("video_id"): cid = post.get("video_id") post["channel_content_id"] = cid # 补全字段 if cid: # 用 (platform, content_id) 作为索引键 index[(platform, str(cid))] = post # 额外用 link / url 字段建立索引(用于 GZH 等平台) link = post.get("link") or post.get("url") or "" if link: index[("url", link)] = post # 规范化 URL 索引(排序 query 参数,去掉尾斜杠) norm = _normalize_url(link) if norm: index[("norm_url", norm)] = post return index # ── 主入口 ──────────────────────────────── def extract_sources_to_json( raw_cases_dir: Path, cache_dir: Optional[Path] = None, output_name: str = "source.json", trace_ids: Optional[List[str]] = None, min_body_len: int = DEFAULT_MIN_BODY_LEN, min_score: float = DEFAULT_MIN_SCORE, cutoff_date: Tuple[int, int, int] = DEFAULT_CUTOFF_DATE, ) -> Dict[str, Any]: """ 扫描 raw_cases_dir 下的 case_*.json, 从 cache 中找出原始帖子,并对结果执行统一过滤(body_text / 评分 / 时效)。 过滤规则(均可通过参数调整): - body_text 为空或少于 min_body_len 字符 → filter_reason=missing_body_text - agent evaluation 缺失或 weighted_score < min_score → filter_reason=missing_evaluation / invalid_score / low_score - publish_timestamp 早于 cutoff_date → filter_reason=outdated 参数: min_body_len: body_text 最少字符数,默认 30 min_score: agent 评分下限,默认 70 cutoff_date: 过时截止日期 (year, month, day),默认 (2025, 10, 1) 返回统计信息 dict。 """ raw_cases_dir = Path(raw_cases_dir) if cache_dir is None: # 项目根目录:script 文件往上三级 project_root = Path(__file__).resolve().parent.parent.parent.parent cache_dir = project_root / ".cache" / "content_search" cache_dir = Path(cache_dir) # 1. 构建 cache 索引 cache_index = build_cache_index(cache_dir, trace_ids=trace_ids) # 2. 加载已有的 source.json(如果存在) output_file = raw_cases_dir / output_name existing_sources = [] existing_ids = set() # (platform, channel_content_id) 集合用于去重 if output_file.exists(): try: with open(output_file, "r", encoding="utf-8") as f: existing_data = json.load(f) existing_sources = existing_data.get("sources", []) # 构建已有的 ID 集合 for src in existing_sources: key = (src.get("platform"), src.get("channel_content_id")) existing_ids.add(key) except Exception as e: print(f"Warning: Failed to load existing source.json: {e}") # 2.1 加载已有的 filtered_cases.json(如果存在) filtered_output_file = raw_cases_dir / "filtered_cases.json" existing_filtered_sources = [] existing_filtered_ids = set() if filtered_output_file.exists(): try: with open(filtered_output_file, "r", encoding="utf-8") as f: filtered_data = json.load(f) existing_filtered_sources = filtered_data.get("sources", []) for src in existing_filtered_sources: key = (src.get("platform"), src.get("channel_content_id")) existing_filtered_ids.add(key) except Exception as e: print(f"Warning: Failed to load existing filtered_cases.json: {e}") # 3. 扫描所有 case 文件 matched: List[Dict[str, Any]] = [] unmatched: List[Dict[str, Any]] = [] seen_keys: set = set(existing_ids) # 从已有的 ID 开始 for case_file in sorted(raw_cases_dir.glob("case_*.json")): # 跳过自己(如果 source.json 误被命名成 case_*) if case_file.name == output_name: continue try: with open(case_file, "r", encoding="utf-8") as f: case_data = json.load(f) except Exception as e: # 尝试自动修复 JSON 格式错误 try: from examples.process_pipeline.script.fix_json_quotes import try_fix_and_parse with open(case_file, "r", encoding="utf-8") as f: raw_content = f.read() success, case_data, fix_desc = try_fix_and_parse(raw_content) if success: # 修复成功,写回文件 with open(case_file, "w", encoding="utf-8") as f: json.dump(case_data, f, ensure_ascii=False, indent=2) print(f" 🔧 [Auto-Fix] Fixed {case_file.name}: {fix_desc}") else: unmatched.append({"case_file": case_file.name, "error": str(e)}) continue except Exception: unmatched.append({"case_file": case_file.name, "error": str(e)}) continue entries = extract_entries_from_case(case_data) for entry in entries: url = entry["url"] evaluation = entry.get("evaluation") # 解析 URL 得到 platform 和 content_id parsed = parse_url(url) if not parsed: unmatched.append({ "case_file": case_file.name, "url": url, "reason": "url_parse_failed", }) continue platform, cid = parsed key = (platform, cid) if key in seen_keys: continue seen_keys.add(key) # 多级匹配: # 1. (platform, content_id) 精确匹配 # 2. 完整 URL 匹配 # 3. 规范化 URL 匹配 post = cache_index.get(key) if not post: # 2. 完整 URL post = cache_index.get(("url", url)) if not post: # 3. 规范化 URL norm = _normalize_url(url) if norm: post = cache_index.get(("norm_url", norm)) if post: # 统一用 cache 中的 channel_content_id 生成 case_id # 这样保证 case_id 和 cache 中的 ID 一致 actual_cid = post.get("channel_content_id") or post.get("video_id") or cid actual_case_id = f"{platform}_{actual_cid}" matched.append({ "case_id": actual_case_id, "case_file": case_file.name, "platform": platform, "channel_content_id": str(actual_cid), "source_url": url, "evaluation": evaluation, "post": post, }) else: unmatched.append({ "case_id": f"{platform}_{cid}", # 统一格式的 ID "case_file": case_file.name, "platform": platform, "channel_content_id": cid, "source_url": url, "reason": "not_in_cache", }) # 4. 合并已有数据和新匹配的数据 all_sources = existing_sources + matched # 5. 统一过滤:body_text 完整性 / agent 评分 / 时效 from datetime import datetime as _dt cutoff_ts = int(_dt(*cutoff_date).timestamp()) kept_sources: List[Dict[str, Any]] = [] filtered_sources: List[Dict[str, Any]] = [] reason_counts: Dict[str, int] = {} for s in all_sources: reason = _check_filters(s, cutoff_ts, min_body_len, min_score) if reason is None: kept_sources.append(s) else: # 记录过滤原因 s_copy = dict(s) s_copy["filter_reason"] = reason filtered_sources.append(s_copy) # 统计原因类型(只取冒号前的类别) category = reason.split(":", 1)[0] reason_counts[category] = reason_counts.get(category, 0) + 1 all_sources = kept_sources filtered_count = len(filtered_sources) # 6. 转换 timestamp 为可读格式 _convert_timestamps(all_sources) _convert_timestamps(filtered_sources) # 7. 写 source.json output = { "total": len(all_sources), "cache_dir": str(cache_dir), "sources": all_sources, } output_file.parent.mkdir(parents=True, exist_ok=True) with open(output_file, "w", encoding="utf-8") as f: json.dump(output, f, ensure_ascii=False, indent=2) # 8. 写 filtered_cases.json(被过滤掉的帖子,按原因分组) if filtered_sources: for fs in filtered_sources: key = (fs.get("platform"), fs.get("channel_content_id")) if key not in existing_filtered_ids: existing_filtered_sources.append(fs) existing_filtered_ids.add(key) # 按原因类别分组 by_reason: Dict[str, List[Dict[str, Any]]] = {} for fs in existing_filtered_sources: reason = fs.get("filter_reason", "unknown") category = reason.split(":", 1)[0] by_reason.setdefault(category, []).append(fs) filtered_output = { "total": len(existing_filtered_sources), "by_reason": { category: { "count": len(items), "sources": items, } for category, items in by_reason.items() }, } with open(filtered_output_file, "w", encoding="utf-8") as f: json.dump(filtered_output, f, ensure_ascii=False, indent=2) # 9. 下载图片到 raw_cases/images/{case_id}/ images_downloaded = download_images_for_sources(matched, raw_cases_dir) # 10. 构建被过滤条目的摘要(供续跑 feedback 使用) filtered_details: List[Dict[str, Any]] = [] for fs in filtered_sources: post = fs.get("post", {}) or {} title = post.get("title") or fs.get("source_url", "") filtered_details.append({ "case_id": fs.get("case_id", ""), "platform": fs.get("platform", ""), "title": title[:60] if title else "", "filter_reason": fs.get("filter_reason", ""), }) # 返回统计信息 return { "total_matched": len(matched), "total_existing": len(existing_sources), "total_unmatched": len(unmatched), "filtered_total": filtered_count, "filtered_reasons": reason_counts, "filtered_details": filtered_details, "images_downloaded": images_downloaded, "output_file": str(output_file), } # ── 图片下载 ──────────────────────────────── def _get_image_urls_from_post(post: Dict[str, Any]) -> List[str]: """从 post 中提取所有图片 URL""" urls = [] images = post.get("images", []) if isinstance(images, list): for img in images: if isinstance(img, str) and img.startswith("http"): urls.append(img) elif isinstance(img, dict) and "url" in img: urls.append(img["url"]) image_url_list = post.get("image_url_list", []) if isinstance(image_url_list, list): for img_obj in image_url_list: if isinstance(img_obj, dict) and "image_url" in img_obj: urls.append(img_obj["image_url"]) return urls def _format_timestamp(ts: Any) -> Optional[str]: """将时间戳(秒/毫秒)转换为可读格式""" from datetime import datetime if ts is None or ts == 0 or ts == "": return None try: ts = int(ts) # 毫秒级时间戳 if ts > 1000000000000: ts = ts / 1000 dt = datetime.fromtimestamp(ts) return dt.strftime("%Y-%m-%d %H:%M:%S") except Exception: return None def _convert_timestamps(sources: List[Dict[str, Any]]) -> None: """将 source 列表中 post 的时间戳字段替换为可读格式""" timestamp_fields = ["publish_timestamp", "modify_timestamp", "update_timestamp"] for src in sources: post = src.get("post", {}) if not isinstance(post, dict): continue for field in timestamp_fields: if field in post: readable = _format_timestamp(post.get(field)) if readable: post[field] = readable def download_images_for_sources(sources: List[Dict[str, Any]], raw_cases_dir: Path) -> int: """ 为新匹配的 sources 下载图片到 raw_cases/images/{case_id}/ Returns: 下载成功的图片总数 """ import urllib.request import urllib.error images_base = raw_cases_dir / "images" total_downloaded = 0 # 设置 headers 避免被拒(X/Twitter 需要 User-Agent) opener = urllib.request.build_opener() opener.addheaders = [("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36")] urllib.request.install_opener(opener) for src in sources: case_id = src.get("case_id", "") post = src.get("post", {}) image_urls = _get_image_urls_from_post(post) if not image_urls: continue case_dir = images_base / case_id case_dir.mkdir(parents=True, exist_ok=True) for idx, url in enumerate(image_urls): # 确定文件扩展名 ext = ".jpg" if ".png" in url.lower(): ext = ".png" elif ".webp" in url.lower(): ext = ".webp" save_path = case_dir / f"{idx:02d}{ext}" if save_path.exists(): total_downloaded += 1 continue try: urllib.request.urlretrieve(url, str(save_path)) total_downloaded += 1 except Exception: # 下载失败就跳过,不中断流程 pass return total_downloaded if __name__ == "__main__": # CLI:python extract_sources.py [cache_dir] import sys if len(sys.argv) < 2: print("Usage: python extract_sources.py [cache_dir]") sys.exit(1) raw_cases_dir = Path(sys.argv[1]) cache_dir = Path(sys.argv[2]) if len(sys.argv) > 2 else None result = extract_sources_to_json(raw_cases_dir, cache_dir=cache_dir) print(f"[OK] Matched: {result['total_matched']}, Unmatched: {result['total_unmatched']}") print(f" Output: {raw_cases_dir / 'source.json'}")