# extract_sources.py
  1. """
  2. 从 raw_cases/case_*.json 中提取 source_url / 帖子链接,
  3. 解析 channel_content_id,再从 .cache/content_search 中查找对应的原始帖子数据。
  4. 主函数:extract_sources_to_json(raw_cases_dir)
  5. - 扫描该目录下所有 case_{platform}.json
  6. - 解析每个 "工序发现[].帖子链接"(新格式)或 "cases[].source_url"(旧格式)
  7. - 从项目根的 .cache/content_search/*.json 中匹配 channel_content_id
  8. - 把匹配到的完整 post 写入 {raw_cases_dir}/source.json
  9. - 同时下载图片到 {raw_cases_dir}/images/{case_id}/
  10. """
  11. import json
  12. import logging
  13. import re
  14. from pathlib import Path
  15. from typing import Any, Dict, List, Optional, Tuple
  16. import asyncio
  17. import aiohttp
  18. from urllib.parse import urlparse, parse_qs, urlencode
  19. logger = logging.getLogger(__name__)
  20. # ── URL → (platform, content_id) 解析 ────────────────────────────────
  21. _URL_PATTERNS = [
  22. # B站: https://www.bilibili.com/video/BV1xxx
  23. ("bili", re.compile(r"bilibili\.com/video/(BV[\w]+)")),
  24. # 小红书: https://www.xiaohongshu.com/explore/{id} 或 /discovery/item/{id}
  25. ("xhs", re.compile(r"xiaohongshu\.com/(?:explore|discovery/item)/([a-f0-9]+)")),
  26. # YouTube: https://www.youtube.com/watch?v={id} 或 https://youtu.be/{id}
  27. ("youtube", re.compile(r"(?:youtube\.com/watch\?v=|youtu\.be/)([\w-]+)")),
  28. # X/Twitter: https://x.com/{user}/status/{id} 或 twitter.com
  29. ("x", re.compile(r"(?:x\.com|twitter\.com)/[^/]+/status/(\d+)")),
  30. # 知乎: https://zhuanlan.zhihu.com/p/{id} 或 zhihu.com/question/{qid}/answer/{aid}
  31. ("zhihu", re.compile(r"zhuanlan\.zhihu\.com/p/(\d+)")),
  32. ("zhihu", re.compile(r"zhihu\.com/question/\d+/answer/(\d+)")),
  33. # 公众号: 通过 __biz 或整个 URL 作为 id(后备)
  34. ("gzh", re.compile(r"mp\.weixin\.qq\.com/s[/?]([^\s\"']+)")),
  35. # 抖音: https://www.douyin.com/video/{numeric_id} — content_id 直接匹配
  36. # douyin post.channel_content_id
  37. ("douyin", re.compile(r"douyin\.com/video/(\d+)")),
  38. # 视频号: post.link 是 'export/{base64-like token}' 不是 http URL;
  39. # 这里识别出来打上 sph 标签,真实定位走 ("url", link) 索引兜底(见 main 流程)
  40. ("sph", re.compile(r"^export/([A-Za-z0-9+/=_\-]+)$")),
  41. ]
  42. def parse_url(url: str) -> Optional[Tuple[str, str]]:
  43. """从 URL 解析出 (platform, content_id)。返回 None 表示无法解析。"""
  44. if not url or not isinstance(url, str):
  45. return None
  46. for platform, pat in _URL_PATTERNS:
  47. m = pat.search(url)
  48. if m:
  49. return platform, m.group(1)
  50. return None
  51. # ── 从 case 文件中抽取所有条目(URL + evaluation) ────────────────────────────────
  52. def extract_entries_from_case(case_data: Any) -> List[Dict[str, Any]]:
  53. """
  54. 从 case 数据中抽取条目,每条包含 url 和 agent 的 evaluation。
  55. 返回: [{"url": str, "evaluation": dict | None, "title": str | None, "case_id": str | None}]
  56. """
  57. entries: List[Dict[str, Any]] = []
  58. if not isinstance(case_data, dict):
  59. return entries
  60. # 新格式:工序发现[].帖子链接
  61. for item in case_data.get("工序发现", []) or []:
  62. if isinstance(item, dict):
  63. link = item.get("帖子链接") or item.get("source_url")
  64. if link:
  65. entries.append({
  66. "url": link,
  67. "evaluation": item.get("evaluation"),
  68. "title": item.get("title"),
  69. "case_id": item.get("case_id"),
  70. })
  71. # 主格式:cases[]
  72. for item in case_data.get("cases", []) or []:
  73. if isinstance(item, dict):
  74. link = item.get("source_url") or item.get("帖子链接")
  75. if link:
  76. entries.append({
  77. "url": link,
  78. "evaluation": item.get("evaluation"),
  79. "title": item.get("title"),
  80. "case_id": item.get("case_id"),
  81. })
  82. return entries
  83. def extract_urls_from_case(case_data: Any) -> List[str]:
  84. """[Legacy] 旧接口,保留供可能的外部调用。内部改用 extract_entries_from_case。"""
  85. return [e["url"] for e in extract_entries_from_case(case_data)]
  86. # ── 过滤规则(统一入口) ────────────────────────────────
  87. # 默认阈值:body_text 最少字符数、agent 评分下限
  88. DEFAULT_MIN_BODY_LEN = 30
  89. DEFAULT_MIN_SCORE = 70.0
  90. DEFAULT_CUTOFF_DATE = (2025, 10, 1)
  91. _TRANSCRIPT_MARKER = "[视频字幕]"
  92. def _merge_transcript_into_body(post: Dict[str, Any]) -> Dict[str, Any]:
  93. """Return a shallow copy of `post` where `body_text` includes the video transcript.
  94. For sph/douyin/x video posts whose original body_text is just hashtags or empty,
  95. this exposes the real video content (Deepgram transcript or YouTube captions)
  96. under the field the frontend reads. The original transcript field is left intact
  97. so downstream code that wants it separately still works.
  98. Idempotent: if body_text already contains the `[视频字幕]` marker, skip.
  99. """
  100. if not isinstance(post, dict):
  101. return post
  102. transcript = post.get("video_transcript") or post.get("captions") or ""
  103. if not isinstance(transcript, str) or not transcript.strip():
  104. return post
  105. transcript = transcript.strip()
  106. body = post.get("body_text") or post.get("desc") or ""
  107. body = body.strip() if isinstance(body, str) else ""
  108. if body and _TRANSCRIPT_MARKER in body:
  109. return post # 已经合并过
  110. merged = dict(post) # shallow copy — body_text 是 top-level field
  111. if body:
  112. merged["body_text"] = f"{body}\n\n{_TRANSCRIPT_MARKER}\n{transcript}"
  113. else:
  114. merged["body_text"] = f"{_TRANSCRIPT_MARKER}\n{transcript}"
  115. return merged
  116. def _needs_transcribe(platform: str, post: Dict[str, Any]) -> bool:
  117. """是否需要给这条 post 跑 Deepgram 转写。
  118. 语义(用户明确约束):
  119. - `video_transcript` 字段**缺失** → 视为"从未尝试",需要跑
  120. - `video_transcript` 字段存在(即使为空字符串 "")→ 视为"已尝试过",跳过
  121. (Deepgram 对纯音乐/无人声视频会返回空,这是合法的"失败"标记,不重跑)
  122. - 必须有视频源(用 transcription.extract_video_url 统一判断,跨平台)
  123. """
  124. if not isinstance(post, dict):
  125. return False
  126. if "video_transcript" in post:
  127. return False
  128. try:
  129. from agent.tools.builtin.content.transcription import extract_video_url
  130. return bool(extract_video_url(platform, post))
  131. except Exception:
  132. return False
  133. async def _transcribe_one_post(
  134. platform: str,
  135. post: Dict[str, Any],
  136. sem: asyncio.Semaphore,
  137. ) -> Optional[str]:
  138. """对一条 post 跑 transcribe_video_from_post,写回 post["video_transcript"]。
  139. 成功 → post["video_transcript"] = transcript
  140. 失败 → post["video_transcript"] = "" # 明确"已尝试"标记,避免后续 backfill 重跑
  141. 返回 transcript 或 None。
  142. """
  143. from agent.tools.builtin.content.transcription import transcribe_video_from_post
  144. async with sem:
  145. try:
  146. text = await transcribe_video_from_post(platform, post)
  147. except Exception as e:
  148. logger.warning("transcribe failed (%s): %s", platform, e)
  149. text = None
  150. if text:
  151. post["video_transcript"] = text
  152. return text
  153. # 失败也写一个 "",表示"我们尝试过 Deepgram 但没拿到结果"
  154. post["video_transcript"] = ""
  155. return None
  156. async def _transcribe_pending_async(
  157. matched: List[Dict[str, Any]],
  158. concurrency: int = 3,
  159. ) -> Dict[Tuple[str, str], str]:
  160. """对 matched 里所有"缺 video_transcript 字段 + 有视频源"的 post 并发跑 Deepgram。
  161. 返回成功的 {(platform, channel_content_id): transcript_text} 映射,
  162. 供调用方写回 cache 文件(让其他 trace 命中同一 post 时复用)。
  163. """
  164. targets: List[Tuple[str, Dict[str, Any]]] = []
  165. for src in matched:
  166. platform = src.get("platform")
  167. post = src.get("post")
  168. if not isinstance(post, dict) or not platform:
  169. continue
  170. if _needs_transcribe(platform, post):
  171. targets.append((platform, post))
  172. if not targets:
  173. return {}
  174. logger.info("Auto-transcribe: %d post(s) pending", len(targets))
  175. sem = asyncio.Semaphore(concurrency)
  176. results = await asyncio.gather(
  177. *[_transcribe_one_post(p, post, sem) for p, post in targets]
  178. )
  179. updates: Dict[Tuple[str, str], str] = {}
  180. success_n = 0
  181. for (platform, post), text in zip(targets, results):
  182. if text:
  183. success_n += 1
  184. cid = post.get("channel_content_id") or post.get("video_id")
  185. if cid:
  186. updates[(platform, str(cid))] = text
  187. logger.info("Auto-transcribe: %d/%d success", success_n, len(targets))
  188. return updates
  189. def _writeback_transcript_to_cache(
  190. cache_dir: Path,
  191. updates: Dict[Tuple[str, str], str],
  192. ) -> int:
  193. """把新拿到的 transcript 写回所有 cache 文件里匹配的 post。
  194. 跨 trace 扩散:同一条 video 可能被多个 trace 的搜索引用过,这里一次写回所有
  195. cache 副本,避免下次另一个 trace 跑 extract_sources 时又触发一遍 Deepgram。
  196. 返回 cache 中被更新的 post 总数。
  197. """
  198. if not updates or not cache_dir.exists():
  199. return 0
  200. written = 0
  201. for cf in cache_dir.glob("*.json"):
  202. try:
  203. data = json.loads(cf.read_text(encoding="utf-8"))
  204. except Exception:
  205. continue
  206. dirty = False
  207. for key, entry in data.items():
  208. if not key.startswith("search:") or not isinstance(entry, dict):
  209. continue
  210. platform = key.split(":", 1)[1]
  211. post_lists = []
  212. for h in entry.get("history", []) or []:
  213. post_lists.append(h.get("posts", []))
  214. if "posts" in entry and isinstance(entry["posts"], list):
  215. post_lists.append(entry["posts"])
  216. for posts in post_lists:
  217. for post in posts or []:
  218. if not isinstance(post, dict):
  219. continue
  220. cid = post.get("channel_content_id") or post.get("video_id")
  221. if not cid:
  222. continue
  223. text = updates.get((platform, str(cid)))
  224. # 注意:用 "video_transcript" not in post 判断,跟 _needs_transcribe 语义一致
  225. # 已经有字段(即使为空)→ 不覆盖,尊重之前的"已尝试"状态
  226. if text and "video_transcript" not in post:
  227. post["video_transcript"] = text
  228. dirty = True
  229. written += 1
  230. if dirty:
  231. cf.write_text(
  232. json.dumps(data, ensure_ascii=False, indent=2),
  233. encoding="utf-8",
  234. )
  235. return written
  236. def _is_before_cutoff(source: Dict[str, Any], cutoff_ts: int) -> bool:
  237. """判断帖子是否早于截止时间戳(秒级)
  238. 如果 timestamp 为 0 或不存在,返回 False(保留)
  239. """
  240. post = source.get("post", {})
  241. if not isinstance(post, dict):
  242. return False
  243. ts = post.get("publish_timestamp")
  244. # 没有 timestamp 或为 0,保留
  245. if not ts or ts == 0:
  246. return False
  247. try:
  248. ts = int(ts)
  249. # 毫秒级转秒级
  250. if ts > 1000000000000:
  251. ts = ts / 1000
  252. return ts < cutoff_ts
  253. except (ValueError, TypeError):
  254. pass
  255. # 尝试解析字符串格式 "2025-05-02 19:25:30"
  256. try:
  257. from datetime import datetime
  258. dt = datetime.strptime(str(ts), "%Y-%m-%d %H:%M:%S")
  259. return dt.timestamp() < cutoff_ts
  260. except Exception:
  261. # 解析失败,保留
  262. return False
  263. def _check_filters(
  264. source: Dict[str, Any],
  265. cutoff_ts: int,
  266. min_body_len: int,
  267. min_score: float,
  268. ) -> Optional[str]:
  269. """
  270. 对一条 source 逐条检查过滤规则。
  271. 返回:
  272. None —— 条目合格
  273. 非 None 字符串 —— 不合格的原因(写入 filter_reason)
  274. """
  275. post = source.get("post", {}) or {}
  276. # 1. 内容完整性:视频帖把 video_transcript 也计入(视频帖 body_text 通常为空)
  277. body = post.get("body_text") or post.get("desc") or ""
  278. transcript = post.get("video_transcript") or post.get("captions") or ""
  279. body_len = len(body.strip()) if isinstance(body, str) else 0
  280. transcript_len = len(transcript.strip()) if isinstance(transcript, str) else 0
  281. total_len = body_len + transcript_len
  282. if total_len < min_body_len:
  283. return f"missing_body_text:body={body_len},transcript={transcript_len}"
  284. # 2. agent 评分
  285. evaluation = source.get("evaluation")
  286. if not isinstance(evaluation, dict):
  287. return "missing_evaluation"
  288. quality = evaluation.get("quality")
  289. if not isinstance(quality, dict):
  290. return "missing_evaluation"
  291. score = quality.get("overall_score")
  292. if not isinstance(score, (int, float)):
  293. return "invalid_score"
  294. if score < min_score:
  295. return f"low_score:{score}"
  296. # 3. 过时(复用原 _is_before_cutoff 对时间戳的解析)
  297. if _is_before_cutoff(source, cutoff_ts):
  298. return "outdated"
  299. return None
  300. # ── 从 cache 中构建 (platform, content_id) → post 索引 ────────────────────────────────
  301. def _normalize_url(url: str) -> Optional[str]:
  302. """规范化 URL:排序 query 参数,去掉尾斜杠"""
  303. try:
  304. parsed = urlparse(url)
  305. if parsed.query:
  306. # 排序 query 参数
  307. params = parse_qs(parsed.query, keep_blank_values=True)
  308. sorted_query = urlencode(sorted(params.items()), doseq=True)
  309. normalized = f"{parsed.scheme}://{parsed.netloc}{parsed.path}?{sorted_query}"
  310. else:
  311. normalized = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
  312. return normalized.rstrip('/')
  313. except:
  314. return None
  315. def _normalize_post_in_place(platform: str, post: Dict[str, Any]) -> None:
  316. """对 cache 里读出的 post 做平台相关的字段补齐(in-place)。
  317. 早期 cache 可能在 platform-level normalize 函数加上之前就写入了,此处兜底补救:
  318. YouTube: description_snippet -> body_text / thumbnails -> images / url -> videos / ...
  319. sph: title (caption) -> body_text (视频号 title 字段塞的是整段 caption)
  320. """
  321. if platform == "youtube":
  322. try:
  323. from agent.tools.builtin.content.platforms.youtube import _normalize_youtube_post
  324. _normalize_youtube_post(post)
  325. except Exception:
  326. pass
  327. elif platform == "sph":
  328. try:
  329. from agent.tools.builtin.content.platforms.aigc_channel import _normalize_sph_post
  330. _normalize_sph_post(post)
  331. except Exception:
  332. pass
  333. def build_cache_index(cache_dir: Path, trace_ids: Optional[List[str]] = None) -> Dict[Tuple[str, str], Dict[str, Any]]:
  334. """
  335. 构建 (platform, channel_content_id) -> post 映射。
  336. Args:
  337. cache_dir: cache 目录路径
  338. trace_ids: 可选的 trace_id 列表。如果提供,只加载这些特定的 cache 文件;
  339. 否则扫描所有 cache 文件
  340. Returns:
  341. (platform, content_id) -> post 的映射字典
  342. """
  343. index: Dict[Tuple[str, str], Dict[str, Any]] = {}
  344. if not cache_dir.exists():
  345. return index
  346. # 如果提供了 trace_ids,只加载这些特定文件
  347. if trace_ids:
  348. cache_files = [cache_dir / f"{tid}.json" for tid in trace_ids if tid]
  349. cache_files = [f for f in cache_files if f.exists()]
  350. else:
  351. # 否则扫描所有 cache 文件
  352. cache_files = list(cache_dir.glob("*.json"))
  353. for cache_file in cache_files:
  354. try:
  355. with open(cache_file, "r", encoding="utf-8") as f:
  356. data = json.load(f)
  357. except Exception:
  358. continue
  359. for key, entry in data.items():
  360. if not key.startswith("search:"):
  361. continue
  362. platform = key.split(":", 1)[1]
  363. # 新格式:entry = {"history": [...], "latest_index": n}
  364. # 旧格式:entry = {"keyword": ..., "posts": [...]}
  365. if isinstance(entry, dict) and "history" in entry:
  366. post_lists = [h.get("posts", []) for h in entry.get("history", [])]
  367. elif isinstance(entry, dict) and "posts" in entry:
  368. post_lists = [entry.get("posts", [])]
  369. else:
  370. continue
  371. for posts in post_lists:
  372. for post in posts or []:
  373. if not isinstance(post, dict):
  374. continue
  375. # 平台字段 normalize:兜底救援早期 cache(normalize 函数加之前写入的)
  376. _normalize_post_in_place(platform, post)
  377. cid = post.get("channel_content_id")
  378. # YouTube 平台用 video_id 而非 channel_content_id
  379. # (normalize 已经处理过,这里是双保险,对早期未 normalize 的 post 也兜底)
  380. if not cid and post.get("video_id"):
  381. cid = post.get("video_id")
  382. post["channel_content_id"] = cid # 补全字段
  383. if cid:
  384. # 用 (platform, content_id) 作为索引键
  385. index[(platform, str(cid))] = post
  386. # 额外用 link / url 字段建立索引(用于 GZH 等平台)
  387. link = post.get("link") or post.get("url") or ""
  388. if link:
  389. index[("url", link)] = post
  390. # 规范化 URL 索引(排序 query 参数,去掉尾斜杠)
  391. norm = _normalize_url(link)
  392. if norm:
  393. index[("norm_url", norm)] = post
  394. return index
  395. # ── 主入口 ────────────────────────────────
  396. def extract_sources_to_json(
  397. raw_cases_dir: Path,
  398. cache_dir: Optional[Path] = None,
  399. output_name: str = "source.json",
  400. trace_ids: Optional[List[str]] = None,
  401. min_body_len: int = DEFAULT_MIN_BODY_LEN,
  402. min_score: float = DEFAULT_MIN_SCORE,
  403. cutoff_date: Tuple[int, int, int] = DEFAULT_CUTOFF_DATE,
  404. auto_transcribe: bool = True,
  405. transcribe_concurrency: int = 3,
  406. ) -> Dict[str, Any]:
  407. """
  408. 扫描 raw_cases_dir 下的 case_*.json,
  409. 从 cache 中找出原始帖子,并对结果执行统一过滤(body_text / 评分 / 时效)。
  410. 过滤规则(均可通过参数调整):
  411. - body_text 为空或少于 min_body_len 字符 → filter_reason=missing_body_text
  412. - agent evaluation 缺失或 weighted_score < min_score → filter_reason=missing_evaluation / invalid_score / low_score
  413. - publish_timestamp 早于 cutoff_date → filter_reason=outdated
  414. 参数:
  415. min_body_len: body_text 最少字符数,默认 30
  416. min_score: agent 评分下限,默认 70
  417. cutoff_date: 过时截止日期 (year, month, day),默认 (2025, 10, 1)
  418. 返回统计信息 dict。
  419. """
  420. raw_cases_dir = Path(raw_cases_dir)
  421. if cache_dir is None:
  422. # 项目根目录:script 文件往上三级
  423. project_root = Path(__file__).resolve().parent.parent.parent.parent
  424. cache_dir = project_root / ".cache" / "content_search"
  425. cache_dir = Path(cache_dir)
  426. # 1. 构建 cache 索引
  427. cache_index = build_cache_index(cache_dir, trace_ids=trace_ids)
  428. # 2. 加载已有的 source.json(如果存在)
  429. output_file = raw_cases_dir / output_name
  430. existing_sources = []
  431. existing_ids = set() # (platform, channel_content_id) 集合用于去重
  432. if output_file.exists():
  433. try:
  434. with open(output_file, "r", encoding="utf-8") as f:
  435. existing_data = json.load(f)
  436. existing_sources = existing_data.get("sources", [])
  437. # 构建已有的 ID 集合
  438. for src in existing_sources:
  439. key = (src.get("platform"), src.get("channel_content_id"))
  440. existing_ids.add(key)
  441. except Exception as e:
  442. print(f"Warning: Failed to load existing source.json: {e}")
  443. # 2.1 加载已有的 filtered_cases.json(如果存在)
  444. filtered_output_file = raw_cases_dir / "filtered_cases.json"
  445. existing_filtered_sources = []
  446. existing_filtered_ids = set()
  447. if filtered_output_file.exists():
  448. try:
  449. with open(filtered_output_file, "r", encoding="utf-8") as f:
  450. filtered_data = json.load(f)
  451. existing_filtered_sources = filtered_data.get("sources", [])
  452. for src in existing_filtered_sources:
  453. key = (src.get("platform"), src.get("channel_content_id"))
  454. existing_filtered_ids.add(key)
  455. except Exception as e:
  456. print(f"Warning: Failed to load existing filtered_cases.json: {e}")
  457. # 3. 扫描所有 case 文件
  458. matched: List[Dict[str, Any]] = []
  459. unmatched: List[Dict[str, Any]] = []
  460. seen_keys: set = set(existing_ids) # 从已有的 ID 开始
  461. for case_file in sorted(raw_cases_dir.glob("case_*.json")):
  462. # 跳过自己(如果 source.json 误被命名成 case_*)
  463. if case_file.name == output_name:
  464. continue
  465. try:
  466. with open(case_file, "r", encoding="utf-8") as f:
  467. case_data = json.load(f)
  468. except Exception as e:
  469. # 尝试自动修复 JSON 格式错误
  470. try:
  471. from examples.process_pipeline.script.fix_json_quotes import try_fix_and_parse
  472. with open(case_file, "r", encoding="utf-8") as f:
  473. raw_content = f.read()
  474. success, case_data, fix_desc = try_fix_and_parse(raw_content)
  475. if success:
  476. # 修复成功,写回文件
  477. with open(case_file, "w", encoding="utf-8") as f:
  478. json.dump(case_data, f, ensure_ascii=False, indent=2)
  479. print(f" 🔧 [Auto-Fix] Fixed {case_file.name}: {fix_desc}")
  480. else:
  481. unmatched.append({"case_file": case_file.name, "error": str(e)})
  482. continue
  483. except Exception:
  484. unmatched.append({"case_file": case_file.name, "error": str(e)})
  485. continue
  486. entries = extract_entries_from_case(case_data)
  487. for entry in entries:
  488. url = entry["url"]
  489. evaluation = entry.get("evaluation")
  490. # 解析 URL 得到 platform 和 content_id(可能失败,例如 sph 的 export/... token)
  491. parsed = parse_url(url)
  492. platform: Optional[str] = parsed[0] if parsed else None
  493. cid: Optional[str] = parsed[1] if parsed else None
  494. # 多级匹配,按优先级:
  495. # 1. (platform, content_id) 精确匹配(parse_url 成功的情况)
  496. # 2. ("url", link) 完整字符串匹配(兜底,对 sph 这种非 http URL 必需)
  497. # 3. ("norm_url", normalized) 规范化匹配
  498. post = None
  499. if platform and cid:
  500. post = cache_index.get((platform, cid))
  501. if not post:
  502. post = cache_index.get(("url", url))
  503. if not post:
  504. norm = _normalize_url(url)
  505. if norm:
  506. post = cache_index.get(("norm_url", norm))
  507. # 如果 parse_url 失败但 URL 索引命中了 post,从 post 推断 platform / cid
  508. if post and (not platform or not cid):
  509. platform = platform or post.get("channel") or "unknown"
  510. cid = cid or post.get("channel_content_id") or post.get("video_id") or url
  511. # 既无 parsed 又无 cache 命中:真 unmatched
  512. if not post and not platform:
  513. unmatched.append({
  514. "case_file": case_file.name,
  515. "url": url,
  516. "reason": "url_parse_failed",
  517. })
  518. continue
  519. key = (platform, str(cid) if cid else url)
  520. if key in seen_keys:
  521. continue
  522. seen_keys.add(key)
  523. if post:
  524. # 统一用 cache 中的 channel_content_id 生成 case_id
  525. # 这样保证 case_id 和 cache 中的 ID 一致
  526. actual_cid = post.get("channel_content_id") or post.get("video_id") or cid
  527. actual_case_id = f"{platform}_{actual_cid}"
  528. # 合并 video_transcript 到 body_text(视频帖前端展示需要)
  529. post_for_source = _merge_transcript_into_body(post)
  530. matched.append({
  531. "case_id": actual_case_id,
  532. "case_file": case_file.name,
  533. "platform": platform,
  534. "channel_content_id": str(actual_cid),
  535. "source_url": url,
  536. "evaluation": evaluation,
  537. "post": post_for_source,
  538. "comments": post.get("author_comments", []) or [],
  539. })
  540. else:
  541. unmatched.append({
  542. "case_id": f"{platform}_{cid}", # 统一格式的 ID
  543. "case_file": case_file.name,
  544. "platform": platform,
  545. "channel_content_id": cid,
  546. "source_url": url,
  547. "reason": "not_in_cache",
  548. })
  549. # 4. 合并已有数据和新匹配的数据
  550. all_sources = existing_sources + matched
  551. # 4.5. 自动 backfill 视频转写(保底兜底)
  552. # 触发条件:post 有视频源(extract_video_url 非空)且**完全没有 `video_transcript` 字段**
  553. # 空字符串视为"已尝试过"不重跑,跨平台统一。失败也会写 "",下次跳过避免反复浪费 Deepgram 额度。
  554. # 跑完写回所有 cache 文件,让其他 trace 引用同一 post 时直接复用。
  555. auto_transcribe_stats: Dict[str, Any] = {"attempted": 0, "succeeded": 0, "cache_writeback": 0}
  556. if auto_transcribe and all_sources:
  557. try:
  558. transcribe_targets = sum(
  559. 1 for s in all_sources
  560. if isinstance(s.get("post"), dict)
  561. and _needs_transcribe(s.get("platform"), s["post"])
  562. )
  563. if transcribe_targets > 0:
  564. logger.info("extract_sources: auto-transcribe %d post(s)", transcribe_targets)
  565. updates = asyncio.run(
  566. _transcribe_pending_async(all_sources, concurrency=transcribe_concurrency)
  567. )
  568. auto_transcribe_stats["attempted"] = transcribe_targets
  569. auto_transcribe_stats["succeeded"] = len(updates)
  570. # 写回 cache(跨 trace 扩散)
  571. if updates:
  572. n = _writeback_transcript_to_cache(cache_dir, updates)
  573. auto_transcribe_stats["cache_writeback"] = n
  574. # 顺手把 transcript merge 进 body_text,保持跟 _merge_transcript_into_body 一致
  575. for s in all_sources:
  576. post = s.get("post")
  577. if not isinstance(post, dict):
  578. continue
  579. if not post.get("video_transcript"):
  580. continue
  581. merged = _merge_transcript_into_body(post)
  582. if merged is not post:
  583. post["body_text"] = merged.get("body_text", post.get("body_text", ""))
  584. except RuntimeError as e:
  585. # 比如已在 event loop 内 — 跳过 auto-transcribe 不阻塞主流程
  586. logger.warning("auto-transcribe skipped: %s", e)
  587. except Exception as e:
  588. logger.warning("auto-transcribe failed: %s", e)
  589. # 5. 统一过滤:body_text 完整性 / agent 评分 / 时效
  590. from datetime import datetime as _dt
  591. cutoff_ts = int(_dt(*cutoff_date).timestamp())
  592. kept_sources: List[Dict[str, Any]] = []
  593. filtered_sources: List[Dict[str, Any]] = []
  594. reason_counts: Dict[str, int] = {}
  595. for s in all_sources:
  596. reason = _check_filters(s, cutoff_ts, min_body_len, min_score)
  597. if reason is None:
  598. kept_sources.append(s)
  599. else:
  600. # 记录过滤原因
  601. s_copy = dict(s)
  602. s_copy["filter_reason"] = reason
  603. filtered_sources.append(s_copy)
  604. # 统计原因类型(只取冒号前的类别)
  605. category = reason.split(":", 1)[0]
  606. reason_counts[category] = reason_counts.get(category, 0) + 1
  607. all_sources = kept_sources
  608. filtered_count = len(filtered_sources)
  609. # 6. 转换 timestamp 为可读格式
  610. _convert_timestamps(all_sources)
  611. _convert_timestamps(filtered_sources)
  612. # 7. 写 source.json
  613. output = {
  614. "total": len(all_sources),
  615. "cache_dir": str(cache_dir),
  616. "sources": all_sources,
  617. }
  618. output_file.parent.mkdir(parents=True, exist_ok=True)
  619. with open(output_file, "w", encoding="utf-8") as f:
  620. json.dump(output, f, ensure_ascii=False, indent=2)
  621. # 8. 写 filtered_cases.json(被过滤掉的帖子,按原因分组)
  622. if filtered_sources:
  623. for fs in filtered_sources:
  624. key = (fs.get("platform"), fs.get("channel_content_id"))
  625. if key not in existing_filtered_ids:
  626. existing_filtered_sources.append(fs)
  627. existing_filtered_ids.add(key)
  628. # 按原因类别分组
  629. by_reason: Dict[str, List[Dict[str, Any]]] = {}
  630. for fs in existing_filtered_sources:
  631. reason = fs.get("filter_reason", "unknown")
  632. category = reason.split(":", 1)[0]
  633. by_reason.setdefault(category, []).append(fs)
  634. filtered_output = {
  635. "total": len(existing_filtered_sources),
  636. "by_reason": {
  637. category: {
  638. "count": len(items),
  639. "sources": items,
  640. }
  641. for category, items in by_reason.items()
  642. },
  643. }
  644. with open(filtered_output_file, "w", encoding="utf-8") as f:
  645. json.dump(filtered_output, f, ensure_ascii=False, indent=2)
  646. # 9. 下载图片到 raw_cases/images/{case_id}/
  647. images_downloaded = download_images_for_sources(matched, raw_cases_dir)
  648. # 10. 构建被过滤条目的摘要(供续跑 feedback 使用)
  649. filtered_details: List[Dict[str, Any]] = []
  650. for fs in filtered_sources:
  651. post = fs.get("post", {}) or {}
  652. title = post.get("title") or fs.get("source_url", "")
  653. filtered_details.append({
  654. "case_id": fs.get("case_id", ""),
  655. "platform": fs.get("platform", ""),
  656. "title": title[:60] if title else "",
  657. "filter_reason": fs.get("filter_reason", ""),
  658. })
  659. # 返回统计信息
  660. return {
  661. "total_matched": len(matched),
  662. "total_existing": len(existing_sources),
  663. "total_unmatched": len(unmatched),
  664. "filtered_total": filtered_count,
  665. "filtered_reasons": reason_counts,
  666. "filtered_details": filtered_details,
  667. "images_downloaded": images_downloaded,
  668. "auto_transcribe": auto_transcribe_stats,
  669. "output_file": str(output_file),
  670. }
  671. # ── 图片下载 ────────────────────────────────
  672. def _get_image_urls_from_post(post: Dict[str, Any]) -> List[str]:
  673. """从 post 中提取所有图片 URL"""
  674. urls = []
  675. images = post.get("images", [])
  676. if isinstance(images, list):
  677. for img in images:
  678. if isinstance(img, str) and img.startswith("http"):
  679. urls.append(img)
  680. elif isinstance(img, dict) and "url" in img:
  681. urls.append(img["url"])
  682. image_url_list = post.get("image_url_list", [])
  683. if isinstance(image_url_list, list):
  684. for img_obj in image_url_list:
  685. if isinstance(img_obj, dict) and "image_url" in img_obj:
  686. urls.append(img_obj["image_url"])
  687. return urls
  688. def _format_timestamp(ts: Any) -> Optional[str]:
  689. """将时间戳(秒/毫秒)转换为可读格式"""
  690. from datetime import datetime
  691. if ts is None or ts == 0 or ts == "":
  692. return None
  693. try:
  694. ts = int(ts)
  695. # 毫秒级时间戳
  696. if ts > 1000000000000:
  697. ts = ts / 1000
  698. dt = datetime.fromtimestamp(ts)
  699. return dt.strftime("%Y-%m-%d %H:%M:%S")
  700. except Exception:
  701. return None
  702. def _convert_timestamps(sources: List[Dict[str, Any]]) -> None:
  703. """将 source 列表中 post 的时间戳字段替换为可读格式"""
  704. timestamp_fields = ["publish_timestamp", "modify_timestamp", "update_timestamp"]
  705. for src in sources:
  706. post = src.get("post", {})
  707. if not isinstance(post, dict):
  708. continue
  709. for field in timestamp_fields:
  710. if field in post:
  711. readable = _format_timestamp(post.get(field))
  712. if readable:
  713. post[field] = readable
  714. def download_images_for_sources(sources: List[Dict[str, Any]], raw_cases_dir: Path) -> int:
  715. """
  716. 为新匹配的 sources 下载图片到 raw_cases/images/{case_id}/
  717. Returns:
  718. 下载成功的图片总数
  719. """
  720. import urllib.request
  721. import urllib.error
  722. images_base = raw_cases_dir / "images"
  723. total_downloaded = 0
  724. # 设置 headers 避免被拒(X/Twitter 需要 User-Agent)
  725. opener = urllib.request.build_opener()
  726. opener.addheaders = [("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36")]
  727. urllib.request.install_opener(opener)
  728. for src in sources:
  729. case_id = src.get("case_id", "")
  730. post = src.get("post", {})
  731. image_urls = _get_image_urls_from_post(post)
  732. if not image_urls:
  733. continue
  734. case_dir = images_base / case_id
  735. case_dir.mkdir(parents=True, exist_ok=True)
  736. for idx, url in enumerate(image_urls):
  737. # 确定文件扩展名
  738. ext = ".jpg"
  739. if ".png" in url.lower():
  740. ext = ".png"
  741. elif ".webp" in url.lower():
  742. ext = ".webp"
  743. save_path = case_dir / f"{idx:02d}{ext}"
  744. if save_path.exists():
  745. total_downloaded += 1
  746. continue
  747. try:
  748. urllib.request.urlretrieve(url, str(save_path))
  749. total_downloaded += 1
  750. except Exception:
  751. # 下载失败就跳过,不中断流程
  752. pass
  753. return total_downloaded
  754. if __name__ == "__main__":
  755. # CLI:python extract_sources.py <raw_cases_dir> [cache_dir]
  756. import sys
  757. if len(sys.argv) < 2:
  758. print("Usage: python extract_sources.py <raw_cases_dir> [cache_dir]")
  759. sys.exit(1)
  760. raw_cases_dir = Path(sys.argv[1])
  761. cache_dir = Path(sys.argv[2]) if len(sys.argv) > 2 else None
  762. result = extract_sources_to_json(raw_cases_dir, cache_dir=cache_dir)
  763. print(f"[OK] Matched: {result['total_matched']}, Unmatched: {result['total_unmatched']}")
  764. print(f" Output: {raw_cases_dir / 'source.json'}")