#!/usr/bin/env python3
"""
Generate a normalized case.json from raw_cases/source.json.

Responsibilities:
1. Read raw_cases/source.json (raw "source" format).
2. Normalize the field formats (title, body, author, images, url, note).
3. Download images to a local directory and upload them to OSS.
4. Write case.json to the parent directory of raw_cases.

Output fields per case:
    index, category, user_kept, user_comment, description, method, cover,
    title, author, body, images, url, note, published_at, _raw, workflow,
    capabilities
"""

import asyncio
import hashlib
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional


# ── OSS helpers ──────────────────────────────────────

CDN_BASE = "https://res.cybertogether.net"

# Extensions recognised when guessing a file type from a URL.
_KNOWN_IMAGE_EXTS = ("png", "gif", "webp", "avif", "bmp", "svg", "jpg", "jpeg")

# Timestamp layouts accepted by _parse_published_at.
_TIMESTAMP_FORMATS = (
    "%Y-%m-%d %H:%M:%S",
    "%Y-%m-%dT%H:%M:%S",
    "%Y-%m-%d %H:%M:%S.%f",
    "%Y-%m-%dT%H:%M:%S.%f",
)


def _is_oss_url(url: str) -> bool:
    """Return True when *url* already starts with the CDN base URL."""
    return url.startswith(CDN_BASE)


def _ext_from_path(path: str) -> str:
    """Guess the image extension from a URL or path; default to ``jpg``.

    The query string and the fragment are ignored, and matching is
    case-insensitive.
    """
    clean = path.split("?", 1)[0].split("#", 1)[0].lower()
    for ext in _KNOWN_IMAGE_EXTS:
        if clean.endswith(f".{ext}"):
            return ext
    return "jpg"


async def _upload_bytes(data: bytes, filename: str) -> str:
    """Upload *data* to OSS under *filename* and return the resulting URL."""
    # Imported lazily so this module can be imported without the agent package.
    from agent.tools.builtin.file.image_cdn import _upload_bytes_to_oss

    return await _upload_bytes_to_oss(data, filename)


async def _upload_remote(
    url: str,
    cache: Dict[str, str],
    data: Optional[bytes] = None,
) -> str:
    """Upload the image behind *url* to OSS and return the uploaded URL.

    Args:
        url: Remote image URL.
        cache: Maps a short hash of the URL to the uploaded URL; consulted
            before doing any work and updated after a successful upload.
        data: Bytes already downloaded from *url*. When given, the image is
            not downloaded a second time.
    """
    key = hashlib.md5(url.encode()).hexdigest()[:12]
    if key in cache:
        return cache[key]

    if data is None:
        from agent.tools.builtin.file.image_cdn import _download_image

        data = await _download_image(url)

    cdn_url = await _upload_bytes(data, f"{key}.{_ext_from_path(url)}")
    cache[key] = cdn_url
    return cdn_url


async def ensure_oss_url(
    url: str,
    cache: Dict[str, str],
    data: Optional[bytes] = None,
) -> str:
    """Return an OSS CDN URL for *url*, uploading the image if necessary.

    Args:
        url: Image URL; returned unchanged when it already is a CDN URL.
        cache: Upload cache shared across calls (see ``_upload_remote``).
        data: Optional bytes already downloaded from *url*, reused for the
            upload instead of downloading again.

    Raises:
        ValueError: If *url* is neither a CDN URL nor an ``http``-prefixed URL.
    """
    if _is_oss_url(url):
        return url
    if url.startswith("http"):
        return await _upload_remote(url, cache, data=data)
    raise ValueError(f"Invalid image URL: {url}")


# ── Field extraction (per-platform differences) ──────────────────────

def _extract_author(post: Dict[str, Any], platform: str) -> str:
    """Pick the author from author / channel_account_name / channel."""
    if platform == "x":
        return post.get("channel_account_name") or post.get("author") or ""
    if platform == "youtube":
        return post.get("channel") or post.get("author") or ""
    return post.get("author") or ""


def _extract_url(post: Dict[str, Any], platform: str) -> str:
    """Pick the post URL from url / link / content_link."""
    if platform == "youtube":
        return post.get("content_link") or post.get("url") or ""
    return post.get("url") or post.get("link") or ""


def _extract_body(post: Dict[str, Any], platform: str) -> str:
    """Pick the body text from body_text / description."""
    if platform == "youtube":
        return post.get("description") or post.get("body_text") or ""
    return post.get("body_text") or ""


def _collect_image_urls(raw: Any) -> List[str]:
    """Normalize a raw image list into a list of non-empty URL strings.

    Items may be plain strings or dicts carrying an ``image_url`` key;
    anything else (including empty values) is dropped. A non-list input
    yields an empty list.
    """
    if not isinstance(raw, list):
        return []
    urls: List[str] = []
    for item in raw:
        if isinstance(item, dict):
            item = item.get("image_url")
        if isinstance(item, str) and item:
            urls.append(item)
    return urls


def _extract_raw_images(post: Dict[str, Any], platform: str) -> List[str]:
    """Pick image URLs from images / image_url_list / cover_url.

    Fields are tried in that order and the first one that yields at least
    one usable URL wins. *platform* is accepted for signature parity with
    the other extractors and is not used.
    """
    for field in ("images", "image_url_list"):
        urls = _collect_image_urls(post.get(field))
        if urls:
            return urls

    # Last resort: the cover image.
    cover = post.get("cover_url")
    if isinstance(cover, str) and cover:
        return [cover]
    return []


def _parse_published_at(timestamp_str: str) -> Optional[str]:
    """Convert a publish timestamp into an ISO 8601 string with UTC offset.

    Accepted layouts (with or without fractional seconds):
        - "2026-01-09 15:53:00"
        - "2026-01-09T15:53:00"

    The input carries no timezone, so it is assumed to be UTC.

    Returns:
        An ISO 8601 string such as "2026-01-09T15:53:00+00:00", or None when
        the input is empty, not a string, or matches none of the layouts.
    """
    if not timestamp_str or not isinstance(timestamp_str, str):
        return None

    text = timestamp_str.strip()
    if not text:
        return None

    for fmt in _TIMESTAMP_FORMATS:
        try:
            parsed = datetime.strptime(text, fmt)
        except ValueError:
            continue
        return parsed.replace(tzinfo=timezone.utc).isoformat()
    return None


# ── Single-record normalization ──────────────────────────────────────

async def _process_images(
    raw_images: List[str],
    case_dir: Path,
    upload_cache: Dict[str, str],
) -> List[str]:
    """Save each image under *case_dir* and return the CDN URLs.

    A failure on one image is reported and skipped so the remaining images
    are still processed.
    """
    images: List[str] = []
    total = len(raw_images)

    for idx, img_url in enumerate(raw_images):
        tag = f"[{idx+1}/{total}]"
        try:
            local_path = case_dir / f"{idx:02d}.{_ext_from_path(img_url)}"

            # Local copy. `data` stays None when the file is already on disk.
            data: Optional[bytes] = None
            if not local_path.exists():
                print(f" 📥 {tag} 下载图片...")
                from agent.tools.builtin.file.image_cdn import _download_image

                data = await _download_image(img_url)
                local_path.write_bytes(data)
                print(f" 📥 {tag} 已保存 {local_path.name} ({len(data)} bytes)")
            else:
                print(f" 📁 {tag} 本地已存在 {local_path.name}")

            # CDN copy. Freshly downloaded bytes are handed over so the image
            # is not fetched twice; a pre-existing local file is not reused
            # because it is not known to match the current URL.
            if _is_oss_url(img_url):
                images.append(img_url)
                print(f" ☁️ {tag} 已是 CDN URL")
            else:
                print(f" ☁️ {tag} 上传 OSS...")
                cdn_url = await ensure_oss_url(img_url, upload_cache, data=data)
                images.append(cdn_url)
                print(f" ☁️ {tag} 上传完成")
        except Exception as e:
            print(f" ⚠ {tag} 图片处理失败: {str(e)[:60]}")

    return images


async def normalize_source_item(
    source_item: Dict[str, Any],
    index: int,
    upload_cache: Dict[str, str],
    images_dir: Path,
) -> Dict[str, Any]:
    """Convert one source item into the normalized case format.

    Args:
        source_item: One entry of the ``sources`` list in source.json.
        index: Position assigned to the case in the output.
        upload_cache: Upload cache shared across all items of a run.
        images_dir: Directory under which a per-case image folder is created.

    Returns:
        The normalized case dict. ``workflow`` and ``capabilities`` are None.
    """
    platform = source_item.get("platform", "")
    # `or` (rather than a .get default) also covers keys present but null/empty.
    post = source_item.get("post") or {}
    channel_content_id = source_item.get("channel_content_id", "")
    case_id = source_item.get("case_id") or f"{platform}_{channel_content_id}"

    title = post.get("title") or ""
    author = _extract_author(post, platform)
    body = _extract_body(post, platform)
    url = _extract_url(post, platform) or source_item.get("source_url", "")
    likes = post.get("like_count", 0)
    comments = post.get("comment_count", 0)

    published_at = _parse_published_at(post.get("publish_timestamp", ""))

    # Images: local copy + OSS upload.
    case_dir = images_dir / case_id
    case_dir.mkdir(parents=True, exist_ok=True)
    raw_images = _extract_raw_images(post, platform)
    images = await _process_images(raw_images, case_dir, upload_cache)

    # Best effort: also rewrite external image links inside the body.
    try:
        from agent.tools.builtin.file.image_cdn import replace_image_urls

        body = await replace_image_urls(body)
    except Exception as e:
        print(f" ⚠ 正文图片替换失败: {str(e)[:60]}")

    return {
        "index": index,
        "category": "",
        "user_kept": False,
        "user_comment": "",
        "description": "",
        "method": "",
        "cover": images[0] if images else "",
        "title": title,
        "author": author,
        "body": body,
        "images": images,
        "url": url,
        "note": f"platform={platform} | likes={likes} | comments={comments}",
        "published_at": published_at,  # ISO 8601 string, or None
        "_raw": {
            "case_id": case_id,
            "platform": platform,
            "channel_content_id": channel_content_id,
        },
        "workflow": None,
        "capabilities": None,
    }


# ── Main entry point ────────────────────────────────

def _load_existing_cases(output_file: Path) -> Dict[str, Dict[str, Any]]:
    """Read an existing case.json and index its cases by ``_raw.case_id``.

    Returns an empty dict when the file is missing or cannot be read.
    """
    existing: Dict[str, Dict[str, Any]] = {}
    if not output_file.exists():
        return existing
    try:
        with open(output_file, "r", encoding="utf-8") as f:
            existing_data = json.load(f)
        for case in existing_data.get("cases", []):
            case_id = case.get("_raw", {}).get("case_id")
            if case_id:
                existing[case_id] = case
        print(f"Found {len(existing)} existing cases, will preserve workflow and capabilities")
    except Exception as e:
        print(f"Warning: Failed to read existing case.json: {e}")
    return existing


async def generate_case_from_source(
    raw_cases_dir: Path,
    output_file: Optional[Path] = None,
) -> Dict[str, Any]:
    """Generate a normalized case.json from raw_cases/source.json.

    When the output file already exists, non-null ``workflow`` and
    ``capabilities`` values of cases with a matching case_id are carried
    over. Source items that fail to normalize are reported and left out.

    Args:
        raw_cases_dir: Directory containing source.json.
        output_file: Destination file; defaults to ``case.json`` in the
            parent directory of *raw_cases_dir*.

    Returns:
        A dict with ``total_cases`` and ``output_file``.

    Raises:
        FileNotFoundError: If source.json does not exist.
    """
    raw_cases_dir = Path(raw_cases_dir)
    source_file = raw_cases_dir / "source.json"
    if not source_file.exists():
        raise FileNotFoundError(f"source.json not found: {source_file}")

    with open(source_file, "r", encoding="utf-8") as f:
        source_data = json.load(f)

    sources = source_data.get("sources", [])
    print(f"Processing {len(sources)} sources...")

    if output_file is None:
        output_file = raw_cases_dir.parent / "case.json"
    existing_cases = _load_existing_cases(output_file)

    images_dir = raw_cases_dir / "images"
    images_dir.mkdir(parents=True, exist_ok=True)

    cases: List[Dict[str, Any]] = []
    upload_cache: Dict[str, str] = {}

    for idx, source_item in enumerate(sources, 1):
        try:
            case = await normalize_source_item(
                source_item=source_item,
                index=idx,
                upload_cache=upload_cache,
                images_dir=images_dir,
            )

            # Carry over previously generated workflow / capabilities.
            preserved = False
            existing = existing_cases.get(case.get("_raw", {}).get("case_id"))
            if existing is not None:
                for field in ("workflow", "capabilities"):
                    if existing.get(field) is not None:
                        case[field] = existing[field]
                        preserved = True

            label = str(case.get("title") or "")[:40]
            if preserved:
                print(f" [{idx}] {label} (preserved workflow & capabilities)")
            else:
                print(f" [{idx}] {label}")

            cases.append(case)
        except Exception as e:
            print(f" [{idx}] ✗ 失败: {e}")

    output_data = {
        "total": len(cases),
        "cases": cases,
    }

    output_file.parent.mkdir(parents=True, exist_ok=True)
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(output_data, f, ensure_ascii=False, indent=2)

    return {
        "total_cases": len(cases),
        "output_file": str(output_file),
    }


if __name__ == "__main__":
    import sys

    if len(sys.argv) < 2:
        print("Usage: python generate_case.py <raw_cases_dir>")
        sys.exit(1)

    raw_cases_dir = Path(sys.argv[1])
    stats = asyncio.run(generate_case_from_source(raw_cases_dir))
    print(f"\n✓ Generated {stats['total_cases']} cases")
    print(f"→ {stats['output_file']}")