# extract_sources.py
"""
Extract source_url / post links from raw_cases/case_*.json, resolve each link
to a channel_content_id, and look up the original post data in
.cache/content_search.

Main entry point: extract_sources_to_json(raw_cases_dir)
- scans every case_{platform}.json in that directory
- reads each "工序发现[].帖子链接" (new format) or "cases[].source_url" (old format)
- matches the channel_content_id against the project-root .cache/content_search/*.json
- writes every matched, complete post to {raw_cases_dir}/source.json
- also downloads images to {raw_cases_dir}/images/{case_id}/
"""
import asyncio
import json
import logging
import re
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import parse_qs, urlencode, urlparse

import aiohttp
  19. logger = logging.getLogger(__name__)
  20. # ── URL → (platform, content_id) 解析 ────────────────────────────────
  21. _URL_PATTERNS = [
  22. # B站: https://www.bilibili.com/video/BV1xxx
  23. ("bili", re.compile(r"bilibili\.com/video/(BV[\w]+)")),
  24. # 小红书: https://www.xiaohongshu.com/explore/{id} 或 /discovery/item/{id}
  25. ("xhs", re.compile(r"xiaohongshu\.com/(?:explore|discovery/item)/([a-f0-9]+)")),
  26. # YouTube: https://www.youtube.com/watch?v={id} 或 https://youtu.be/{id}
  27. ("youtube", re.compile(r"(?:youtube\.com/watch\?v=|youtu\.be/)([\w-]+)")),
  28. # X/Twitter: https://x.com/{user}/status/{id} 或 twitter.com
  29. ("x", re.compile(r"(?:x\.com|twitter\.com)/[^/]+/status/(\d+)")),
  30. # 知乎: https://zhuanlan.zhihu.com/p/{id} 或 zhihu.com/question/{qid}/answer/{aid}
  31. ("zhihu", re.compile(r"zhuanlan\.zhihu\.com/p/(\d+)")),
  32. ("zhihu", re.compile(r"zhihu\.com/question/\d+/answer/(\d+)")),
  33. # 公众号: 通过 __biz 或整个 URL 作为 id(后备)
  34. ("gzh", re.compile(r"mp\.weixin\.qq\.com/s[/?]([^\s\"']+)")),
  35. # 抖音: https://www.douyin.com/video/{numeric_id} — content_id 直接匹配
  36. # douyin post.channel_content_id
  37. ("douyin", re.compile(r"douyin\.com/video/(\d+)")),
  38. # 视频号: post.link 是 'export/{base64-like token}' 不是 http URL;
  39. # 这里识别出来打上 sph 标签,真实定位走 ("url", link) 索引兜底(见 main 流程)
  40. ("sph", re.compile(r"^export/([A-Za-z0-9+/=_\-]+)$")),
  41. ]
  42. def parse_url(url: str) -> Optional[Tuple[str, str]]:
  43. """从 URL 解析出 (platform, content_id)。返回 None 表示无法解析。"""
  44. if not url or not isinstance(url, str):
  45. return None
  46. for platform, pat in _URL_PATTERNS:
  47. m = pat.search(url)
  48. if m:
  49. return platform, m.group(1)
  50. return None
# ── Extract every entry (URL + evaluation) from a case file ────────────────────────────────
  52. def extract_entries_from_case(case_data: Any) -> List[Dict[str, Any]]:
  53. """
  54. 从 case 数据中抽取条目,每条包含 url 和 agent 的 evaluation。
  55. 返回: [{"url": str, "evaluation": dict | None, "title": str | None, "case_id": str | None}]
  56. """
  57. entries: List[Dict[str, Any]] = []
  58. if not isinstance(case_data, dict):
  59. return entries
  60. # 新格式:工序发现[].帖子链接
  61. for item in case_data.get("工序发现", []) or []:
  62. if isinstance(item, dict):
  63. link = item.get("帖子链接") or item.get("source_url")
  64. if link:
  65. entries.append({
  66. "url": link,
  67. "evaluation": item.get("evaluation"),
  68. "title": item.get("title"),
  69. "case_id": item.get("case_id"),
  70. })
  71. # 主格式:cases[]
  72. for item in case_data.get("cases", []) or []:
  73. if isinstance(item, dict):
  74. link = item.get("source_url") or item.get("帖子链接")
  75. if link:
  76. entries.append({
  77. "url": link,
  78. "evaluation": item.get("evaluation"),
  79. "title": item.get("title"),
  80. "case_id": item.get("case_id"),
  81. })
  82. return entries
  83. def extract_urls_from_case(case_data: Any) -> List[str]:
  84. """[Legacy] 旧接口,保留供可能的外部调用。内部改用 extract_entries_from_case。"""
  85. return [e["url"] for e in extract_entries_from_case(case_data)]
  86. # ── 过滤规则(统一入口) ────────────────────────────────
  87. # 默认阈值:body_text 最少字符数、时效截止
  88. # 注:质量门槛已从这里的规则过滤移交给 LLM rubric 评估(见 llm_evaluate_sources.py),
  89. # 本模块只做廉价的预筛(内容完整性 + 时效)。
  90. DEFAULT_MIN_BODY_LEN = 30
  91. DEFAULT_CUTOFF_DATE = (2025, 10, 1)
  92. _TRANSCRIPT_MARKER = "[视频字幕]"
  93. def _merge_transcript_into_body(post: Dict[str, Any]) -> Dict[str, Any]:
  94. """Return a shallow copy of `post` where `body_text` includes the video transcript.
  95. For sph/douyin/x video posts whose original body_text is just hashtags or empty,
  96. this exposes the real video content (Deepgram transcript or YouTube captions)
  97. under the field the frontend reads. The original transcript field is left intact
  98. so downstream code that wants it separately still works.
  99. Idempotent: if body_text already contains the `[视频字幕]` marker, skip.
  100. """
  101. if not isinstance(post, dict):
  102. return post
  103. transcript = post.get("video_transcript") or post.get("captions") or ""
  104. if not isinstance(transcript, str) or not transcript.strip():
  105. return post
  106. transcript = transcript.strip()
  107. body = post.get("body_text") or post.get("desc") or ""
  108. body = body.strip() if isinstance(body, str) else ""
  109. if body and _TRANSCRIPT_MARKER in body:
  110. return post # 已经合并过
  111. merged = dict(post) # shallow copy — body_text 是 top-level field
  112. if body:
  113. merged["body_text"] = f"{body}\n\n{_TRANSCRIPT_MARKER}\n{transcript}"
  114. else:
  115. merged["body_text"] = f"{_TRANSCRIPT_MARKER}\n{transcript}"
  116. return merged
  117. def _needs_transcribe(platform: str, post: Dict[str, Any]) -> bool:
  118. """是否需要给这条 post 跑 Deepgram 转写。
  119. 语义(用户明确约束):
  120. - `video_transcript` 字段**缺失** → 视为"从未尝试",需要跑
  121. - `video_transcript` 字段存在(即使为空字符串 "")→ 视为"已尝试过",跳过
  122. (Deepgram 对纯音乐/无人声视频会返回空,这是合法的"失败"标记,不重跑)
  123. - 必须有视频源(用 transcription.extract_video_url 统一判断,跨平台)
  124. """
  125. if not isinstance(post, dict):
  126. return False
  127. if "video_transcript" in post:
  128. return False
  129. try:
  130. from agent.tools.builtin.content.transcription import extract_video_url
  131. return bool(extract_video_url(platform, post))
  132. except Exception:
  133. return False
  134. async def _transcribe_one_post(
  135. platform: str,
  136. post: Dict[str, Any],
  137. sem: asyncio.Semaphore,
  138. ) -> Optional[str]:
  139. """对一条 post 跑 transcribe_video_from_post,写回 post["video_transcript"]。
  140. 成功 → post["video_transcript"] = transcript
  141. 失败 → post["video_transcript"] = "" # 明确"已尝试"标记,避免后续 backfill 重跑
  142. 返回 transcript 或 None。
  143. """
  144. from agent.tools.builtin.content.transcription import transcribe_video_from_post
  145. async with sem:
  146. try:
  147. text = await transcribe_video_from_post(platform, post)
  148. except Exception as e:
  149. logger.warning("transcribe failed (%s): %s", platform, e)
  150. text = None
  151. if text:
  152. post["video_transcript"] = text
  153. return text
  154. # 失败也写一个 "",表示"我们尝试过 Deepgram 但没拿到结果"
  155. post["video_transcript"] = ""
  156. return None
  157. async def _transcribe_pending_async(
  158. matched: List[Dict[str, Any]],
  159. concurrency: int = 3,
  160. ) -> Dict[Tuple[str, str], str]:
  161. """对 matched 里所有"缺 video_transcript 字段 + 有视频源"的 post 并发跑 Deepgram。
  162. 返回成功的 {(platform, channel_content_id): transcript_text} 映射,
  163. 供调用方写回 cache 文件(让其他 trace 命中同一 post 时复用)。
  164. """
  165. targets: List[Tuple[str, Dict[str, Any]]] = []
  166. for src in matched:
  167. platform = src.get("platform")
  168. post = src.get("post")
  169. if not isinstance(post, dict) or not platform:
  170. continue
  171. if _needs_transcribe(platform, post):
  172. targets.append((platform, post))
  173. if not targets:
  174. return {}
  175. logger.info("Auto-transcribe: %d post(s) pending", len(targets))
  176. sem = asyncio.Semaphore(concurrency)
  177. results = await asyncio.gather(
  178. *[_transcribe_one_post(p, post, sem) for p, post in targets]
  179. )
  180. updates: Dict[Tuple[str, str], str] = {}
  181. success_n = 0
  182. for (platform, post), text in zip(targets, results):
  183. if text:
  184. success_n += 1
  185. cid = post.get("channel_content_id") or post.get("video_id")
  186. if cid:
  187. updates[(platform, str(cid))] = text
  188. logger.info("Auto-transcribe: %d/%d success", success_n, len(targets))
  189. return updates
  190. def _writeback_transcript_to_cache(
  191. cache_dir: Path,
  192. updates: Dict[Tuple[str, str], str],
  193. ) -> int:
  194. """把新拿到的 transcript 写回所有 cache 文件里匹配的 post。
  195. 跨 trace 扩散:同一条 video 可能被多个 trace 的搜索引用过,这里一次写回所有
  196. cache 副本,避免下次另一个 trace 跑 extract_sources 时又触发一遍 Deepgram。
  197. 返回 cache 中被更新的 post 总数。
  198. """
  199. if not updates or not cache_dir.exists():
  200. return 0
  201. written = 0
  202. for cf in cache_dir.glob("*.json"):
  203. try:
  204. data = json.loads(cf.read_text(encoding="utf-8"))
  205. except Exception:
  206. continue
  207. dirty = False
  208. for key, entry in data.items():
  209. if not key.startswith("search:") or not isinstance(entry, dict):
  210. continue
  211. platform = key.split(":", 1)[1]
  212. post_lists = []
  213. for h in entry.get("history", []) or []:
  214. post_lists.append(h.get("posts", []))
  215. if "posts" in entry and isinstance(entry["posts"], list):
  216. post_lists.append(entry["posts"])
  217. for posts in post_lists:
  218. for post in posts or []:
  219. if not isinstance(post, dict):
  220. continue
  221. cid = post.get("channel_content_id") or post.get("video_id")
  222. if not cid:
  223. continue
  224. text = updates.get((platform, str(cid)))
  225. # 注意:用 "video_transcript" not in post 判断,跟 _needs_transcribe 语义一致
  226. # 已经有字段(即使为空)→ 不覆盖,尊重之前的"已尝试"状态
  227. if text and "video_transcript" not in post:
  228. post["video_transcript"] = text
  229. dirty = True
  230. written += 1
  231. if dirty:
  232. cf.write_text(
  233. json.dumps(data, ensure_ascii=False, indent=2),
  234. encoding="utf-8",
  235. )
  236. return written
  237. def _is_before_cutoff(source: Dict[str, Any], cutoff_ts: int) -> bool:
  238. """判断帖子是否早于截止时间戳(秒级)
  239. 如果 timestamp 为 0 或不存在,返回 False(保留)
  240. """
  241. post = source.get("post", {})
  242. if not isinstance(post, dict):
  243. return False
  244. ts = post.get("publish_timestamp")
  245. # 没有 timestamp 或为 0,保留
  246. if not ts or ts == 0:
  247. return False
  248. try:
  249. ts = int(ts)
  250. # 毫秒级转秒级
  251. if ts > 1000000000000:
  252. ts = ts / 1000
  253. return ts < cutoff_ts
  254. except (ValueError, TypeError):
  255. pass
  256. # 尝试解析字符串格式 "2025-05-02 19:25:30"
  257. try:
  258. from datetime import datetime
  259. dt = datetime.strptime(str(ts), "%Y-%m-%d %H:%M:%S")
  260. return dt.timestamp() < cutoff_ts
  261. except Exception:
  262. # 解析失败,保留
  263. return False
  264. def _check_filters(
  265. source: Dict[str, Any],
  266. cutoff_ts: int,
  267. min_body_len: int,
  268. ) -> Optional[str]:
  269. """
  270. 对一条 source 做廉价预筛(内容完整性 + 时效)。
  271. 质量评分这道闸已移交给下游的 LLM rubric 评估(llm_evaluate_sources.py)——
  272. 这里不再读取 agent 自评分 overall_score。
  273. 返回:
  274. None —— 通过预筛
  275. 非 None 字符串 —— 不合格的原因(写入 filter_reason)
  276. """
  277. post = source.get("post", {}) or {}
  278. # 1. 内容完整性:视频帖把 video_transcript 也计入(视频帖 body_text 通常为空)
  279. body = post.get("body_text") or post.get("desc") or ""
  280. transcript = post.get("video_transcript") or post.get("captions") or ""
  281. body_len = len(body.strip()) if isinstance(body, str) else 0
  282. transcript_len = len(transcript.strip()) if isinstance(transcript, str) else 0
  283. total_len = body_len + transcript_len
  284. if total_len < min_body_len:
  285. return f"missing_body_text:body={body_len},transcript={transcript_len}"
  286. # 2. 过时(复用 _is_before_cutoff 对时间戳的解析)
  287. if _is_before_cutoff(source, cutoff_ts):
  288. return "outdated"
  289. return None
# ── Build the (platform, content_id) → post index from the cache ────────────────────────────────
  291. def _normalize_url(url: str) -> Optional[str]:
  292. """规范化 URL:排序 query 参数,去掉尾斜杠"""
  293. try:
  294. parsed = urlparse(url)
  295. if parsed.query:
  296. # 排序 query 参数
  297. params = parse_qs(parsed.query, keep_blank_values=True)
  298. sorted_query = urlencode(sorted(params.items()), doseq=True)
  299. normalized = f"{parsed.scheme}://{parsed.netloc}{parsed.path}?{sorted_query}"
  300. else:
  301. normalized = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
  302. return normalized.rstrip('/')
  303. except:
  304. return None
  305. def _normalize_post_in_place(platform: str, post: Dict[str, Any]) -> None:
  306. """对 cache 里读出的 post 做平台相关的字段补齐(in-place)。
  307. 早期 cache 可能在 platform-level normalize 函数加上之前就写入了,此处兜底补救:
  308. YouTube: description_snippet -> body_text / thumbnails -> images / url -> videos / ...
  309. sph: title (caption) -> body_text (视频号 title 字段塞的是整段 caption)
  310. """
  311. if platform == "youtube":
  312. try:
  313. from agent.tools.builtin.content.platforms.youtube import _normalize_youtube_post
  314. _normalize_youtube_post(post)
  315. except Exception:
  316. pass
  317. elif platform == "sph":
  318. try:
  319. from agent.tools.builtin.content.platforms.aigc_channel import _normalize_sph_post
  320. _normalize_sph_post(post)
  321. except Exception:
  322. pass
  323. def build_cache_index(cache_dir: Path, trace_ids: Optional[List[str]] = None) -> Dict[Tuple[str, str], Dict[str, Any]]:
  324. """
  325. 构建 (platform, channel_content_id) -> post 映射。
  326. Args:
  327. cache_dir: cache 目录路径
  328. trace_ids: 可选的 trace_id 列表。如果提供,只加载这些特定的 cache 文件;
  329. 否则扫描所有 cache 文件
  330. Returns:
  331. (platform, content_id) -> post 的映射字典
  332. """
  333. index: Dict[Tuple[str, str], Dict[str, Any]] = {}
  334. if not cache_dir.exists():
  335. return index
  336. # 如果提供了 trace_ids,只加载这些特定文件
  337. if trace_ids:
  338. cache_files = [cache_dir / f"{tid}.json" for tid in trace_ids if tid]
  339. cache_files = [f for f in cache_files if f.exists()]
  340. else:
  341. # 否则扫描所有 cache 文件
  342. cache_files = list(cache_dir.glob("*.json"))
  343. for cache_file in cache_files:
  344. try:
  345. with open(cache_file, "r", encoding="utf-8") as f:
  346. data = json.load(f)
  347. except Exception:
  348. continue
  349. for key, entry in data.items():
  350. if not key.startswith("search:"):
  351. continue
  352. platform = key.split(":", 1)[1]
  353. # 新格式:entry = {"history": [...], "latest_index": n}
  354. # 旧格式:entry = {"keyword": ..., "posts": [...]}
  355. if isinstance(entry, dict) and "history" in entry:
  356. post_lists = [h.get("posts", []) for h in entry.get("history", [])]
  357. elif isinstance(entry, dict) and "posts" in entry:
  358. post_lists = [entry.get("posts", [])]
  359. else:
  360. continue
  361. for posts in post_lists:
  362. for post in posts or []:
  363. if not isinstance(post, dict):
  364. continue
  365. # 平台字段 normalize:兜底救援早期 cache(normalize 函数加之前写入的)
  366. _normalize_post_in_place(platform, post)
  367. cid = post.get("channel_content_id")
  368. # YouTube 平台用 video_id 而非 channel_content_id
  369. # (normalize 已经处理过,这里是双保险,对早期未 normalize 的 post 也兜底)
  370. if not cid and post.get("video_id"):
  371. cid = post.get("video_id")
  372. post["channel_content_id"] = cid # 补全字段
  373. if cid:
  374. # 用 (platform, content_id) 作为索引键
  375. index[(platform, str(cid))] = post
  376. # 额外用 link / url 字段建立索引(用于 GZH 等平台)
  377. link = post.get("link") or post.get("url") or ""
  378. if link:
  379. index[("url", link)] = post
  380. # 规范化 URL 索引(排序 query 参数,去掉尾斜杠)
  381. norm = _normalize_url(link)
  382. if norm:
  383. index[("norm_url", norm)] = post
  384. return index
# ── Main entry point ────────────────────────────────
  386. def extract_sources_to_json(
  387. raw_cases_dir: Path,
  388. cache_dir: Optional[Path] = None,
  389. output_name: str = "source.json",
  390. trace_ids: Optional[List[str]] = None,
  391. min_body_len: int = DEFAULT_MIN_BODY_LEN,
  392. cutoff_date: Tuple[int, int, int] = DEFAULT_CUTOFF_DATE,
  393. auto_transcribe: bool = True,
  394. transcribe_concurrency: int = 3,
  395. ) -> Dict[str, Any]:
  396. """
  397. 扫描 raw_cases_dir 下的 case_*.json,
  398. 从 cache 中找出原始帖子,并对结果执行廉价预筛(body_text 完整性 / 时效)。
  399. 预筛规则(均可通过参数调整):
  400. - body_text 为空或少于 min_body_len 字符 → filter_reason=missing_body_text
  401. - publish_timestamp 早于 cutoff_date → filter_reason=outdated
  402. 注:质量门槛(原 agent 自评分 overall_score>=70)已移交给下游
  403. llm_evaluate_sources.py 的 LLM rubric 评估,本函数不再做评分过滤。
  404. 参数:
  405. min_body_len: body_text 最少字符数,默认 30
  406. cutoff_date: 过时截止日期 (year, month, day),默认 (2025, 10, 1)
  407. 返回统计信息 dict。
  408. """
  409. raw_cases_dir = Path(raw_cases_dir)
  410. if cache_dir is None:
  411. # 项目根目录:script 文件往上三级
  412. project_root = Path(__file__).resolve().parent.parent.parent.parent
  413. cache_dir = project_root / ".cache" / "content_search"
  414. cache_dir = Path(cache_dir)
  415. # 1. 构建 cache 索引
  416. cache_index = build_cache_index(cache_dir, trace_ids=trace_ids)
  417. # 2. 加载已有的 source.json(如果存在)
  418. output_file = raw_cases_dir / output_name
  419. existing_sources = []
  420. existing_ids = set() # (platform, channel_content_id) 集合用于去重
  421. if output_file.exists():
  422. try:
  423. with open(output_file, "r", encoding="utf-8") as f:
  424. existing_data = json.load(f)
  425. existing_sources = existing_data.get("sources", [])
  426. # 构建已有的 ID 集合
  427. for src in existing_sources:
  428. key = (src.get("platform"), src.get("channel_content_id"))
  429. existing_ids.add(key)
  430. except Exception as e:
  431. print(f"Warning: Failed to load existing source.json: {e}")
  432. # 2.1 加载已有的 filtered_cases.json(如果存在)
  433. filtered_output_file = raw_cases_dir / "filtered_cases.json"
  434. existing_filtered_sources = []
  435. existing_filtered_ids = set()
  436. if filtered_output_file.exists():
  437. try:
  438. with open(filtered_output_file, "r", encoding="utf-8") as f:
  439. filtered_data = json.load(f)
  440. existing_filtered_sources = filtered_data.get("sources", [])
  441. for src in existing_filtered_sources:
  442. key = (src.get("platform"), src.get("channel_content_id"))
  443. existing_filtered_ids.add(key)
  444. except Exception as e:
  445. print(f"Warning: Failed to load existing filtered_cases.json: {e}")
  446. # 3. 扫描所有 case 文件
  447. matched: List[Dict[str, Any]] = []
  448. unmatched: List[Dict[str, Any]] = []
  449. seen_keys: set = set(existing_ids) # 从已有的 ID 开始
  450. for case_file in sorted(raw_cases_dir.glob("case_*.json")):
  451. # 跳过自己(如果 source.json 误被命名成 case_*)
  452. if case_file.name == output_name:
  453. continue
  454. try:
  455. with open(case_file, "r", encoding="utf-8") as f:
  456. case_data = json.load(f)
  457. except Exception as e:
  458. # 尝试自动修复 JSON 格式错误
  459. try:
  460. from examples.process_pipeline.script.fix_json_quotes import try_fix_and_parse
  461. with open(case_file, "r", encoding="utf-8") as f:
  462. raw_content = f.read()
  463. success, case_data, fix_desc = try_fix_and_parse(raw_content)
  464. if success:
  465. # 修复成功,写回文件
  466. with open(case_file, "w", encoding="utf-8") as f:
  467. json.dump(case_data, f, ensure_ascii=False, indent=2)
  468. print(f" 🔧 [Auto-Fix] Fixed {case_file.name}: {fix_desc}")
  469. else:
  470. unmatched.append({"case_file": case_file.name, "error": str(e)})
  471. continue
  472. except Exception:
  473. unmatched.append({"case_file": case_file.name, "error": str(e)})
  474. continue
  475. entries = extract_entries_from_case(case_data)
  476. for entry in entries:
  477. url = entry["url"]
  478. evaluation = entry.get("evaluation")
  479. # 解析 URL 得到 platform 和 content_id(可能失败,例如 sph 的 export/... token)
  480. parsed = parse_url(url)
  481. platform: Optional[str] = parsed[0] if parsed else None
  482. cid: Optional[str] = parsed[1] if parsed else None
  483. # 多级匹配,按优先级:
  484. # 1. (platform, content_id) 精确匹配(parse_url 成功的情况)
  485. # 2. ("url", link) 完整字符串匹配(兜底,对 sph 这种非 http URL 必需)
  486. # 3. ("norm_url", normalized) 规范化匹配
  487. post = None
  488. if platform and cid:
  489. post = cache_index.get((platform, cid))
  490. if not post:
  491. post = cache_index.get(("url", url))
  492. if not post:
  493. norm = _normalize_url(url)
  494. if norm:
  495. post = cache_index.get(("norm_url", norm))
  496. # 如果 parse_url 失败但 URL 索引命中了 post,从 post 推断 platform / cid
  497. if post and (not platform or not cid):
  498. platform = platform or post.get("channel") or "unknown"
  499. cid = cid or post.get("channel_content_id") or post.get("video_id") or url
  500. # 既无 parsed 又无 cache 命中:真 unmatched
  501. if not post and not platform:
  502. unmatched.append({
  503. "case_file": case_file.name,
  504. "url": url,
  505. "reason": "url_parse_failed",
  506. })
  507. continue
  508. key = (platform, str(cid) if cid else url)
  509. if key in seen_keys:
  510. continue
  511. seen_keys.add(key)
  512. if post:
  513. # 统一用 cache 中的 channel_content_id 生成 case_id
  514. # 这样保证 case_id 和 cache 中的 ID 一致
  515. actual_cid = post.get("channel_content_id") or post.get("video_id") or cid
  516. actual_case_id = f"{platform}_{actual_cid}"
  517. # 合并 video_transcript 到 body_text(视频帖前端展示需要)
  518. post_for_source = _merge_transcript_into_body(post)
  519. matched.append({
  520. "case_id": actual_case_id,
  521. "case_file": case_file.name,
  522. "platform": platform,
  523. "channel_content_id": str(actual_cid),
  524. "source_url": url,
  525. "evaluation": evaluation,
  526. "post": post_for_source,
  527. "comments": post.get("author_comments", []) or [],
  528. })
  529. else:
  530. unmatched.append({
  531. "case_id": f"{platform}_{cid}", # 统一格式的 ID
  532. "case_file": case_file.name,
  533. "platform": platform,
  534. "channel_content_id": cid,
  535. "source_url": url,
  536. "reason": "not_in_cache",
  537. })
  538. # 4. 合并已有数据和新匹配的数据
  539. all_sources = existing_sources + matched
  540. # 4.5. 自动 backfill 视频转写(保底兜底)
  541. # 触发条件:post 有视频源(extract_video_url 非空)且**完全没有 `video_transcript` 字段**
  542. # 空字符串视为"已尝试过"不重跑,跨平台统一。失败也会写 "",下次跳过避免反复浪费 Deepgram 额度。
  543. # 跑完写回所有 cache 文件,让其他 trace 引用同一 post 时直接复用。
  544. auto_transcribe_stats: Dict[str, Any] = {"attempted": 0, "succeeded": 0, "cache_writeback": 0}
  545. if auto_transcribe and all_sources:
  546. try:
  547. transcribe_targets = sum(
  548. 1 for s in all_sources
  549. if isinstance(s.get("post"), dict)
  550. and _needs_transcribe(s.get("platform"), s["post"])
  551. )
  552. if transcribe_targets > 0:
  553. logger.info("extract_sources: auto-transcribe %d post(s)", transcribe_targets)
  554. updates = asyncio.run(
  555. _transcribe_pending_async(all_sources, concurrency=transcribe_concurrency)
  556. )
  557. auto_transcribe_stats["attempted"] = transcribe_targets
  558. auto_transcribe_stats["succeeded"] = len(updates)
  559. # 写回 cache(跨 trace 扩散)
  560. if updates:
  561. n = _writeback_transcript_to_cache(cache_dir, updates)
  562. auto_transcribe_stats["cache_writeback"] = n
  563. # 顺手把 transcript merge 进 body_text,保持跟 _merge_transcript_into_body 一致
  564. for s in all_sources:
  565. post = s.get("post")
  566. if not isinstance(post, dict):
  567. continue
  568. if not post.get("video_transcript"):
  569. continue
  570. merged = _merge_transcript_into_body(post)
  571. if merged is not post:
  572. post["body_text"] = merged.get("body_text", post.get("body_text", ""))
  573. except RuntimeError as e:
  574. # 比如已在 event loop 内 — 跳过 auto-transcribe 不阻塞主流程
  575. logger.warning("auto-transcribe skipped: %s", e)
  576. except Exception as e:
  577. logger.warning("auto-transcribe failed: %s", e)
  578. # 5. 廉价预筛:body_text 完整性 / 时效(质量门槛交给下游 LLM rubric 评估)
  579. from datetime import datetime as _dt
  580. cutoff_ts = int(_dt(*cutoff_date).timestamp())
  581. kept_sources: List[Dict[str, Any]] = []
  582. filtered_sources: List[Dict[str, Any]] = []
  583. reason_counts: Dict[str, int] = {}
  584. for s in all_sources:
  585. reason = _check_filters(s, cutoff_ts, min_body_len)
  586. if reason is None:
  587. kept_sources.append(s)
  588. else:
  589. # 记录过滤原因
  590. s_copy = dict(s)
  591. s_copy["filter_reason"] = reason
  592. filtered_sources.append(s_copy)
  593. # 统计原因类型(只取冒号前的类别)
  594. category = reason.split(":", 1)[0]
  595. reason_counts[category] = reason_counts.get(category, 0) + 1
  596. all_sources = kept_sources
  597. filtered_count = len(filtered_sources)
  598. # 6. 转换 timestamp 为可读格式
  599. _convert_timestamps(all_sources)
  600. _convert_timestamps(filtered_sources)
  601. # 7. 写 source.json
  602. output = {
  603. "total": len(all_sources),
  604. "cache_dir": str(cache_dir),
  605. "sources": all_sources,
  606. }
  607. output_file.parent.mkdir(parents=True, exist_ok=True)
  608. with open(output_file, "w", encoding="utf-8") as f:
  609. json.dump(output, f, ensure_ascii=False, indent=2)
  610. # 8. 写 filtered_cases.json(被过滤掉的帖子,按原因分组)
  611. if filtered_sources:
  612. for fs in filtered_sources:
  613. key = (fs.get("platform"), fs.get("channel_content_id"))
  614. if key not in existing_filtered_ids:
  615. existing_filtered_sources.append(fs)
  616. existing_filtered_ids.add(key)
  617. # 按原因类别分组
  618. by_reason: Dict[str, List[Dict[str, Any]]] = {}
  619. for fs in existing_filtered_sources:
  620. reason = fs.get("filter_reason", "unknown")
  621. category = reason.split(":", 1)[0]
  622. by_reason.setdefault(category, []).append(fs)
  623. filtered_output = {
  624. "total": len(existing_filtered_sources),
  625. "by_reason": {
  626. category: {
  627. "count": len(items),
  628. "sources": items,
  629. }
  630. for category, items in by_reason.items()
  631. },
  632. }
  633. with open(filtered_output_file, "w", encoding="utf-8") as f:
  634. json.dump(filtered_output, f, ensure_ascii=False, indent=2)
  635. # 9. 下载图片到 raw_cases/images/{case_id}/
  636. images_downloaded = download_images_for_sources(matched, raw_cases_dir)
  637. # 10. 构建被过滤条目的摘要(供续跑 feedback 使用)
  638. filtered_details: List[Dict[str, Any]] = []
  639. for fs in filtered_sources:
  640. post = fs.get("post", {}) or {}
  641. title = post.get("title") or fs.get("source_url", "")
  642. filtered_details.append({
  643. "case_id": fs.get("case_id", ""),
  644. "platform": fs.get("platform", ""),
  645. "title": title[:60] if title else "",
  646. "filter_reason": fs.get("filter_reason", ""),
  647. })
  648. # 返回统计信息
  649. return {
  650. "total_matched": len(matched),
  651. "total_existing": len(existing_sources),
  652. "total_unmatched": len(unmatched),
  653. "filtered_total": filtered_count,
  654. "filtered_reasons": reason_counts,
  655. "filtered_details": filtered_details,
  656. "images_downloaded": images_downloaded,
  657. "auto_transcribe": auto_transcribe_stats,
  658. "output_file": str(output_file),
  659. }
# ── Image download ────────────────────────────────
  661. def _get_image_urls_from_post(post: Dict[str, Any]) -> List[str]:
  662. """从 post 中提取所有图片 URL"""
  663. urls = []
  664. images = post.get("images", [])
  665. if isinstance(images, list):
  666. for img in images:
  667. if isinstance(img, str) and img.startswith("http"):
  668. urls.append(img)
  669. elif isinstance(img, dict) and "url" in img:
  670. urls.append(img["url"])
  671. image_url_list = post.get("image_url_list", [])
  672. if isinstance(image_url_list, list):
  673. for img_obj in image_url_list:
  674. if isinstance(img_obj, dict) and "image_url" in img_obj:
  675. urls.append(img_obj["image_url"])
  676. return urls
  677. def _format_timestamp(ts: Any) -> Optional[str]:
  678. """将时间戳(秒/毫秒)转换为可读格式"""
  679. from datetime import datetime
  680. if ts is None or ts == 0 or ts == "":
  681. return None
  682. try:
  683. ts = int(ts)
  684. # 毫秒级时间戳
  685. if ts > 1000000000000:
  686. ts = ts / 1000
  687. dt = datetime.fromtimestamp(ts)
  688. return dt.strftime("%Y-%m-%d %H:%M:%S")
  689. except Exception:
  690. return None
  691. def _convert_timestamps(sources: List[Dict[str, Any]]) -> None:
  692. """将 source 列表中 post 的时间戳字段替换为可读格式"""
  693. timestamp_fields = ["publish_timestamp", "modify_timestamp", "update_timestamp"]
  694. for src in sources:
  695. post = src.get("post", {})
  696. if not isinstance(post, dict):
  697. continue
  698. for field in timestamp_fields:
  699. if field in post:
  700. readable = _format_timestamp(post.get(field))
  701. if readable:
  702. post[field] = readable
  703. def download_images_for_sources(sources: List[Dict[str, Any]], raw_cases_dir: Path) -> int:
  704. """
  705. 为新匹配的 sources 下载图片到 raw_cases/images/{case_id}/
  706. Returns:
  707. 下载成功的图片总数
  708. """
  709. import urllib.request
  710. import urllib.error
  711. images_base = raw_cases_dir / "images"
  712. total_downloaded = 0
  713. # 设置 headers 避免被拒(X/Twitter 需要 User-Agent)
  714. opener = urllib.request.build_opener()
  715. opener.addheaders = [("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36")]
  716. urllib.request.install_opener(opener)
  717. for src in sources:
  718. case_id = src.get("case_id", "")
  719. post = src.get("post", {})
  720. image_urls = _get_image_urls_from_post(post)
  721. if not image_urls:
  722. continue
  723. case_dir = images_base / case_id
  724. case_dir.mkdir(parents=True, exist_ok=True)
  725. for idx, url in enumerate(image_urls):
  726. # 确定文件扩展名
  727. ext = ".jpg"
  728. if ".png" in url.lower():
  729. ext = ".png"
  730. elif ".webp" in url.lower():
  731. ext = ".webp"
  732. save_path = case_dir / f"{idx:02d}{ext}"
  733. if save_path.exists():
  734. total_downloaded += 1
  735. continue
  736. try:
  737. urllib.request.urlretrieve(url, str(save_path))
  738. total_downloaded += 1
  739. except Exception:
  740. # 下载失败就跳过,不中断流程
  741. pass
  742. return total_downloaded
  743. if __name__ == "__main__":
  744. # CLI:python extract_sources.py <raw_cases_dir> [cache_dir]
  745. import sys
  746. if len(sys.argv) < 2:
  747. print("Usage: python extract_sources.py <raw_cases_dir> [cache_dir]")
  748. sys.exit(1)
  749. raw_cases_dir = Path(sys.argv[1])
  750. cache_dir = Path(sys.argv[2]) if len(sys.argv) > 2 else None
  751. result = extract_sources_to_json(raw_cases_dir, cache_dir=cache_dir)
  752. print(f"[OK] Matched: {result['total_matched']}, Unmatched: {result['total_unmatched']}")
  753. print(f" Output: {raw_cases_dir / 'source.json'}")