# extract_sources.py
  1. """
  2. 从 raw_cases/case_*.json 中提取 source_url / 帖子链接,
  3. 解析 channel_content_id,再从 .cache/content_search 中查找对应的原始帖子数据。
  4. 主函数:extract_sources_to_json(raw_cases_dir)
  5. - 扫描该目录下所有 case_{platform}.json
  6. - 解析每个 "工序发现[].帖子链接"(新格式)或 "cases[].source_url"(旧格式)
  7. - 从项目根的 .cache/content_search/*.json 中匹配 channel_content_id
  8. - 把匹配到的完整 post 写入 {raw_cases_dir}/source.json
  9. - 同时下载图片到 {raw_cases_dir}/images/{case_id}/
  10. """
  11. import json
  12. import re
  13. from pathlib import Path
  14. from typing import Any, Dict, List, Optional, Tuple
  15. import asyncio
  16. import aiohttp
  17. from urllib.parse import urlparse, parse_qs, urlencode
# ── URL → (platform, content_id) parsing ────────────────────────────────
# Ordered (platform_tag, compiled_regex) pairs; parse_url() tries each in
# turn and returns the first match's group(1) as the content id.
_URL_PATTERNS = [
    # Bilibili: https://www.bilibili.com/video/BV1xxx
    ("bili", re.compile(r"bilibili\.com/video/(BV[\w]+)")),
    # Xiaohongshu: https://www.xiaohongshu.com/explore/{id} or /discovery/item/{id}
    ("xhs", re.compile(r"xiaohongshu\.com/(?:explore|discovery/item)/([a-f0-9]+)")),
    # YouTube: https://www.youtube.com/watch?v={id} or https://youtu.be/{id}
    ("youtube", re.compile(r"(?:youtube\.com/watch\?v=|youtu\.be/)([\w-]+)")),
    # X/Twitter: https://x.com/{user}/status/{id} or twitter.com
    ("x", re.compile(r"(?:x\.com|twitter\.com)/[^/]+/status/(\d+)")),
    # Zhihu: https://zhuanlan.zhihu.com/p/{id} or zhihu.com/question/{qid}/answer/{aid}
    ("zhihu", re.compile(r"zhuanlan\.zhihu\.com/p/(\d+)")),
    ("zhihu", re.compile(r"zhihu\.com/question/\d+/answer/(\d+)")),
    # WeChat official account ("gzh"): the path tail after mp.weixin.qq.com/s
    # serves as the id (fallback — no stable numeric id in the URL).
    ("gzh", re.compile(r"mp\.weixin\.qq\.com/s[/?]([^\s\"']+)")),
    # Douyin: https://www.douyin.com/video/{numeric_id} — matches
    # post.channel_content_id directly.
    ("douyin", re.compile(r"douyin\.com/video/(\d+)")),
    # WeChat Channels ("sph"): post.link is 'export/{base64-like token}', not
    # an http URL; tag it here, the real lookup falls back to the
    # ("url", link) index (see the main flow).
    ("sph", re.compile(r"^export/([A-Za-z0-9+/=_\-]+)$")),
]
  40. def parse_url(url: str) -> Optional[Tuple[str, str]]:
  41. """从 URL 解析出 (platform, content_id)。返回 None 表示无法解析。"""
  42. if not url or not isinstance(url, str):
  43. return None
  44. for platform, pat in _URL_PATTERNS:
  45. m = pat.search(url)
  46. if m:
  47. return platform, m.group(1)
  48. return None
  49. # ── 从 case 文件中抽取所有条目(URL + evaluation) ────────────────────────────────
  50. def extract_entries_from_case(case_data: Any) -> List[Dict[str, Any]]:
  51. """
  52. 从 case 数据中抽取条目,每条包含 url 和 agent 的 evaluation。
  53. 返回: [{"url": str, "evaluation": dict | None, "title": str | None, "case_id": str | None}]
  54. """
  55. entries: List[Dict[str, Any]] = []
  56. if not isinstance(case_data, dict):
  57. return entries
  58. # 新格式:工序发现[].帖子链接
  59. for item in case_data.get("工序发现", []) or []:
  60. if isinstance(item, dict):
  61. link = item.get("帖子链接") or item.get("source_url")
  62. if link:
  63. entries.append({
  64. "url": link,
  65. "evaluation": item.get("evaluation"),
  66. "title": item.get("title"),
  67. "case_id": item.get("case_id"),
  68. })
  69. # 主格式:cases[]
  70. for item in case_data.get("cases", []) or []:
  71. if isinstance(item, dict):
  72. link = item.get("source_url") or item.get("帖子链接")
  73. if link:
  74. entries.append({
  75. "url": link,
  76. "evaluation": item.get("evaluation"),
  77. "title": item.get("title"),
  78. "case_id": item.get("case_id"),
  79. })
  80. return entries
  81. def extract_urls_from_case(case_data: Any) -> List[str]:
  82. """[Legacy] 旧接口,保留供可能的外部调用。内部改用 extract_entries_from_case。"""
  83. return [e["url"] for e in extract_entries_from_case(case_data)]
  84. # ── 过滤规则(统一入口) ────────────────────────────────
  85. # 默认阈值:body_text 最少字符数、agent 评分下限
  86. DEFAULT_MIN_BODY_LEN = 30
  87. DEFAULT_MIN_SCORE = 70.0
  88. DEFAULT_CUTOFF_DATE = (2025, 10, 1)
  89. _TRANSCRIPT_MARKER = "[视频字幕]"
  90. def _merge_transcript_into_body(post: Dict[str, Any]) -> Dict[str, Any]:
  91. """Return a shallow copy of `post` where `body_text` includes the video transcript.
  92. For sph/douyin/x video posts whose original body_text is just hashtags or empty,
  93. this exposes the real video content (Deepgram transcript or YouTube captions)
  94. under the field the frontend reads. The original transcript field is left intact
  95. so downstream code that wants it separately still works.
  96. Idempotent: if body_text already contains the `[视频字幕]` marker, skip.
  97. """
  98. if not isinstance(post, dict):
  99. return post
  100. transcript = post.get("video_transcript") or post.get("captions") or ""
  101. if not isinstance(transcript, str) or not transcript.strip():
  102. return post
  103. transcript = transcript.strip()
  104. body = post.get("body_text") or post.get("desc") or ""
  105. body = body.strip() if isinstance(body, str) else ""
  106. if body and _TRANSCRIPT_MARKER in body:
  107. return post # 已经合并过
  108. merged = dict(post) # shallow copy — body_text 是 top-level field
  109. if body:
  110. merged["body_text"] = f"{body}\n\n{_TRANSCRIPT_MARKER}\n{transcript}"
  111. else:
  112. merged["body_text"] = f"{_TRANSCRIPT_MARKER}\n{transcript}"
  113. return merged
  114. def _is_before_cutoff(source: Dict[str, Any], cutoff_ts: int) -> bool:
  115. """判断帖子是否早于截止时间戳(秒级)
  116. 如果 timestamp 为 0 或不存在,返回 False(保留)
  117. """
  118. post = source.get("post", {})
  119. if not isinstance(post, dict):
  120. return False
  121. ts = post.get("publish_timestamp")
  122. # 没有 timestamp 或为 0,保留
  123. if not ts or ts == 0:
  124. return False
  125. try:
  126. ts = int(ts)
  127. # 毫秒级转秒级
  128. if ts > 1000000000000:
  129. ts = ts / 1000
  130. return ts < cutoff_ts
  131. except (ValueError, TypeError):
  132. pass
  133. # 尝试解析字符串格式 "2025-05-02 19:25:30"
  134. try:
  135. from datetime import datetime
  136. dt = datetime.strptime(str(ts), "%Y-%m-%d %H:%M:%S")
  137. return dt.timestamp() < cutoff_ts
  138. except Exception:
  139. # 解析失败,保留
  140. return False
  141. def _check_filters(
  142. source: Dict[str, Any],
  143. cutoff_ts: int,
  144. min_body_len: int,
  145. min_score: float,
  146. ) -> Optional[str]:
  147. """
  148. 对一条 source 逐条检查过滤规则。
  149. 返回:
  150. None —— 条目合格
  151. 非 None 字符串 —— 不合格的原因(写入 filter_reason)
  152. """
  153. post = source.get("post", {}) or {}
  154. # 1. 内容完整性:视频帖把 video_transcript 也计入(视频帖 body_text 通常为空)
  155. body = post.get("body_text") or post.get("desc") or ""
  156. transcript = post.get("video_transcript") or post.get("captions") or ""
  157. body_len = len(body.strip()) if isinstance(body, str) else 0
  158. transcript_len = len(transcript.strip()) if isinstance(transcript, str) else 0
  159. total_len = body_len + transcript_len
  160. if total_len < min_body_len:
  161. return f"missing_body_text:body={body_len},transcript={transcript_len}"
  162. # 2. agent 评分
  163. evaluation = source.get("evaluation")
  164. if not isinstance(evaluation, dict):
  165. return "missing_evaluation"
  166. quality = evaluation.get("quality")
  167. if not isinstance(quality, dict):
  168. return "missing_evaluation"
  169. score = quality.get("overall_score")
  170. if not isinstance(score, (int, float)):
  171. return "invalid_score"
  172. if score < min_score:
  173. return f"low_score:{score}"
  174. # 3. 过时(复用原 _is_before_cutoff 对时间戳的解析)
  175. if _is_before_cutoff(source, cutoff_ts):
  176. return "outdated"
  177. return None
  178. # ── 从 cache 中构建 (platform, content_id) → post 索引 ────────────────────────────────
  179. def _normalize_url(url: str) -> Optional[str]:
  180. """规范化 URL:排序 query 参数,去掉尾斜杠"""
  181. try:
  182. parsed = urlparse(url)
  183. if parsed.query:
  184. # 排序 query 参数
  185. params = parse_qs(parsed.query, keep_blank_values=True)
  186. sorted_query = urlencode(sorted(params.items()), doseq=True)
  187. normalized = f"{parsed.scheme}://{parsed.netloc}{parsed.path}?{sorted_query}"
  188. else:
  189. normalized = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
  190. return normalized.rstrip('/')
  191. except:
  192. return None
  193. def build_cache_index(cache_dir: Path, trace_ids: Optional[List[str]] = None) -> Dict[Tuple[str, str], Dict[str, Any]]:
  194. """
  195. 构建 (platform, channel_content_id) -> post 映射。
  196. Args:
  197. cache_dir: cache 目录路径
  198. trace_ids: 可选的 trace_id 列表。如果提供,只加载这些特定的 cache 文件;
  199. 否则扫描所有 cache 文件
  200. Returns:
  201. (platform, content_id) -> post 的映射字典
  202. """
  203. index: Dict[Tuple[str, str], Dict[str, Any]] = {}
  204. if not cache_dir.exists():
  205. return index
  206. # 如果提供了 trace_ids,只加载这些特定文件
  207. if trace_ids:
  208. cache_files = [cache_dir / f"{tid}.json" for tid in trace_ids if tid]
  209. cache_files = [f for f in cache_files if f.exists()]
  210. else:
  211. # 否则扫描所有 cache 文件
  212. cache_files = list(cache_dir.glob("*.json"))
  213. for cache_file in cache_files:
  214. try:
  215. with open(cache_file, "r", encoding="utf-8") as f:
  216. data = json.load(f)
  217. except Exception:
  218. continue
  219. for key, entry in data.items():
  220. if not key.startswith("search:"):
  221. continue
  222. platform = key.split(":", 1)[1]
  223. # 新格式:entry = {"history": [...], "latest_index": n}
  224. # 旧格式:entry = {"keyword": ..., "posts": [...]}
  225. if isinstance(entry, dict) and "history" in entry:
  226. post_lists = [h.get("posts", []) for h in entry.get("history", [])]
  227. elif isinstance(entry, dict) and "posts" in entry:
  228. post_lists = [entry.get("posts", [])]
  229. else:
  230. continue
  231. for posts in post_lists:
  232. for post in posts or []:
  233. if not isinstance(post, dict):
  234. continue
  235. cid = post.get("channel_content_id")
  236. # YouTube 平台用 video_id 而非 channel_content_id
  237. if not cid and post.get("video_id"):
  238. cid = post.get("video_id")
  239. post["channel_content_id"] = cid # 补全字段
  240. if cid:
  241. # 用 (platform, content_id) 作为索引键
  242. index[(platform, str(cid))] = post
  243. # 额外用 link / url 字段建立索引(用于 GZH 等平台)
  244. link = post.get("link") or post.get("url") or ""
  245. if link:
  246. index[("url", link)] = post
  247. # 规范化 URL 索引(排序 query 参数,去掉尾斜杠)
  248. norm = _normalize_url(link)
  249. if norm:
  250. index[("norm_url", norm)] = post
  251. return index
# ── Main entry point ────────────────────────────────
def extract_sources_to_json(
    raw_cases_dir: Path,
    cache_dir: Optional[Path] = None,
    output_name: str = "source.json",
    trace_ids: Optional[List[str]] = None,
    min_body_len: int = DEFAULT_MIN_BODY_LEN,
    min_score: float = DEFAULT_MIN_SCORE,
    cutoff_date: Tuple[int, int, int] = DEFAULT_CUTOFF_DATE,
) -> Dict[str, Any]:
    """
    Scan case_*.json files under `raw_cases_dir`, look the original posts up
    in the cache, and run the unified filters (body_text / score / freshness)
    over the result.

    Filter rules (each tunable through the parameters):
    - body_text empty or shorter than min_body_len chars
        → filter_reason=missing_body_text
    - agent evaluation missing or overall_score < min_score
        → filter_reason=missing_evaluation / invalid_score / low_score
    - publish_timestamp earlier than cutoff_date → filter_reason=outdated

    Args:
        raw_cases_dir: directory holding case_{platform}.json files
        cache_dir: cache directory; defaults to <project_root>/.cache/content_search
        output_name: filename for the kept sources (default "source.json")
        trace_ids: optional trace ids restricting which cache files load
        min_body_len: minimum body_text length in chars, default 30
        min_score: agent score floor, default 70
        cutoff_date: staleness cutoff (year, month, day), default (2025, 10, 1)

    Returns:
        Statistics dict (match counts, filter reasons/details, image count,
        output file path).
    """
    raw_cases_dir = Path(raw_cases_dir)
    if cache_dir is None:
        # Project root: climb up from this script file to the repo root.
        project_root = Path(__file__).resolve().parent.parent.parent.parent
        cache_dir = project_root / ".cache" / "content_search"
    cache_dir = Path(cache_dir)
    # 1. Build the cache index.
    cache_index = build_cache_index(cache_dir, trace_ids=trace_ids)
    # 2. Load the existing source.json (if any) so reruns are incremental.
    output_file = raw_cases_dir / output_name
    existing_sources = []
    existing_ids = set()  # (platform, channel_content_id) pairs for dedup
    if output_file.exists():
        try:
            with open(output_file, "r", encoding="utf-8") as f:
                existing_data = json.load(f)
            existing_sources = existing_data.get("sources", [])
            # Build the set of already-known IDs.
            for src in existing_sources:
                key = (src.get("platform"), src.get("channel_content_id"))
                existing_ids.add(key)
        except Exception as e:
            print(f"Warning: Failed to load existing source.json: {e}")
    # 2.1 Load the existing filtered_cases.json (if any).
    filtered_output_file = raw_cases_dir / "filtered_cases.json"
    existing_filtered_sources = []
    existing_filtered_ids = set()
    if filtered_output_file.exists():
        try:
            with open(filtered_output_file, "r", encoding="utf-8") as f:
                filtered_data = json.load(f)
            existing_filtered_sources = filtered_data.get("sources", [])
            for src in existing_filtered_sources:
                key = (src.get("platform"), src.get("channel_content_id"))
                existing_filtered_ids.add(key)
        except Exception as e:
            print(f"Warning: Failed to load existing filtered_cases.json: {e}")
    # 3. Scan every case file.
    matched: List[Dict[str, Any]] = []
    unmatched: List[Dict[str, Any]] = []
    seen_keys: set = set(existing_ids)  # start from the known IDs
    for case_file in sorted(raw_cases_dir.glob("case_*.json")):
        # Skip ourselves (in case source.json was mis-named case_*).
        if case_file.name == output_name:
            continue
        try:
            with open(case_file, "r", encoding="utf-8") as f:
                case_data = json.load(f)
        except Exception as e:
            # Try to auto-repair malformed JSON.
            try:
                from examples.process_pipeline.script.fix_json_quotes import try_fix_and_parse
                with open(case_file, "r", encoding="utf-8") as f:
                    raw_content = f.read()
                success, case_data, fix_desc = try_fix_and_parse(raw_content)
                if success:
                    # Repair succeeded — write the fixed JSON back.
                    with open(case_file, "w", encoding="utf-8") as f:
                        json.dump(case_data, f, ensure_ascii=False, indent=2)
                    print(f" 🔧 [Auto-Fix] Fixed {case_file.name}: {fix_desc}")
                else:
                    unmatched.append({"case_file": case_file.name, "error": str(e)})
                    continue
            except Exception:
                unmatched.append({"case_file": case_file.name, "error": str(e)})
                continue
        entries = extract_entries_from_case(case_data)
        for entry in entries:
            url = entry["url"]
            evaluation = entry.get("evaluation")
            # Parse the URL into platform and content_id (may fail, e.g. for
            # sph's export/... tokens).
            parsed = parse_url(url)
            platform: Optional[str] = parsed[0] if parsed else None
            cid: Optional[str] = parsed[1] if parsed else None
            # Multi-level matching, by priority:
            # 1. (platform, content_id) exact match (when parse_url succeeded)
            # 2. ("url", link) full-string match (fallback; required for
            #    non-http URLs like sph's)
            # 3. ("norm_url", normalized) normalized match
            post = None
            if platform and cid:
                post = cache_index.get((platform, cid))
            if not post:
                post = cache_index.get(("url", url))
            if not post:
                norm = _normalize_url(url)
                if norm:
                    post = cache_index.get(("norm_url", norm))
            # When parse_url failed but a URL index hit found the post,
            # infer platform / cid from the post itself.
            if post and (not platform or not cid):
                platform = platform or post.get("channel") or "unknown"
                cid = cid or post.get("channel_content_id") or post.get("video_id") or url
            # Neither parsed nor found in cache: genuinely unmatched.
            if not post and not platform:
                unmatched.append({
                    "case_file": case_file.name,
                    "url": url,
                    "reason": "url_parse_failed",
                })
                continue
            key = (platform, str(cid) if cid else url)
            if key in seen_keys:
                continue
            seen_keys.add(key)
            if post:
                # Always derive case_id from the cache's channel_content_id
                # so case_id stays consistent with the cached ID.
                actual_cid = post.get("channel_content_id") or post.get("video_id") or cid
                actual_case_id = f"{platform}_{actual_cid}"
                # Merge video_transcript into body_text (the frontend needs
                # it to display video posts).
                post_for_source = _merge_transcript_into_body(post)
                matched.append({
                    "case_id": actual_case_id,
                    "case_file": case_file.name,
                    "platform": platform,
                    "channel_content_id": str(actual_cid),
                    "source_url": url,
                    "evaluation": evaluation,
                    "post": post_for_source,
                    "comments": post.get("author_comments", []) or [],
                })
            else:
                unmatched.append({
                    "case_id": f"{platform}_{cid}",  # uniform-format ID
                    "case_file": case_file.name,
                    "platform": platform,
                    "channel_content_id": cid,
                    "source_url": url,
                    "reason": "not_in_cache",
                })
    # 4. Merge existing data with the newly matched entries.
    all_sources = existing_sources + matched
    # 5. Unified filtering: body_text completeness / agent score / freshness.
    from datetime import datetime as _dt
    cutoff_ts = int(_dt(*cutoff_date).timestamp())
    kept_sources: List[Dict[str, Any]] = []
    filtered_sources: List[Dict[str, Any]] = []
    reason_counts: Dict[str, int] = {}
    for s in all_sources:
        reason = _check_filters(s, cutoff_ts, min_body_len, min_score)
        if reason is None:
            kept_sources.append(s)
        else:
            # Record the rejection reason.
            s_copy = dict(s)
            s_copy["filter_reason"] = reason
            filtered_sources.append(s_copy)
            # Tally by reason category (the part before the colon).
            category = reason.split(":", 1)[0]
            reason_counts[category] = reason_counts.get(category, 0) + 1
    all_sources = kept_sources
    filtered_count = len(filtered_sources)
    # 6. Convert timestamps to a readable format.
    _convert_timestamps(all_sources)
    _convert_timestamps(filtered_sources)
    # 7. Write source.json.
    output = {
        "total": len(all_sources),
        "cache_dir": str(cache_dir),
        "sources": all_sources,
    }
    output_file.parent.mkdir(parents=True, exist_ok=True)
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(output, f, ensure_ascii=False, indent=2)
    # 8. Write filtered_cases.json (rejected posts, grouped by reason).
    if filtered_sources:
        for fs in filtered_sources:
            key = (fs.get("platform"), fs.get("channel_content_id"))
            if key not in existing_filtered_ids:
                existing_filtered_sources.append(fs)
                existing_filtered_ids.add(key)
        # Group by reason category.
        by_reason: Dict[str, List[Dict[str, Any]]] = {}
        for fs in existing_filtered_sources:
            reason = fs.get("filter_reason", "unknown")
            category = reason.split(":", 1)[0]
            by_reason.setdefault(category, []).append(fs)
        filtered_output = {
            "total": len(existing_filtered_sources),
            "by_reason": {
                category: {
                    "count": len(items),
                    "sources": items,
                }
                for category, items in by_reason.items()
            },
        }
        with open(filtered_output_file, "w", encoding="utf-8") as f:
            json.dump(filtered_output, f, ensure_ascii=False, indent=2)
    # 9. Download images into raw_cases/images/{case_id}/.
    images_downloaded = download_images_for_sources(matched, raw_cases_dir)
    # 10. Build a summary of filtered entries (used as feedback on reruns).
    filtered_details: List[Dict[str, Any]] = []
    for fs in filtered_sources:
        post = fs.get("post", {}) or {}
        title = post.get("title") or fs.get("source_url", "")
        filtered_details.append({
            "case_id": fs.get("case_id", ""),
            "platform": fs.get("platform", ""),
            "title": title[:60] if title else "",
            "filter_reason": fs.get("filter_reason", ""),
        })
    # Return the run statistics.
    return {
        "total_matched": len(matched),
        "total_existing": len(existing_sources),
        "total_unmatched": len(unmatched),
        "filtered_total": filtered_count,
        "filtered_reasons": reason_counts,
        "filtered_details": filtered_details,
        "images_downloaded": images_downloaded,
        "output_file": str(output_file),
    }
  487. # ── 图片下载 ────────────────────────────────
  488. def _get_image_urls_from_post(post: Dict[str, Any]) -> List[str]:
  489. """从 post 中提取所有图片 URL"""
  490. urls = []
  491. images = post.get("images", [])
  492. if isinstance(images, list):
  493. for img in images:
  494. if isinstance(img, str) and img.startswith("http"):
  495. urls.append(img)
  496. elif isinstance(img, dict) and "url" in img:
  497. urls.append(img["url"])
  498. image_url_list = post.get("image_url_list", [])
  499. if isinstance(image_url_list, list):
  500. for img_obj in image_url_list:
  501. if isinstance(img_obj, dict) and "image_url" in img_obj:
  502. urls.append(img_obj["image_url"])
  503. return urls
  504. def _format_timestamp(ts: Any) -> Optional[str]:
  505. """将时间戳(秒/毫秒)转换为可读格式"""
  506. from datetime import datetime
  507. if ts is None or ts == 0 or ts == "":
  508. return None
  509. try:
  510. ts = int(ts)
  511. # 毫秒级时间戳
  512. if ts > 1000000000000:
  513. ts = ts / 1000
  514. dt = datetime.fromtimestamp(ts)
  515. return dt.strftime("%Y-%m-%d %H:%M:%S")
  516. except Exception:
  517. return None
  518. def _convert_timestamps(sources: List[Dict[str, Any]]) -> None:
  519. """将 source 列表中 post 的时间戳字段替换为可读格式"""
  520. timestamp_fields = ["publish_timestamp", "modify_timestamp", "update_timestamp"]
  521. for src in sources:
  522. post = src.get("post", {})
  523. if not isinstance(post, dict):
  524. continue
  525. for field in timestamp_fields:
  526. if field in post:
  527. readable = _format_timestamp(post.get(field))
  528. if readable:
  529. post[field] = readable
  530. def download_images_for_sources(sources: List[Dict[str, Any]], raw_cases_dir: Path) -> int:
  531. """
  532. 为新匹配的 sources 下载图片到 raw_cases/images/{case_id}/
  533. Returns:
  534. 下载成功的图片总数
  535. """
  536. import urllib.request
  537. import urllib.error
  538. images_base = raw_cases_dir / "images"
  539. total_downloaded = 0
  540. # 设置 headers 避免被拒(X/Twitter 需要 User-Agent)
  541. opener = urllib.request.build_opener()
  542. opener.addheaders = [("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36")]
  543. urllib.request.install_opener(opener)
  544. for src in sources:
  545. case_id = src.get("case_id", "")
  546. post = src.get("post", {})
  547. image_urls = _get_image_urls_from_post(post)
  548. if not image_urls:
  549. continue
  550. case_dir = images_base / case_id
  551. case_dir.mkdir(parents=True, exist_ok=True)
  552. for idx, url in enumerate(image_urls):
  553. # 确定文件扩展名
  554. ext = ".jpg"
  555. if ".png" in url.lower():
  556. ext = ".png"
  557. elif ".webp" in url.lower():
  558. ext = ".webp"
  559. save_path = case_dir / f"{idx:02d}{ext}"
  560. if save_path.exists():
  561. total_downloaded += 1
  562. continue
  563. try:
  564. urllib.request.urlretrieve(url, str(save_path))
  565. total_downloaded += 1
  566. except Exception:
  567. # 下载失败就跳过,不中断流程
  568. pass
  569. return total_downloaded
if __name__ == "__main__":
    # CLI: python extract_sources.py <raw_cases_dir> [cache_dir]
    import sys
    if len(sys.argv) < 2:
        print("Usage: python extract_sources.py <raw_cases_dir> [cache_dir]")
        sys.exit(1)
    raw_cases_dir = Path(sys.argv[1])
    # Optional second argument overrides the default cache location.
    cache_dir = Path(sys.argv[2]) if len(sys.argv) > 2 else None
    result = extract_sources_to_json(raw_cases_dir, cache_dir=cache_dir)
    print(f"[OK] Matched: {result['total_matched']}, Unmatched: {result['total_unmatched']}")
    print(f" Output: {raw_cases_dir / 'source.json'}")