"""Resolve case source links to the original posts stored in the search cache.

Reads the ``source_url`` / ``帖子链接`` values from ``raw_cases/case_*.json``,
parses a ``channel_content_id`` out of each link and looks the original post
up in ``.cache/content_search``.

Main entry point: ``extract_sources_to_json(raw_cases_dir)``
  - scans every ``case_{platform}.json`` in that directory
  - reads each ``工序发现[].帖子链接`` (new format) or ``cases[].source_url``
    (old format)
  - matches the ``channel_content_id`` against
    ``.cache/content_search/*.json`` under the project root
  - writes every matched post, in full, to ``{raw_cases_dir}/source.json``
  - downloads images to ``{raw_cases_dir}/images/{case_id}/``
"""

import asyncio
import json
import logging
import re
import shutil
import urllib.request
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import parse_qs, urlencode, urlparse

try:
    # Nothing in this module references aiohttp. The import is kept so the
    # name stays available, but a missing package must not make the whole
    # module un-importable.
    import aiohttp  # noqa: F401
except ImportError:  # pragma: no cover - depends on the environment
    aiohttp = None

logger = logging.getLogger(__name__)


# ── URL → (platform, content_id) parsing ─────────────────────────────

_URL_PATTERNS = [
    # Bilibili: https://www.bilibili.com/video/BV1xxx
    ("bili", re.compile(r"bilibili\.com/video/(BV[\w]+)")),
    # Xiaohongshu: https://www.xiaohongshu.com/explore/{id} or /discovery/item/{id}
    ("xhs", re.compile(r"xiaohongshu\.com/(?:explore|discovery/item)/([a-f0-9]+)")),
    # YouTube: https://www.youtube.com/watch?v={id} or https://youtu.be/{id}.
    # `v=` may be preceded by other query parameters; the lazy optional group
    # makes the first `v=` win.
    ("youtube", re.compile(r"(?:youtube\.com/watch\?(?:[^\s#]*?&)??v=|youtu\.be/)([\w-]+)")),
    # X/Twitter: https://x.com/{user}/status/{id} or twitter.com
    ("x", re.compile(r"(?:x\.com|twitter\.com)/[^/]+/status/(\d+)")),
    # Zhihu: https://zhuanlan.zhihu.com/p/{id} or zhihu.com/question/{qid}/answer/{aid}
    ("zhihu", re.compile(r"zhuanlan\.zhihu\.com/p/(\d+)")),
    ("zhihu", re.compile(r"zhihu\.com/question/\d+/answer/(\d+)")),
    # WeChat official accounts: everything after /s/ or /s? is used as the id (fallback)
    ("gzh", re.compile(r"mp\.weixin\.qq\.com/s[/?]([^\s\"']+)")),
    # Douyin: https://www.douyin.com/video/{numeric_id}; the id is matched
    # against the douyin post's channel_content_id
    ("douyin", re.compile(r"douyin\.com/video/(\d+)")),
    # WeChat Channels (sph): post.link is 'export/{base64-like token}', not an
    # http URL. It is only tagged here; the actual lookup goes through the
    # ("url", link) index in extract_sources_to_json.
    ("sph", re.compile(r"^export/([A-Za-z0-9+/=_\-]+)$")),
]


def parse_url(url: str) -> Optional[Tuple[str, str]]:
    """Parse ``(platform, content_id)`` out of a URL.

    Returns None when `url` is empty, not a string, or matches no known
    pattern. Patterns are tried in `_URL_PATTERNS` order; the first hit wins.
    """
    if not url or not isinstance(url, str):
        return None
    for platform, pattern in _URL_PATTERNS:
        match = pattern.search(url)
        if match:
            return platform, match.group(1)
    return None


# ── Extract all entries (URL + evaluation) from a case file ──────────

# (container key, link keys in priority order) for each supported layout.
_CASE_LAYOUTS = (
    # New format: 工序发现[].帖子链接
    ("工序发现", ("帖子链接", "source_url")),
    # Main format: cases[]
    ("cases", ("source_url", "帖子链接")),
)


def extract_entries_from_case(case_data: Any) -> List[Dict[str, Any]]:
    """Extract the entries of a case document, each with its URL and evaluation.

    Both layouts in `_CASE_LAYOUTS` are read, in that order. Items that are
    not dicts or carry no link are skipped, as are containers that are not
    lists.

    Returns:
        ``[{"url": ..., "evaluation": dict | None, "title": str | None,
        "case_id": str | None}]``; empty when `case_data` is not a dict.
    """
    entries: List[Dict[str, Any]] = []
    if not isinstance(case_data, dict):
        return entries

    for container_key, (primary_key, fallback_key) in _CASE_LAYOUTS:
        items = case_data.get(container_key) or []
        if not isinstance(items, list):
            continue
        for item in items:
            if not isinstance(item, dict):
                continue
            link = item.get(primary_key) or item.get(fallback_key)
            if not link:
                continue
            entries.append({
                "url": link,
                "evaluation": item.get("evaluation"),
                "title": item.get("title"),
                "case_id": item.get("case_id"),
            })
    return entries


def extract_urls_from_case(case_data: Any) -> List[str]:
    """[Legacy] Return only the URLs; kept for possible external callers.

    Internal code uses `extract_entries_from_case` instead.
    """
    return [entry["url"] for entry in extract_entries_from_case(case_data)]


# ── Filter rules (single entry point) ────────────────────────────────
# Default thresholds: minimum body_text length and the staleness cutoff.
# Note: the quality gate moved from rule-based filtering here to the LLM rubric
# evaluation (see llm_evaluate_sources.py); this module only performs the
# cheap pre-screen (content completeness + recency).

DEFAULT_MIN_BODY_LEN = 30
DEFAULT_CUTOFF_DATE = (2025, 10, 1)

_TRANSCRIPT_MARKER = "[视频字幕]"


def _merge_transcript_into_body(post: Dict[str, Any]) -> Dict[str, Any]:
    """Return a shallow copy of `post` where `body_text` includes the video transcript.

    For sph/douyin/x video posts whose original body_text is just hashtags or
    empty, this exposes the real video content (Deepgram transcript or YouTube
    captions) under the field the frontend reads. The original transcript
    field is left intact so downstream code that wants it separately still
    works.

    Idempotent: if body_text already contains the `[视频字幕]` marker, skip.
    The input object itself is returned (not a copy) when there is nothing to
    merge.
    """
    if not isinstance(post, dict):
        return post

    transcript = post.get("video_transcript") or post.get("captions") or ""
    if not isinstance(transcript, str) or not transcript.strip():
        return post
    transcript = transcript.strip()

    body = post.get("body_text") or post.get("desc") or ""
    body = body.strip() if isinstance(body, str) else ""

    if body and _TRANSCRIPT_MARKER in body:
        return post  # already merged

    merged = dict(post)  # shallow copy is enough: body_text is a top-level field
    if body:
        merged["body_text"] = f"{body}\n\n{_TRANSCRIPT_MARKER}\n{transcript}"
    else:
        merged["body_text"] = f"{_TRANSCRIPT_MARKER}\n{transcript}"
    return merged


def _needs_transcribe(platform: str, post: Dict[str, Any]) -> bool:
    """Tell whether a Deepgram transcription should be run for this post.

    Semantics (explicit user constraint):
      - `video_transcript` key **missing** -> never attempted, needs a run
      - `video_transcript` key present (even as "") -> already attempted, skip
        (Deepgram returns an empty result for music-only / voiceless videos;
        that is a legitimate "failed" marker and is not retried)
      - the post must have a video source (decided by
        transcription.extract_video_url, shared across platforms)

    Returns False when the transcription module cannot be imported or the
    video-URL extraction raises.
    """
    if not isinstance(post, dict):
        return False
    if "video_transcript" in post:
        return False
    try:
        from agent.tools.builtin.content.transcription import extract_video_url
        return bool(extract_video_url(platform, post))
    except Exception:
        return False


async def _transcribe_one_post(
    platform: str,
    post: Dict[str, Any],
    sem: asyncio.Semaphore,
) -> Optional[str]:
    """Run transcribe_video_from_post for one post and store the result on it.

    Success -> ``post["video_transcript"] = transcript``
    Failure -> ``post["video_transcript"] = ""``, an explicit "already tried"
    marker that keeps later backfills from re-running it.

    Returns the transcript, or None on failure.
    """
    from agent.tools.builtin.content.transcription import transcribe_video_from_post

    async with sem:
        try:
            text = await transcribe_video_from_post(platform, post)
        except Exception as e:
            logger.warning("transcribe failed (%s): %s", platform, e)
            text = None

    if text:
        post["video_transcript"] = text
        return text
    # Write "" on failure too: "Deepgram was tried but produced nothing".
    post["video_transcript"] = ""
    return None


async def _transcribe_pending_async(
    matched: List[Dict[str, Any]],
    concurrency: int = 3,
) -> Dict[Tuple[str, str], str]:
    """Transcribe, concurrently, every post in `matched` that still needs it.

    A post qualifies when `_needs_transcribe` says so (no `video_transcript`
    key and a video source).

    Returns:
        ``{(platform, channel_content_id): transcript_text}`` for the
        successful runs, so the caller can write them back to the cache files
        and let other traces hitting the same post reuse them.
    """
    targets: List[Tuple[str, Dict[str, Any]]] = []
    for src in matched:
        platform = src.get("platform")
        post = src.get("post")
        if not isinstance(post, dict) or not platform:
            continue
        if _needs_transcribe(platform, post):
            targets.append((platform, post))

    if not targets:
        return {}

    logger.info("Auto-transcribe: %d post(s) pending", len(targets))
    sem = asyncio.Semaphore(concurrency)
    results = await asyncio.gather(
        *[_transcribe_one_post(p, post, sem) for p, post in targets]
    )

    updates: Dict[Tuple[str, str], str] = {}
    success_n = 0
    for (platform, post), text in zip(targets, results):
        if text:
            success_n += 1
            cid = post.get("channel_content_id") or post.get("video_id")
            if cid:
                updates[(platform, str(cid))] = text
    logger.info("Auto-transcribe: %d/%d success", success_n, len(targets))
    return updates


def _writeback_transcript_to_cache(
    cache_dir: Path,
    updates: Dict[Tuple[str, str], str],
) -> int:
    """Write freshly obtained transcripts back into every matching cached post.

    Cross-trace propagation: the same video may have been returned by the
    searches of several traces, so all cache copies are updated in one go.
    That keeps another trace's extract_sources run from triggering Deepgram
    again.

    Unreadable, non-dict or unwritable cache files are skipped.

    Returns:
        Total number of cached posts that were updated and persisted.
    """
    if not updates or not cache_dir.exists():
        return 0

    written = 0
    for cache_file in cache_dir.glob("*.json"):
        try:
            data = json.loads(cache_file.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            continue
        if not isinstance(data, dict):
            continue

        touched = 0
        for key, entry in data.items():
            if not key.startswith("search:") or not isinstance(entry, dict):
                continue
            platform = key.split(":", 1)[1]

            post_lists = [
                h.get("posts", [])
                for h in entry.get("history", []) or []
                if isinstance(h, dict)
            ]
            if isinstance(entry.get("posts"), list):
                post_lists.append(entry["posts"])

            for posts in post_lists:
                if not isinstance(posts, list):
                    continue
                for post in posts:
                    if not isinstance(post, dict):
                        continue
                    cid = post.get("channel_content_id") or post.get("video_id")
                    if not cid:
                        continue
                    text = updates.get((platform, str(cid)))
                    # Test key presence, exactly like _needs_transcribe: an
                    # existing field (even empty) is an "already tried" state
                    # and is not overwritten.
                    if text and "video_transcript" not in post:
                        post["video_transcript"] = text
                        touched += 1

        if not touched:
            continue
        try:
            cache_file.write_text(
                json.dumps(data, ensure_ascii=False, indent=2),
                encoding="utf-8",
            )
        except OSError as e:
            # One unwritable file must not stop the remaining cache copies.
            logger.warning("transcript writeback failed (%s): %s", cache_file.name, e)
            continue
        written += touched
    return written


def _is_before_cutoff(source: Dict[str, Any], cutoff_ts: int) -> bool:
    """Tell whether the post was published before the cutoff (epoch seconds).

    A missing or zero timestamp (numeric or the string "0") yields False, so
    the post is kept. Millisecond timestamps are converted to seconds, and
    strings shaped like "2025-05-02 19:25:30" are parsed as local time.
    Anything unparseable is kept as well.
    """
    post = source.get("post", {})
    if not isinstance(post, dict):
        return False

    ts = post.get("publish_timestamp")
    if not ts:
        return False

    try:
        numeric: Optional[float] = int(ts)
    except (ValueError, TypeError, OverflowError):
        numeric = None

    if numeric is not None:
        if numeric == 0:
            return False  # "0" as a string means "unknown", same as int 0
        if numeric > 1000000000000:
            numeric = numeric / 1000  # milliseconds -> seconds
        return numeric < cutoff_ts

    # Not numeric: try the readable form written by _convert_timestamps.
    try:
        parsed = datetime.strptime(str(ts), "%Y-%m-%d %H:%M:%S")
        return parsed.timestamp() < cutoff_ts
    except (ValueError, OverflowError, OSError):
        return False  # unparseable -> keep


def _check_filters(
    source: Dict[str, Any],
    cutoff_ts: int,
    min_body_len: int,
) -> Optional[str]:
    """Run the cheap pre-screen (content completeness + recency) on one source.

    The quality-score gate was handed over to the downstream LLM rubric
    evaluation (llm_evaluate_sources.py); the agent's self-assessed
    overall_score is no longer read here.

    Returns:
        None when the source passes, otherwise the rejection reason (stored as
        filter_reason by the caller).
    """
    post = source.get("post", {}) or {}
    if not isinstance(post, dict):
        post = {}

    # 1. Completeness: video posts usually have an empty body_text, so the
    #    transcript counts towards the length as well.
    body = post.get("body_text") or post.get("desc") or ""
    transcript = post.get("video_transcript") or post.get("captions") or ""
    body_len = len(body.strip()) if isinstance(body, str) else 0
    transcript_len = len(transcript.strip()) if isinstance(transcript, str) else 0
    if body_len + transcript_len < min_body_len:
        return f"missing_body_text:body={body_len},transcript={transcript_len}"

    # 2. Staleness (reuses the timestamp parsing of _is_before_cutoff).
    if _is_before_cutoff(source, cutoff_ts):
        return "outdated"

    return None


# ── Build the (platform, content_id) → post index from the cache ─────

def _normalize_url(url: str) -> Optional[str]:
    """Normalize a URL: sort the query parameters and strip trailing slashes.

    Returns None when the value cannot be parsed as a URL.
    """
    try:
        parsed = urlparse(url)
        if parsed.query:
            params = parse_qs(parsed.query, keep_blank_values=True)
            sorted_query = urlencode(sorted(params.items()), doseq=True)
            normalized = f"{parsed.scheme}://{parsed.netloc}{parsed.path}?{sorted_query}"
        else:
            normalized = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
        return normalized.rstrip('/')
    except (ValueError, AttributeError, TypeError):
        # ValueError: malformed URL; AttributeError/TypeError: non-string input.
        return None


def _normalize_post_in_place(platform: str, post: Dict[str, Any]) -> None:
    """Fill in platform-specific fields on a post read from the cache (in place).

    Early cache files may have been written before the platform-level
    normalize functions existed; this is the fallback that repairs them:
      YouTube: description_snippet -> body_text / thumbnails -> images / url -> videos / ...
      sph:     title (caption) -> body_text (the sph title field holds the whole caption)

    Best effort: if the platform module cannot be imported or the normalizer
    raises, the post is left as it is.
    """
    if platform == "youtube":
        try:
            from agent.tools.builtin.content.platforms.youtube import _normalize_youtube_post
            _normalize_youtube_post(post)
        except Exception as e:
            logger.debug("youtube post normalize skipped: %s", e)
    elif platform == "sph":
        try:
            from agent.tools.builtin.content.platforms.aigc_channel import _normalize_sph_post
            _normalize_sph_post(post)
        except Exception as e:
            logger.debug("sph post normalize skipped: %s", e)


def build_cache_index(cache_dir: Path, trace_ids: Optional[List[str]] = None) -> Dict[Tuple[str, str], Dict[str, Any]]:
    """Build the lookup index over all cached posts.

    Three kinds of keys point at the same post objects:
      - ``(platform, channel_content_id)``
      - ``("url", link)`` with the raw ``link`` / ``url`` field
      - ``("norm_url", normalized_link)``

    Files are processed in sorted order and later posts overwrite earlier ones
    with the same key, so the result is deterministic. Unreadable or non-dict
    cache files are skipped.

    Args:
        cache_dir: cache directory.
        trace_ids: optional list of trace ids. When given, only
            ``{trace_id}.json`` files that exist are loaded; otherwise every
            ``*.json`` file in `cache_dir` is scanned.

    Returns:
        The index described above; empty when `cache_dir` does not exist.
    """
    index: Dict[Tuple[str, str], Dict[str, Any]] = {}
    if not cache_dir.exists():
        return index

    if trace_ids:
        cache_files = [cache_dir / f"{tid}.json" for tid in trace_ids if tid]
        cache_files = [f for f in cache_files if f.exists()]
    else:
        cache_files = sorted(cache_dir.glob("*.json"))

    for cache_file in cache_files:
        try:
            with open(cache_file, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError):
            continue
        if not isinstance(data, dict):
            continue

        for key, entry in data.items():
            if not key.startswith("search:") or not isinstance(entry, dict):
                continue
            platform = key.split(":", 1)[1]

            # New format: entry = {"history": [...], "latest_index": n}
            # Old format: entry = {"keyword": ..., "posts": [...]}
            if "history" in entry:
                post_lists = [
                    h.get("posts", [])
                    for h in entry.get("history") or []
                    if isinstance(h, dict)
                ]
            elif "posts" in entry:
                post_lists = [entry.get("posts", [])]
            else:
                continue

            for posts in post_lists:
                if not isinstance(posts, list):
                    continue
                for post in posts:
                    if not isinstance(post, dict):
                        continue

                    # Rescue early cache entries written before normalization existed.
                    _normalize_post_in_place(platform, post)

                    cid = post.get("channel_content_id")
                    # YouTube uses video_id instead of channel_content_id.
                    # Normalization should already have handled it; this is a
                    # second safety net for posts that were never normalized.
                    if not cid and post.get("video_id"):
                        cid = post.get("video_id")
                        post["channel_content_id"] = cid

                    if cid:
                        index[(platform, str(cid))] = post

                    # Also index by link / url (needed for GZH and similar platforms).
                    link = post.get("link") or post.get("url") or ""
                    if link and isinstance(link, str):
                        index[("url", link)] = post
                        norm = _normalize_url(link)
                        if norm:
                            index[("norm_url", norm)] = post

    return index


# ── Main entry point and its helpers ─────────────────────────────────

def _source_keys(sources: List[Dict[str, Any]]) -> set:
    """Return the set of ``(platform, channel_content_id)`` keys of `sources`."""
    keys = set()
    for src in sources:
        try:
            keys.add((src.get("platform"), src.get("channel_content_id")))
        except TypeError:
            continue  # unhashable field in a hand-edited file
    return keys


def _load_existing_sources(output_file: Path) -> List[Dict[str, Any]]:
    """Load the ``sources`` list of an existing source.json ([] if absent or invalid)."""
    if not output_file.exists():
        return []
    try:
        with open(output_file, "r", encoding="utf-8") as f:
            existing_data = json.load(f)
    except (OSError, ValueError) as e:
        print(f"Warning: Failed to load existing source.json: {e}")
        return []
    sources = existing_data.get("sources") if isinstance(existing_data, dict) else None
    if not isinstance(sources, list):
        return []
    return [src for src in sources if isinstance(src, dict)]


def _load_existing_filtered(filtered_file: Path) -> List[Dict[str, Any]]:
    """Load the records of an existing filtered_cases.json ([] if absent or invalid).

    The file is written as ``{"total": n, "by_reason": {category: {"count",
    "sources"}}}``, so the records are collected from every ``by_reason``
    group. A flat top-level ``sources`` list is accepted too.
    """
    if not filtered_file.exists():
        return []
    try:
        with open(filtered_file, "r", encoding="utf-8") as f:
            filtered_data = json.load(f)
    except (OSError, ValueError) as e:
        print(f"Warning: Failed to load existing filtered_cases.json: {e}")
        return []
    if not isinstance(filtered_data, dict):
        return []

    records: List[Any] = []
    flat = filtered_data.get("sources")
    if isinstance(flat, list):
        records.extend(flat)
    by_reason = filtered_data.get("by_reason")
    if isinstance(by_reason, dict):
        for group in by_reason.values():
            if isinstance(group, dict) and isinstance(group.get("sources"), list):
                records.extend(group["sources"])
    return [rec for rec in records if isinstance(rec, dict)]


def _load_case_file(case_file: Path) -> Tuple[Any, Optional[str]]:
    """Read one case file, attempting an automatic repair of malformed JSON.

    On a parse failure the project's ``try_fix_and_parse`` helper is tried;
    when it succeeds the repaired document is written back to the file.

    Returns:
        ``(case_data, None)`` on success, ``(None, error_message)`` otherwise,
        where the message is that of the original load failure.
    """
    try:
        with open(case_file, "r", encoding="utf-8") as f:
            return json.load(f), None
    except Exception as e:
        error = str(e)

    try:
        from examples.process_pipeline.script.fix_json_quotes import try_fix_and_parse
        with open(case_file, "r", encoding="utf-8") as f:
            raw_content = f.read()
        success, case_data, fix_desc = try_fix_and_parse(raw_content)
        if not success:
            return None, error
        with open(case_file, "w", encoding="utf-8") as f:
            json.dump(case_data, f, ensure_ascii=False, indent=2)
        print(f" 🔧 [Auto-Fix] Fixed {case_file.name}: {fix_desc}")
        return case_data, None
    except Exception:
        return None, error


def _lookup_post(
    cache_index: Dict[Tuple[str, str], Dict[str, Any]],
    url: str,
    platform: Optional[str],
    cid: Optional[str],
) -> Optional[Dict[str, Any]]:
    """Find the cached post for a link, trying the index keys by priority.

    1. ``(platform, content_id)`` exact match (when parse_url succeeded)
    2. ``("url", link)`` full-string match (fallback; required for sph, whose
       links are not http URLs)
    3. ``("norm_url", normalized)`` match on the normalized URL
    """
    post = None
    if platform and cid:
        post = cache_index.get((platform, cid))
    if not post:
        post = cache_index.get(("url", url))
    if not post:
        norm = _normalize_url(url)
        if norm:
            post = cache_index.get(("norm_url", norm))
    return post


def _run_auto_transcribe(
    sources: List[Dict[str, Any]],
    cache_dir: Path,
    concurrency: int,
) -> Dict[str, Any]:
    """Backfill missing video transcripts for `sources` (best effort).

    Runs only for posts that have a video source and **no** `video_transcript`
    key at all. An empty string counts as "already tried"; failures are stored
    as "" too, so they are skipped next time instead of burning Deepgram quota
    again. New transcripts are written back to all cache files and merged into
    `body_text` in place.

    Never raises: when called from inside a running event loop, or on any
    error, the step is logged and skipped.

    Returns:
        ``{"attempted": n, "succeeded": n, "cache_writeback": n}``.
    """
    stats: Dict[str, Any] = {"attempted": 0, "succeeded": 0, "cache_writeback": 0}
    try:
        pending = sum(
            1 for s in sources
            if isinstance(s.get("post"), dict)
            and _needs_transcribe(s.get("platform"), s["post"])
        )
        if pending <= 0:
            return stats

        # asyncio.run() cannot be nested. Detect a running loop up front so no
        # coroutine object is created and then left un-awaited.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            pass  # no loop running: safe to call asyncio.run()
        else:
            logger.warning(
                "auto-transcribe skipped: %s",
                "asyncio.run() cannot be called from a running event loop",
            )
            return stats

        logger.info("extract_sources: auto-transcribe %d post(s)", pending)
        updates = asyncio.run(
            _transcribe_pending_async(sources, concurrency=concurrency)
        )
        stats["attempted"] = pending
        stats["succeeded"] = len(updates)

        # Write back to the cache (cross-trace propagation).
        if updates:
            stats["cache_writeback"] = _writeback_transcript_to_cache(cache_dir, updates)

        # Merge the transcripts into body_text, consistent with
        # _merge_transcript_into_body (idempotent for already merged posts).
        for s in sources:
            post = s.get("post")
            if not isinstance(post, dict) or not post.get("video_transcript"):
                continue
            merged = _merge_transcript_into_body(post)
            if merged is not post:
                post["body_text"] = merged.get("body_text", post.get("body_text", ""))
    except RuntimeError as e:
        logger.warning("auto-transcribe skipped: %s", e)
    except Exception as e:
        logger.warning("auto-transcribe failed: %s", e)
    return stats


def _write_filtered_cases(
    filtered_file: Path,
    previous: List[Dict[str, Any]],
    current: List[Dict[str, Any]],
) -> None:
    """Write filtered_cases.json with the filtered records grouped by reason.

    Records are de-duplicated on ``(platform, channel_content_id)``. A record
    from `current` replaces a `previous` one with the same key, so the stored
    filter_reason reflects the latest run.
    """
    merged: List[Dict[str, Any]] = []
    slot_by_key: Dict[Any, int] = {}
    for record in list(previous) + list(current):
        key = (record.get("platform"), record.get("channel_content_id"))
        try:
            slot = slot_by_key.get(key)
        except TypeError:
            merged.append(record)  # unhashable key: keep, cannot de-duplicate
            continue
        if slot is None:
            slot_by_key[key] = len(merged)
            merged.append(record)
        else:
            merged[slot] = record

    # Group by reason category (the part before the colon).
    by_reason: Dict[str, List[Dict[str, Any]]] = {}
    for record in merged:
        reason = record.get("filter_reason", "unknown")
        category = str(reason).split(":", 1)[0]
        by_reason.setdefault(category, []).append(record)

    filtered_output = {
        "total": len(merged),
        "by_reason": {
            category: {"count": len(items), "sources": items}
            for category, items in by_reason.items()
        },
    }
    with open(filtered_file, "w", encoding="utf-8") as f:
        json.dump(filtered_output, f, ensure_ascii=False, indent=2)


def extract_sources_to_json(
    raw_cases_dir: Path,
    cache_dir: Optional[Path] = None,
    output_name: str = "source.json",
    trace_ids: Optional[List[str]] = None,
    min_body_len: int = DEFAULT_MIN_BODY_LEN,
    cutoff_date: Tuple[int, int, int] = DEFAULT_CUTOFF_DATE,
    auto_transcribe: bool = True,
    transcribe_concurrency: int = 3,
) -> Dict[str, Any]:
    """Resolve the links of all case files to cached posts and pre-screen them.

    Scans ``case_*.json`` under `raw_cases_dir`, finds the original posts in
    the cache, merges them with an existing source.json and runs the cheap
    pre-screen (body_text completeness / recency).

    Pre-screen rules (both adjustable through parameters):
      - body_text empty or shorter than `min_body_len` characters
        -> filter_reason=missing_body_text
      - publish_timestamp earlier than `cutoff_date`
        -> filter_reason=outdated

    Note: the quality gate (formerly the agent's self-assessed
    overall_score>=70) was handed over to the LLM rubric evaluation in
    llm_evaluate_sources.py; no score filtering happens here.

    Side effects: writes ``{raw_cases_dir}/{output_name}``, writes
    ``filtered_cases.json`` when anything was filtered in this run, downloads
    images for the newly matched posts, may rewrite a case file whose JSON was
    auto-repaired, and may update cache files with new transcripts.

    Args:
        raw_cases_dir: directory holding the ``case_*.json`` files.
        cache_dir: search cache directory; defaults to
            ``.cache/content_search`` four directory levels above this file.
        output_name: file name of the output document.
        trace_ids: restrict the cache index to these trace files.
        min_body_len: minimum body_text length in characters, default 30.
        cutoff_date: staleness cutoff as (year, month, day), default
            (2025, 10, 1), interpreted as local time.
        auto_transcribe: backfill missing video transcripts before filtering.
        transcribe_concurrency: maximum number of concurrent transcriptions.

    Returns:
        A statistics dict with the keys ``total_matched``, ``total_existing``,
        ``total_unmatched``, ``filtered_total``, ``filtered_reasons``,
        ``filtered_details``, ``images_downloaded``, ``auto_transcribe`` and
        ``output_file``.
    """
    raw_cases_dir = Path(raw_cases_dir)
    if cache_dir is None:
        project_root = Path(__file__).resolve().parent.parent.parent.parent
        cache_dir = project_root / ".cache" / "content_search"
    cache_dir = Path(cache_dir)

    # 1. Build the cache index.
    cache_index = build_cache_index(cache_dir, trace_ids=trace_ids)

    # 2. Load the existing source.json / filtered_cases.json, if present.
    output_file = raw_cases_dir / output_name
    existing_sources = _load_existing_sources(output_file)
    filtered_output_file = raw_cases_dir / "filtered_cases.json"
    existing_filtered_sources = _load_existing_filtered(filtered_output_file)

    # 3. Scan all case files.
    matched: List[Dict[str, Any]] = []
    unmatched: List[Dict[str, Any]] = []
    seen_keys: set = _source_keys(existing_sources)  # start from the known ids

    for case_file in sorted(raw_cases_dir.glob("case_*.json")):
        # Skip the output itself in case it was named like case_*.
        if case_file.name == output_name:
            continue

        case_data, load_error = _load_case_file(case_file)
        if load_error is not None:
            unmatched.append({"case_file": case_file.name, "error": load_error})
            continue

        for entry in extract_entries_from_case(case_data):
            url = entry["url"]
            evaluation = entry.get("evaluation")

            if not isinstance(url, str):
                # A non-string link can be neither parsed nor looked up.
                unmatched.append({
                    "case_file": case_file.name,
                    "url": url,
                    "reason": "url_parse_failed",
                })
                continue

            # Parsing may fail, e.g. for a link that matches no pattern.
            parsed = parse_url(url)
            platform: Optional[str] = parsed[0] if parsed else None
            cid: Optional[str] = parsed[1] if parsed else None

            post = _lookup_post(cache_index, url, platform, cid)

            # parse_url failed but a URL index hit: infer platform / cid from the post.
            if post and (not platform or not cid):
                platform = platform or post.get("channel") or "unknown"
                cid = cid or post.get("channel_content_id") or post.get("video_id") or url

            # Neither parsed nor found in the cache: genuinely unmatched.
            if not post and not platform:
                unmatched.append({
                    "case_file": case_file.name,
                    "url": url,
                    "reason": "url_parse_failed",
                })
                continue

            key = (platform, str(cid) if cid else url)
            if key in seen_keys:
                continue
            seen_keys.add(key)

            if post:
                # Always derive case_id from the cached channel_content_id so
                # it agrees with the id stored in the cache.
                actual_cid = post.get("channel_content_id") or post.get("video_id") or cid
                # Merge the transcript into body_text (the frontend shows it for video posts).
                post_for_source = _merge_transcript_into_body(post)
                matched.append({
                    "case_id": f"{platform}_{actual_cid}",
                    "case_file": case_file.name,
                    "platform": platform,
                    "channel_content_id": str(actual_cid),
                    "source_url": url,
                    "evaluation": evaluation,
                    "post": post_for_source,
                    "comments": post.get("author_comments", []) or [],
                })
            else:
                unmatched.append({
                    "case_id": f"{platform}_{cid}",  # same id format as matched entries
                    "case_file": case_file.name,
                    "platform": platform,
                    "channel_content_id": cid,
                    "source_url": url,
                    "reason": "not_in_cache",
                })

    # 4. Merge existing and newly matched sources.
    all_sources = existing_sources + matched

    # 4.5. Automatic transcript backfill (safety net).
    auto_transcribe_stats: Dict[str, Any] = {"attempted": 0, "succeeded": 0, "cache_writeback": 0}
    if auto_transcribe and all_sources:
        auto_transcribe_stats = _run_auto_transcribe(
            all_sources, cache_dir, transcribe_concurrency
        )

    # 5. Cheap pre-screen: body_text completeness / recency.
    cutoff_ts = int(datetime(*cutoff_date).timestamp())

    kept_sources: List[Dict[str, Any]] = []
    filtered_sources: List[Dict[str, Any]] = []
    reason_counts: Dict[str, int] = {}
    for s in all_sources:
        reason = _check_filters(s, cutoff_ts, min_body_len)
        if reason is None:
            kept_sources.append(s)
            continue
        s_copy = dict(s)
        s_copy["filter_reason"] = reason
        filtered_sources.append(s_copy)
        # Count by category only (the part before the colon).
        category = reason.split(":", 1)[0]
        reason_counts[category] = reason_counts.get(category, 0) + 1

    all_sources = kept_sources
    filtered_count = len(filtered_sources)

    # 6. Convert timestamps to a readable format.
    _convert_timestamps(all_sources)
    _convert_timestamps(filtered_sources)

    # 7. Write source.json.
    output = {
        "total": len(all_sources),
        "cache_dir": str(cache_dir),
        "sources": all_sources,
    }
    output_file.parent.mkdir(parents=True, exist_ok=True)
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(output, f, ensure_ascii=False, indent=2)

    # 8. Write filtered_cases.json (filtered posts, grouped by reason).
    if filtered_sources:
        _write_filtered_cases(filtered_output_file, existing_filtered_sources, filtered_sources)

    # 9. Download images to raw_cases/images/{case_id}/.
    images_downloaded = download_images_for_sources(matched, raw_cases_dir)

    # 10. Summarize the filtered entries (used as feedback when a run is resumed).
    filtered_details: List[Dict[str, Any]] = []
    for fs in filtered_sources:
        post = fs.get("post", {}) or {}
        title = (post.get("title") if isinstance(post, dict) else None) or fs.get("source_url", "")
        filtered_details.append({
            "case_id": fs.get("case_id", ""),
            "platform": fs.get("platform", ""),
            "title": str(title)[:60] if title else "",
            "filter_reason": fs.get("filter_reason", ""),
        })

    return {
        "total_matched": len(matched),
        "total_existing": len(existing_sources),
        "total_unmatched": len(unmatched),
        "filtered_total": filtered_count,
        "filtered_reasons": reason_counts,
        "filtered_details": filtered_details,
        "images_downloaded": images_downloaded,
        "auto_transcribe": auto_transcribe_stats,
        "output_file": str(output_file),
    }


# ── Image download ───────────────────────────────────────────────────

# Seconds to wait on a single image request before giving up on it.
_IMAGE_DOWNLOAD_TIMEOUT = 30
_IMAGE_USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"


def _get_image_urls_from_post(post: Dict[str, Any]) -> List[str]:
    """Collect every image URL found in a post.

    Reads ``images`` (items are either "http..." strings or dicts with a
    ``url`` key) and ``image_url_list`` (dicts with an ``image_url`` key).
    Non-string values are dropped.
    """
    urls: List[str] = []

    images = post.get("images", [])
    if isinstance(images, list):
        for img in images:
            if isinstance(img, str):
                if img.startswith("http"):
                    urls.append(img)
            elif isinstance(img, dict) and isinstance(img.get("url"), str):
                urls.append(img["url"])

    image_url_list = post.get("image_url_list", [])
    if isinstance(image_url_list, list):
        for img_obj in image_url_list:
            if isinstance(img_obj, dict) and isinstance(img_obj.get("image_url"), str):
                urls.append(img_obj["image_url"])

    return urls


def _format_timestamp(ts: Any) -> Optional[str]:
    """Convert an epoch timestamp (seconds or milliseconds) to a readable string.

    Returns None for missing or zero values (including the string "0") and for
    anything that is not a number, such as an already formatted string.
    """
    if ts is None or ts == 0 or ts == "":
        return None
    try:
        ts = int(ts)
        if ts == 0:
            return None
        if ts > 1000000000000:
            ts = ts / 1000  # milliseconds -> seconds
        return datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M:%S")
    except Exception:
        return None


def _convert_timestamps(sources: List[Dict[str, Any]]) -> None:
    """Replace the timestamp fields of each source's post with readable strings.

    Works in place. Fields that cannot be converted are left untouched.
    """
    timestamp_fields = ["publish_timestamp", "modify_timestamp", "update_timestamp"]
    for src in sources:
        post = src.get("post", {})
        if not isinstance(post, dict):
            continue
        for field in timestamp_fields:
            if field not in post:
                continue
            readable = _format_timestamp(post.get(field))
            if readable:
                post[field] = readable


def download_images_for_sources(sources: List[Dict[str, Any]], raw_cases_dir: Path) -> int:
    """Download the images of the given sources to ``raw_cases/images/{case_id}/``.

    Files are named ``{index:02d}{ext}`` by position in the post's image list.
    Files that already exist are counted without being fetched again. Each
    download goes to a ``.part`` file first and is renamed on success, so an
    interrupted download is never mistaken for a finished one. Failures are
    skipped without interrupting the run.

    Only http(s) URLs are fetched: post data comes from external platforms,
    and other schemes (for example ``file://``) must not be followed.

    Returns:
        Number of images available on disk for these sources (already present
        plus newly downloaded).
    """
    images_base = Path(raw_cases_dir) / "images"
    total_downloaded = 0

    # Send a User-Agent so requests are not rejected (X/Twitter requires one).
    # A private opener is used rather than install_opener(), which would change
    # urllib behaviour for the whole process.
    opener = urllib.request.build_opener()
    opener.addheaders = [("User-Agent", _IMAGE_USER_AGENT)]

    for src in sources:
        case_id = src.get("case_id", "")
        post = src.get("post", {})
        if not isinstance(post, dict):
            continue
        image_urls = _get_image_urls_from_post(post)
        if not image_urls:
            continue

        case_dir = images_base / case_id
        case_dir.mkdir(parents=True, exist_ok=True)

        for idx, url in enumerate(image_urls):
            lowered = url.lower()
            # Pick the file extension from the URL; default to .jpg.
            ext = ".jpg"
            if ".png" in lowered:
                ext = ".png"
            elif ".webp" in lowered:
                ext = ".webp"

            save_path = case_dir / f"{idx:02d}{ext}"
            if save_path.exists():
                total_downloaded += 1
                continue
            if not lowered.startswith(("http://", "https://")):
                continue

            tmp_path = save_path.with_name(save_path.name + ".part")
            try:
                with opener.open(url, timeout=_IMAGE_DOWNLOAD_TIMEOUT) as response, \
                        open(tmp_path, "wb") as out:
                    shutil.copyfileobj(response, out)
                tmp_path.replace(save_path)
                total_downloaded += 1
            except Exception as e:
                # A failed download is skipped; the run goes on.
                logger.debug("image download failed (%s): %s", url, e)
                try:
                    tmp_path.unlink()
                except OSError:
                    pass

    return total_downloaded


if __name__ == "__main__":
    # CLI: python extract_sources.py RAW_CASES_DIR [cache_dir]
    import sys
    if len(sys.argv) < 2:
        print("Usage: python extract_sources.py RAW_CASES_DIR [cache_dir]")
        sys.exit(1)
    raw_cases_dir = Path(sys.argv[1])
    cache_dir = Path(sys.argv[2]) if len(sys.argv) > 2 else None
    result = extract_sources_to_json(raw_cases_dir, cache_dir=cache_dir)
    print(f"[OK] Matched: {result['total_matched']}, Unmatched: {result['total_unmatched']}")
    print(f" Output: {raw_cases_dir / 'source.json'}")