- #!/usr/bin/env python3
- """
- 从 raw_cases/source.json 生成标准化的 case.json
- 职责:
- 1. 读取 raw_cases/source.json(原始 source 格式)
- 2. 标准化字段格式(title, body, author, images, url, note)
- 3. 下载图片到本地 + 上传到 OSS
- 4. 输出到需求目录根下的 case.json
- 输出格式:
- index, category, user_kept, user_comment, description, method,
- cover, title, author, body, images, url, note, _raw, workflow, capability
- """
- import asyncio
- import hashlib
- import json
- from pathlib import Path
- from typing import Any, Dict, List, Optional
- # ── OSS 工具 ──────────────────────────────────────
- CDN_BASE = "https://res.cybertogether.net"
- def _is_oss_url(url: str) -> bool:
- return url.startswith(CDN_BASE)
- def _ext_from_path(path: str) -> str:
- """从 URL 猜测扩展名,默认 jpg"""
- p = path.split("?")[0].lower()
- for ext in ("png", "gif", "webp", "avif", "bmp", "svg", "jpg", "jpeg"):
- if p.endswith(f".{ext}"):
- return ext
- return "jpg"
async def _upload_bytes(data: bytes, filename: str) -> str:
    """Upload raw image bytes to OSS and return the resulting CDN URL."""
    # Imported lazily so importing this module stays cheap.
    from agent.tools.builtin.file.image_cdn import _upload_bytes_to_oss

    cdn_url = await _upload_bytes_to_oss(data, filename)
    return cdn_url
async def _upload_remote(url: str, cache: Dict[str, str]) -> str:
    """Download an external image, push it to OSS, and return the CDN URL.

    Results are memoized in *cache*, keyed by a 12-character md5 digest of
    the source URL, so each remote image is transferred at most once per run.
    """
    digest = hashlib.md5(url.encode()).hexdigest()[:12]
    cached = cache.get(digest)
    if cached is not None:
        return cached
    # Lazy import: only paid on a cache miss.
    from agent.tools.builtin.file.image_cdn import _download_image

    payload = await _download_image(url)
    cdn_url = await _upload_bytes(payload, f"{digest}.{_ext_from_path(url)}")
    cache[digest] = cdn_url
    return cdn_url
async def ensure_oss_url(url: str, cache: Dict[str, str]) -> str:
    """Return an OSS CDN URL for *url*, re-uploading it first if needed.

    Raises:
        ValueError: if *url* is neither a CDN URL nor an http(s) link.
    """
    if _is_oss_url(url):
        return url
    # Guard clause: anything that is not an http(s) link is unusable.
    if not url.startswith("http"):
        raise ValueError(f"Invalid image URL: {url}")
    return await _upload_remote(url, cache)
# ── Field extraction (per-platform differences) ───────────────────────────────
- def _extract_author(post: Dict[str, Any], platform: str) -> str:
- """字段映射:author / channel_account_name / channel"""
- if platform == "x":
- return post.get("channel_account_name") or post.get("author") or ""
- if platform == "youtube":
- return post.get("channel") or post.get("author") or ""
- return post.get("author") or ""
- def _extract_url(post: Dict[str, Any], platform: str) -> str:
- """字段映射:url / link / content_link"""
- if platform == "youtube":
- return post.get("content_link") or post.get("url") or ""
- return post.get("url") or post.get("link") or ""
- def _extract_body(post: Dict[str, Any], platform: str) -> str:
- """字段映射:body_text / description"""
- if platform == "youtube":
- return post.get("description") or post.get("body_text") or ""
- return post.get("body_text") or ""
- def _extract_raw_images(post: Dict[str, Any], platform: str) -> List[str]:
- """字段映射:images / image_url_list / cover_url"""
- # 优先 images 字段
- if post.get("images"):
- imgs = post["images"]
- if isinstance(imgs, list) and imgs:
- return [i for i in imgs if i]
- # 其次 image_url_list
- if post.get("image_url_list"):
- raw = post["image_url_list"]
- if isinstance(raw, list):
- result = []
- for item in raw:
- if isinstance(item, dict):
- result.append(item.get("image_url") or "")
- else:
- result.append(item or "")
- result = [u for u in result if u]
- if result:
- return result
- # 最后兜底 cover_url
- if post.get("cover_url"):
- return [post["cover_url"]]
- return []
- def _parse_published_at(timestamp_str: str) -> Optional[str]:
- """
- 解析 publish_timestamp 为 ISO 8601 格式(timestamptz)
- 支持格式:
- - "2026-01-09 15:53:00"
- - "2026-01-09T15:53:00"
- - 空字符串或 None 返回 None
- Returns:
- ISO 8601 格式字符串 (e.g., "2026-01-09T15:53:00+00:00") 或 None
- """
- if not timestamp_str or not isinstance(timestamp_str, str):
- return None
- timestamp_str = timestamp_str.strip()
- if not timestamp_str:
- return None
- try:
- from datetime import datetime, timezone
- # 尝试解析常见格式
- for fmt in [
- "%Y-%m-%d %H:%M:%S",
- "%Y-%m-%dT%H:%M:%S",
- "%Y-%m-%d %H:%M:%S.%f",
- "%Y-%m-%dT%H:%M:%S.%f",
- ]:
- try:
- dt = datetime.strptime(timestamp_str, fmt)
- # 假设输入是 UTC 时间,添加时区信息
- dt = dt.replace(tzinfo=timezone.utc)
- # 返回 ISO 8601 格式
- return dt.isoformat()
- except ValueError:
- continue
- # 如果都失败了,返回 None
- return None
- except Exception:
- return None
# ── Single-record normalization ───────────────────────────────────────────────
async def normalize_source_item(
    source_item: Dict[str, Any],
    index: int,
    upload_cache: Dict[str, str],
    images_dir: Path,
) -> Dict[str, Any]:
    """
    Convert one raw source item into the standardized case format.

    Args:
        source_item: One entry from raw_cases/source.json; expected keys
            include "platform", "post", and optionally "case_id",
            "channel_content_id", "title", "source_url".
        index: 1-based position of this case in the output list.
        upload_cache: url-hash -> CDN URL memo shared across items so the
            same remote image is uploaded at most once per run.
        images_dir: Root directory under which a per-case image folder
            (<images_dir>/<case_id>/) is created.

    Returns:
        Dict with the standardized case fields (index, cover, title, author,
        body, images, url, note, published_at, feedback, _raw, workflow,
        capability, ...).  Image failures are best-effort: the original URL
        is kept so the case remains usable.
    """
    # Basic identifiers; case_id falls back to "<platform>_<channel_content_id>".
    platform = source_item.get("platform", "")
    post = source_item.get("post", {})
    case_id = source_item.get("case_id", f"{platform}_{source_item.get('channel_content_id', '')}")
    body = _extract_body(post, platform)
    # Title fallback chain: post title -> item title -> desc -> truncated body -> case_id.
    title = post.get("title") or source_item.get("title") or post.get("desc") or (body[:30] + "..." if body else "") or case_id
    author = _extract_author(post, platform)
    url = _extract_url(post, platform) or source_item.get("source_url", "")
    # Engagement metrics; platforms lacking a field contribute None (not 0).
    feedback = {
        "like_count": post.get("like_count") if post.get("like_count") is not None else None,
        "collect_count": post.get("collect_count") if post.get("collect_count") is not None else None,
        "comment_count": post.get("comment_count") if post.get("comment_count") is not None else None,
        "share_count": post.get("share_count") if post.get("share_count") is not None else None,
    }
    # Simplified counts used only in the human-readable "note" field.
    likes = feedback["like_count"] or 0
    comments = feedback["comment_count"] or 0
    # Publish time, normalized to an ISO-8601 string (or None).
    publish_timestamp = post.get("publish_timestamp", "")
    published_at = _parse_published_at(publish_timestamp)
    # Images: download each into the per-case folder, then mirror to OSS.
    raw_images = _extract_raw_images(post, platform)
    images: List[str] = []
    case_dir = images_dir / case_id
    case_dir.mkdir(parents=True, exist_ok=True)
    for idx, img_url in enumerate(raw_images):
        ext = _ext_from_path(img_url)
        local_path = case_dir / f"{idx:02d}.{ext}"
        try:
            # Local download, skipped when the file already exists on disk.
            if not local_path.exists():
                print(f" 📥 [{idx+1}/{len(raw_images)}] 下载图片...")
                from agent.tools.builtin.file.image_cdn import _download_image
                data = await _download_image(img_url)
                local_path.write_bytes(data)
                print(f" 📥 [{idx+1}/{len(raw_images)}] 已保存 {local_path.name} ({len(data)} bytes)")
            else:
                print(f" 📁 [{idx+1}/{len(raw_images)}] 本地已存在 {local_path.name}")
            # Mirror to OSS; reuse URLs that are already on our CDN.
            if _is_oss_url(img_url):
                images.append(img_url)
                print(f" ☁️ [{idx+1}/{len(raw_images)}] 已是 CDN URL")
            else:
                print(f" ☁️ [{idx+1}/{len(raw_images)}] 上传 OSS...")
                cdn_url = await ensure_oss_url(img_url, upload_cache)
                images.append(cdn_url)
                print(f" ☁️ [{idx+1}/{len(raw_images)}] 上传完成")
        except Exception as e:
            # Best effort: keep the original URL so the case is still usable.
            print(f" ⚠ [{idx+1}/{len(raw_images)}] 图片处理失败: {str(e)[:60]}")
            images.append(img_url)
    # Fallback: also rewrite external image links embedded in the body to CDN.
    try:
        from agent.tools.builtin.file.image_cdn import replace_image_urls
        body = await replace_image_urls(body)
    except Exception:
        pass
    cover = images[0] if images else ""
    return {
        "index": index,
        "category": "",
        "user_kept": False,
        "user_comment": "",
        "description": "",
        "method": "",
        "cover": cover,
        "title": title,
        "author": author,
        "body": body,
        "images": images,
        "url": url,
        "note": f"platform={platform} | likes={likes} | comments={comments}",
        # NOTE(review): original comment said "bigint, nullable", but
        # _parse_published_at returns an ISO-8601 *string* (or None) —
        # confirm which type the downstream schema expects.
        "published_at": published_at,
        "feedback": feedback,
        "_raw": {
            "case_id": case_id,
            "platform": platform,
            "channel_content_id": source_item.get("channel_content_id", ""),
        },
        "workflow": None,
        "capability": None,
    }
# ── Main entry point ────────────────────────────
async def generate_case_from_source(
    raw_cases_dir: Path,
    output_file: Optional[Path] = None,
) -> Dict[str, Any]:
    """
    Generate a standardized case.json from raw_cases/source.json.

    If case.json already exists, previously filled-in "workflow_groups"
    values are carried over per case_id.

    Args:
        raw_cases_dir: Directory containing source.json; an "images/"
            subdirectory is created here for downloaded files.
        output_file: Target path for case.json; defaults to
            <raw_cases_dir>/../case.json.

    Returns:
        {"total_cases": <number of cases written>, "output_file": <path str>}.

    Raises:
        FileNotFoundError: when raw_cases_dir/source.json is missing.
    """
    raw_cases_dir = Path(raw_cases_dir)
    source_file = raw_cases_dir / "source.json"
    if not source_file.exists():
        raise FileNotFoundError(f"source.json not found: {source_file}")
    # Load source.json.
    with open(source_file, "r", encoding="utf-8") as f:
        source_data = json.load(f)
    sources = source_data.get("sources", [])
    print(f"Processing {len(sources)} sources...")
    # Load the existing case.json (if any) so curated fields survive a re-run.
    if output_file is None:
        output_file = raw_cases_dir.parent / "case.json"
    existing_cases: Dict[str, Dict[str, Any]] = {}
    if output_file.exists():
        try:
            with open(output_file, "r", encoding="utf-8") as f:
                existing_data = json.load(f)
            existing_list = existing_data.get("cases", [])
            # Build a case_id -> case lookup for the preserve step below.
            for case in existing_list:
                case_id = case.get("_raw", {}).get("case_id")
                if case_id:
                    existing_cases[case_id] = case
            print(f"Found {len(existing_cases)} existing cases, will preserve workflow and capability")
        except Exception as e:
            # Best effort: a corrupt case.json should not block regeneration.
            print(f"Warning: Failed to read existing case.json: {e}")
    # Prepare the local image directory.
    images_dir = raw_cases_dir / "images"
    images_dir.mkdir(parents=True, exist_ok=True)
    # Normalize every source item; failures skip the item, not the run.
    cases: List[Dict[str, Any]] = []
    upload_cache: Dict[str, str] = {}
    for idx, source_item in enumerate(sources, 1):
        try:
            case = await normalize_source_item(
                source_item=source_item,
                index=idx,
                upload_cache=upload_cache,
                images_dir=images_dir,
            )
            # Carry over workflow_groups from a previous run, if present.
            # NOTE(review): the docstring/log above mention preserving
            # "workflow" and "capability", but only "workflow_groups" is
            # actually copied, and items absent from existing_cases get no
            # per-item log line — confirm intent.
            case_id = case.get("_raw", {}).get("case_id")
            if case_id and case_id in existing_cases:
                existing = existing_cases[case_id]
                if existing.get("workflow_groups") is not None:
                    case["workflow_groups"] = existing["workflow_groups"]
                    print(f" [{idx}] {case['title'][:40]} (preserved workflow_groups)")
                else:
                    print(f" [{idx}] {case['title'][:40]}")
            cases.append(case)
        except Exception as e:
            print(f" [{idx}] ✗ 失败: {e}")
    # Write case.json.
    output_data = {
        "total": len(cases),
        "cases": cases,
    }
    output_file.parent.mkdir(parents=True, exist_ok=True)
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(output_data, f, ensure_ascii=False, indent=2)
    return {
        "total_cases": len(cases),
        "output_file": str(output_file),
    }
if __name__ == "__main__":
    import sys

    # CLI entry: python generate_case.py <raw_cases_dir>
    if len(sys.argv) < 2:
        print("Usage: python generate_case.py <raw_cases_dir>")
        sys.exit(1)
    target_dir = Path(sys.argv[1])
    summary = asyncio.run(generate_case_from_source(target_dir))
    print(f"\n✓ Generated {summary['total_cases']} cases")
    print(f"→ {summary['output_file']}")