generate_case.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287
  1. #!/usr/bin/env python3
  2. """
  3. 从 raw_cases/source.json 生成标准化的 case.json
  4. 职责:
  5. 1. 读取 raw_cases/source.json(原始 source 格式)
  6. 2. 标准化字段格式(title, body, author, images, url, note)
  7. 3. 下载图片到本地 + 上传到 OSS
  8. 4. 输出到需求目录根下的 case.json
  9. 输出格式:
  10. index, category, user_kept, user_comment, description, method,
  11. cover, title, author, body, images, url, note, _raw, workflow, capabilities
  12. """
  13. import asyncio
  14. import hashlib
  15. import json
  16. from pathlib import Path
  17. from typing import Any, Dict, List, Optional
  18. # ── OSS 工具 ──────────────────────────────────────
  19. CDN_BASE = "https://res.cybertogether.net"
  20. def _is_oss_url(url: str) -> bool:
  21. return url.startswith(CDN_BASE)
  22. def _ext_from_path(path: str) -> str:
  23. """从 URL 猜测扩展名,默认 jpg"""
  24. p = path.split("?")[0].lower()
  25. for ext in ("png", "gif", "webp", "avif", "bmp", "svg", "jpg", "jpeg"):
  26. if p.endswith(f".{ext}"):
  27. return ext
  28. return "jpg"
  29. async def _upload_bytes(data: bytes, filename: str) -> str:
  30. """上传 bytes 到 OSS,返回 CDN URL"""
  31. from agent.tools.builtin.file.image_cdn import _upload_bytes_to_oss
  32. return await _upload_bytes_to_oss(data, filename)
  33. async def _upload_remote(url: str, cache: Dict[str, str]) -> str:
  34. """下载外链图片并上传到 OSS,返回 CDN URL"""
  35. key = hashlib.md5(url.encode()).hexdigest()[:12]
  36. if key in cache:
  37. return cache[key]
  38. from agent.tools.builtin.file.image_cdn import _download_image
  39. data = await _download_image(url)
  40. ext = _ext_from_path(url)
  41. cdn_url = await _upload_bytes(data, f"{key}.{ext}")
  42. cache[key] = cdn_url
  43. return cdn_url
  44. async def ensure_oss_url(url: str, cache: Dict[str, str]) -> str:
  45. """确保图片是 OSS CDN URL"""
  46. if _is_oss_url(url):
  47. return url
  48. if url.startswith("http"):
  49. return await _upload_remote(url, cache)
  50. raise ValueError(f"Invalid image URL: {url}")
  51. # ── 字段提取(各平台差异处理)────────────────────────────────────
  52. def _extract_author(post: Dict[str, Any], platform: str) -> str:
  53. """字段映射:author / channel_account_name / channel"""
  54. if platform == "x":
  55. return post.get("channel_account_name") or post.get("author") or ""
  56. if platform == "youtube":
  57. return post.get("channel") or post.get("author") or ""
  58. return post.get("author") or ""
  59. def _extract_url(post: Dict[str, Any], platform: str) -> str:
  60. """字段映射:url / link / content_link"""
  61. if platform == "youtube":
  62. return post.get("content_link") or post.get("url") or ""
  63. return post.get("url") or post.get("link") or ""
  64. def _extract_body(post: Dict[str, Any], platform: str) -> str:
  65. """字段映射:body_text / description"""
  66. if platform == "youtube":
  67. return post.get("description") or post.get("body_text") or ""
  68. return post.get("body_text") or ""
  69. def _extract_raw_images(post: Dict[str, Any], platform: str) -> List[str]:
  70. """字段映射:images / image_url_list / cover_url"""
  71. # 优先 images 字段
  72. if post.get("images"):
  73. imgs = post["images"]
  74. if isinstance(imgs, list) and imgs:
  75. return [i for i in imgs if i]
  76. # 其次 image_url_list
  77. if post.get("image_url_list"):
  78. raw = post["image_url_list"]
  79. if isinstance(raw, list):
  80. result = []
  81. for item in raw:
  82. if isinstance(item, dict):
  83. result.append(item.get("image_url") or "")
  84. else:
  85. result.append(item or "")
  86. result = [u for u in result if u]
  87. if result:
  88. return result
  89. # 最后兜底 cover_url
  90. if post.get("cover_url"):
  91. return [post["cover_url"]]
  92. return []
  93. # ── 单条记录标准化 ────────────────────────────────────────────────────────────
  94. async def normalize_source_item(
  95. source_item: Dict[str, Any],
  96. index: int,
  97. upload_cache: Dict[str, str],
  98. images_dir: Path,
  99. ) -> Dict[str, Any]:
  100. """
  101. 将单条 source item 转换为标准化的 case 格式
  102. """
  103. # 从 source item 提取字段
  104. platform = source_item.get("platform", "")
  105. post = source_item.get("post", {})
  106. case_id = source_item.get("case_id", f"{platform}_{source_item.get('channel_content_id', '')}")
  107. title = post.get("title", "")
  108. author = _extract_author(post, platform)
  109. body = _extract_body(post, platform)
  110. url = _extract_url(post, platform) or source_item.get("source_url", "")
  111. likes = post.get("like_count", 0)
  112. comments = post.get("comment_count", 0)
  113. # 处理图片:下载到本地 + 上传 OSS
  114. raw_images = _extract_raw_images(post, platform)
  115. images: List[str] = []
  116. case_dir = images_dir / case_id
  117. case_dir.mkdir(parents=True, exist_ok=True)
  118. for idx, img_url in enumerate(raw_images):
  119. ext = _ext_from_path(img_url)
  120. local_path = case_dir / f"{idx:02d}.{ext}"
  121. try:
  122. # 下载到本地
  123. if not local_path.exists():
  124. print(f" 📥 [{idx+1}/{len(raw_images)}] 下载图片...")
  125. from agent.tools.builtin.file.image_cdn import _download_image
  126. data = await _download_image(img_url)
  127. local_path.write_bytes(data)
  128. print(f" 📥 [{idx+1}/{len(raw_images)}] 已保存 {local_path.name} ({len(data)} bytes)")
  129. else:
  130. print(f" 📁 [{idx+1}/{len(raw_images)}] 本地已存在 {local_path.name}")
  131. # 上传到 OSS
  132. if _is_oss_url(img_url):
  133. images.append(img_url)
  134. print(f" ☁️ [{idx+1}/{len(raw_images)}] 已是 CDN URL")
  135. else:
  136. print(f" ☁️ [{idx+1}/{len(raw_images)}] 上传 OSS...")
  137. cdn_url = await ensure_oss_url(img_url, upload_cache)
  138. images.append(cdn_url)
  139. print(f" ☁️ [{idx+1}/{len(raw_images)}] 上传完成")
  140. except Exception as e:
  141. print(f" ⚠ [{idx+1}/{len(raw_images)}] 图片处理失败: {str(e)[:60]}")
  142. # 兜底:对 body 里的外链图片也替换为 CDN
  143. try:
  144. from agent.tools.builtin.file.image_cdn import replace_image_urls
  145. body = await replace_image_urls(body)
  146. except Exception:
  147. pass
  148. cover = images[0] if images else ""
  149. return {
  150. "index": index,
  151. "category": "",
  152. "user_kept": False,
  153. "user_comment": "",
  154. "description": "",
  155. "method": "",
  156. "cover": cover,
  157. "title": title,
  158. "author": author,
  159. "body": body,
  160. "images": images,
  161. "url": url,
  162. "note": f"platform={platform} | likes={likes} | comments={comments}",
  163. "_raw": {
  164. "case_id": case_id,
  165. "platform": platform,
  166. "channel_content_id": source_item.get("channel_content_id", ""),
  167. },
  168. "workflow": None,
  169. "capabilities": None,
  170. }
  171. # ── 主入口 ────────────────────────────────
  172. async def generate_case_from_source(
  173. raw_cases_dir: Path,
  174. output_file: Optional[Path] = None,
  175. ) -> Dict[str, Any]:
  176. """
  177. 从 raw_cases/source.json 生成标准化的 case.json
  178. """
  179. raw_cases_dir = Path(raw_cases_dir)
  180. source_file = raw_cases_dir / "source.json"
  181. if not source_file.exists():
  182. raise FileNotFoundError(f"source.json not found: {source_file}")
  183. # 读取 source.json
  184. with open(source_file, "r", encoding="utf-8") as f:
  185. source_data = json.load(f)
  186. sources = source_data.get("sources", [])
  187. print(f"Processing {len(sources)} sources...")
  188. # 准备图片目录
  189. images_dir = raw_cases_dir / "images"
  190. images_dir.mkdir(parents=True, exist_ok=True)
  191. # 标准化所有 source items
  192. cases: List[Dict[str, Any]] = []
  193. upload_cache: Dict[str, str] = {}
  194. for idx, source_item in enumerate(sources, 1):
  195. try:
  196. case = await normalize_source_item(
  197. source_item=source_item,
  198. index=idx,
  199. upload_cache=upload_cache,
  200. images_dir=images_dir,
  201. )
  202. cases.append(case)
  203. print(f" [{idx}] {case['title'][:40]}")
  204. except Exception as e:
  205. print(f" [{idx}] ✗ 失败: {e}")
  206. # 输出 case.json
  207. if output_file is None:
  208. output_file = raw_cases_dir.parent / "case.json"
  209. output_data = {
  210. "total": len(cases),
  211. "cases": cases,
  212. }
  213. output_file.parent.mkdir(parents=True, exist_ok=True)
  214. with open(output_file, "w", encoding="utf-8") as f:
  215. json.dump(output_data, f, ensure_ascii=False, indent=2)
  216. return {
  217. "total_cases": len(cases),
  218. "output_file": str(output_file),
  219. }
  220. if __name__ == "__main__":
  221. import sys
  222. if len(sys.argv) < 2:
  223. print("Usage: python generate_case.py <raw_cases_dir>")
  224. sys.exit(1)
  225. raw_cases_dir = Path(sys.argv[1])
  226. stats = asyncio.run(generate_case_from_source(raw_cases_dir))
  227. print(f"\n✓ Generated {stats['total_cases']} cases")
  228. print(f"→ {stats['output_file']}")