run_batch_script.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
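"""
Batch-run the script-understanding workflow (ScriptWorkflow).

Reads the three-point deconstruction results from examples/output_demo.json,
combines each entry's video_data and result into a ScriptWorkflow input,
and incrementally writes the script-understanding results to
examples/output_demo_script.json.
"""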
  9. import json
  10. import os
  11. import sys
  12. from datetime import datetime
  13. from pathlib import Path
  14. from typing import Dict, Any, List
  15. # 添加项目根目录到路径
  16. project_root = Path(__file__).parent.parent
  17. sys.path.insert(0, str(project_root))
  18. from src.workflows.script_workflow import ScriptWorkflow
  19. from src.utils.logger import get_logger
  20. logger = get_logger(__name__)
  21. def load_json(path: Path) -> Dict[str, Any]:
  22. if not path.exists():
  23. return {}
  24. with path.open("r", encoding="utf-8") as f:
  25. return json.load(f)
  26. def save_json(path: Path, data: Dict[str, Any]) -> None:
  27. tmp_path = path.with_suffix(".tmp")
  28. with tmp_path.open("w", encoding="utf-8") as f:
  29. json.dump(data, f, ensure_ascii=False, indent=2)
  30. tmp_path.replace(path)
  31. def build_script_input(video_data: Dict[str, Any], result: Dict[str, Any]) -> Dict[str, Any]:
  32. """根据 output_demo.json 单条结果构造 ScriptWorkflow 的输入结构。"""
  33. video_info = result.get("视频信息", {}) or {}
  34. three_points = result.get("三点解构", {}) or {}
  35. topic_understanding = result.get("选题理解", {}) or {}
  36. # 基本视频信息
  37. video_url = video_data.get("video") or video_info.get("视频URL", "")
  38. channel_content_id = video_data.get("channel_content_id", "")
  39. title = video_data.get("title") or video_info.get("标题", "")
  40. body_text = video_info.get("正文", "")
  41. # 三点解构映射到脚本理解需要的字段
  42. inspiration_points: List[Dict[str, Any]] = three_points.get("灵感点", []) or []
  43. purpose_block = three_points.get("目的点", {}) or {}
  44. purpose_points: List[Dict[str, Any]] = purpose_block.get("purposes", []) or []
  45. key_points_block = three_points.get("关键点", {}) or {}
  46. key_points: List[Dict[str, Any]] = key_points_block.get("key_points", []) or []
  47. input_data: Dict[str, Any] = {
  48. "video": video_url,
  49. "channel_content_id": channel_content_id,
  50. "text": {
  51. "title": title,
  52. "body": body_text,
  53. },
  54. "topic_selection_understanding": topic_understanding,
  55. "content_weight": {}, # 目前没有对应数据,留空
  56. "inspiration_points": inspiration_points,
  57. "purpose_points": purpose_points,
  58. "key_points": key_points,
  59. }
  60. return input_data
  61. def main() -> None:
  62. base_dir = Path(__file__).parent
  63. input_path = base_dir / "output_demo.json"
  64. output_path = base_dir / "output_demo_script.json"
  65. if not input_path.exists():
  66. raise FileNotFoundError(f"找不到输入文件: {input_path}")
  67. # 读取原始三点解构结果
  68. raw = load_json(input_path)
  69. raw_results: List[Dict[str, Any]] = raw.get("results", []) or []
  70. # 读取已有的脚本理解输出,支持增量追加
  71. output_data = load_json(output_path)
  72. if not output_data:
  73. output_data = {
  74. "timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
  75. "total": 0,
  76. "success_count": 0,
  77. "fail_count": 0,
  78. "results": [],
  79. }
  80. existing_results: List[Dict[str, Any]] = output_data.get("results", []) or []
  81. # 用 channel_content_id + video URL 去重,避免重复处理
  82. processed_keys = {
  83. f"{item.get('video_data', {}).get('channel_content_id','')}|"
  84. f"{item.get('video_data', {}).get('video','')}"
  85. for item in existing_results
  86. }
  87. workflow = ScriptWorkflow()
  88. for item in raw_results:
  89. video_data = item.get("video_data", {}) or {}
  90. result = item.get("result", {}) or {}
  91. key = f"{video_data.get('channel_content_id','')}|{video_data.get('video','')}"
  92. if key in processed_keys:
  93. logger.info(f"已处理过该视频,跳过: {key}")
  94. continue
  95. logger.info(f"处理视频: channel_content_id={video_data.get('channel_content_id')} title={video_data.get('title')}")
  96. try:
  97. script_input = build_script_input(video_data, result)
  98. script_result = workflow.invoke(script_input)
  99. record = {
  100. "video_data": video_data,
  101. "what_deconstruction_result": result,
  102. "script_result": script_result,
  103. "success": True,
  104. "error": None,
  105. }
  106. output_data["success_count"] = output_data.get("success_count", 0) + 1
  107. except Exception as e:
  108. logger.error(f"脚本理解处理失败: {e}", exc_info=True)
  109. record = {
  110. "video_data": video_data,
  111. "what_deconstruction_result": result,
  112. "script_result": None,
  113. "success": False,
  114. "error": str(e),
  115. }
  116. output_data["fail_count"] = output_data.get("fail_count", 0) + 1
  117. output_data["results"].append(record)
  118. output_data["total"] = output_data.get("total", 0) + 1
  119. # 处理完一条就保存一次,避免长任务中途失败导致全部丢失
  120. save_json(output_path, output_data)
  121. logger.info(
  122. f"批量脚本理解完成: total={output_data.get('total')}, "
  123. f"success={output_data.get('success_count')}, fail={output_data.get('fail_count')}"
  124. )
  125. if __name__ == "__main__":
  126. main()
# Script deconstruction