run_batch_script_v2.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Batch-run the script deconstruction workflow V2 (ScriptWorkflowV2).

Reads the video list from examples/demo.json, passes each entry's
video / channel_content_id into ScriptWorkflowV2, and appends the
deconstruction results one by one to examples/output_demo_script_v2.json
(incremental writes).
"""
import json
import sys
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, List

# Add the project root to sys.path so the src.* imports below resolve
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))

from src.workflows.script_workflow_v2 import ScriptWorkflowV2
from src.utils.logger import get_logger

logger = get_logger(__name__)

def load_json(path: Path):
    """Generic JSON reader; supports either a list or a dict."""
    if not path.exists():
        return None
    with path.open("r", encoding="utf-8") as f:
        return json.load(f)

def save_json(path: Path, data) -> None:
    """Safely write JSON (write to a temp file first, then replace)."""
    tmp_path = path.with_suffix(".tmp")
    with tmp_path.open("w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    tmp_path.replace(path)
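
# Design note: writing to a .tmp sibling and then calling Path.replace makes
# the update atomic on POSIX (when both paths sit on the same filesystem), so
# a crash mid-write never leaves a truncated output_demo_script_v2.json.
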
def main() -> None:
    base_dir = Path(__file__).parent
    input_path = base_dir / "demo.json"
    output_path = base_dir / "output_demo_script_v2.json"

    if not input_path.exists():
        raise FileNotFoundError(f"Input file not found: {input_path}")

    # Read demo.json (expected to be a list)
    raw_list = load_json(input_path)
    if not isinstance(raw_list, list):
        raise ValueError(f"demo.json is malformed; expected a list, got: {type(raw_list)}")

    # Load any existing output so runs are incremental
    output_data = load_json(output_path)
    if not output_data:
        output_data = {
            "timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
            "total": 0,
            "success_count": 0,
            "fail_count": 0,
            "results": [],
        }

    existing_results: List[Dict[str, Any]] = output_data.get("results", []) or []
    # Deduplicate on channel_content_id + video URL to avoid reprocessing
    processed_keys = {
        f"{item.get('video_data', {}).get('channel_content_id', '')}|"
        f"{item.get('video_data', {}).get('video', '')}"
        for item in existing_results
    }
    workflow = ScriptWorkflowV2()

    for item in raw_list:
        video_data = item or {}
        channel_content_id = video_data.get("channel_content_id", "")
        video_url = video_data.get("video", "")
        key = f"{channel_content_id}|{video_url}"

        if key in processed_keys:
            logger.info(f"Video already processed, skipping: {key}")
            continue

        logger.info(
            f"Processing video: channel_content_id={channel_content_id} "
            f"title={video_data.get('title', '')}"
        )
        try:
            # ScriptWorkflowV2 only needs video and channel_content_id
            script_input = {
                "video": video_url,
                "channel_content_id": channel_content_id,
            }
            script_result = workflow.invoke(script_input)
            record = {
                "video_data": video_data,
                "script_result": script_result,
                "success": True,
                "error": None,
            }
            output_data["success_count"] = output_data.get("success_count", 0) + 1
        except Exception as e:
            logger.error(f"Script deconstruction V2 failed: {e}", exc_info=True)
            record = {
                "video_data": video_data,
                "script_result": None,
                "success": False,
                "error": str(e),
            }
            output_data["fail_count"] = output_data.get("fail_count", 0) + 1

        output_data["results"].append(record)
        processed_keys.add(key)  # also skip duplicate entries within this run
        output_data["total"] = output_data.get("total", 0) + 1
        # Save after every item so a long run that fails midway loses nothing
        save_json(output_path, output_data)
    logger.info(
        f"Batch script deconstruction V2 finished: total={output_data.get('total')}, "
        f"success={output_data.get('success_count')}, fail={output_data.get('fail_count')}"
    )


if __name__ == "__main__":
    main()