test.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. """
  4. 测试脚本:视频下载、上传到Gemini并进行分析
  5. """
  6. import json
  7. import sys
  8. import os
  9. import time
  10. import requests
  11. import re
  12. from pathlib import Path
  13. from typing import Optional
  14. # 添加项目根目录到路径
  15. project_root = Path(__file__).parent.parent
  16. sys.path.insert(0, str(project_root))
  17. # 手动加载.env文件
  18. def load_env_file(env_path):
  19. """手动加载.env文件"""
  20. if not env_path.exists():
  21. return False
  22. with open(env_path, 'r') as f:
  23. for line in f:
  24. line = line.strip()
  25. if not line or line.startswith('#'):
  26. continue
  27. if '=' in line:
  28. key, value = line.split('=', 1)
  29. os.environ[key.strip()] = value.strip()
  30. return True
  31. env_path = project_root / ".env"
  32. load_env_file(env_path)
  33. from google import genai
  34. client = genai.Client()
  35. # Gemini 文件处理相关常量
  36. FILE_PROCESS_TIMEOUT = 300
  37. API_RETRY_INTERVAL = 5
  38. def load_video_data(directory: str) -> dict:
  39. """加载视频详情.json文件
  40. Args:
  41. directory: 目录名(如"53009047")
  42. Returns:
  43. 视频详情字典
  44. """
  45. video_data_path = Path(__file__).parent / directory / "视频详情.json"
  46. if not video_data_path.exists():
  47. raise FileNotFoundError(f"未找到视频详情文件:{video_data_path}")
  48. with open(video_data_path, "r", encoding="utf-8") as f:
  49. data = json.load(f)
  50. return data
  51. def download_video(video_url: str, output_dir: Path, filename: str = None) -> Optional[str]:
  52. """下载视频到指定目录"""
  53. try:
  54. output_dir.mkdir(parents=True, exist_ok=True)
  55. if not filename:
  56. filename = video_url.split('/')[-1].split('?')[0]
  57. if not filename.endswith('.mp4'):
  58. filename = 'video.mp4'
  59. output_path = output_dir / filename
  60. print(f"下载视频: {output_path.name}")
  61. response = requests.get(video_url, stream=True, timeout=120)
  62. response.raise_for_status()
  63. with open(output_path, 'wb') as f:
  64. for chunk in response.iter_content(chunk_size=8192):
  65. if chunk:
  66. f.write(chunk)
  67. return str(output_path)
  68. except Exception as e:
  69. print(f"❌ 下载视频失败: {e}")
  70. return None
  71. def wait_for_file_processing(uploaded_file: "genai.types.File") -> Optional["genai.types.File"]:
  72. """等待 Gemini 完成文件处理"""
  73. start = time.time()
  74. current = uploaded_file
  75. if not hasattr(current, 'state'):
  76. return current
  77. state_name = getattr(current.state, 'name', None) if hasattr(current.state, 'name') else str(current.state)
  78. if state_name != "PROCESSING":
  79. if state_name == "FAILED":
  80. print(f"❌ 文件处理失败: {current.state}")
  81. return None
  82. elif state_name in ["ACTIVE", "COMPLETE", "READY"]:
  83. return current
  84. print("等待文件处理...", end="", flush=True)
  85. while True:
  86. elapsed = time.time() - start
  87. if elapsed > FILE_PROCESS_TIMEOUT:
  88. print(f"\n❌ 文件处理超时(超过 {FILE_PROCESS_TIMEOUT} 秒)")
  89. return None
  90. time.sleep(API_RETRY_INTERVAL)
  91. if hasattr(current, 'name'):
  92. try:
  93. current = client.files.get(name=current.name)
  94. if not hasattr(current, 'state'):
  95. break
  96. state_name = getattr(current.state, 'name', None) if hasattr(current.state, 'name') else str(current.state)
  97. if state_name == "FAILED":
  98. print(f"\n❌ 文件处理失败: {current.state}")
  99. return None
  100. elif state_name in ["ACTIVE", "COMPLETE", "READY"]:
  101. break
  102. except Exception as e:
  103. print(f"\n❌ 获取文件状态失败: {e}")
  104. return None
  105. else:
  106. return None
  107. print(" 完成")
  108. return current
  109. def upload_to_gemini(local_file_path: str) -> Optional["genai.types.File"]:
  110. """上传视频到Gemini"""
  111. try:
  112. print("上传视频到Gemini...", end=" ", flush=True)
  113. uploaded_file = client.files.upload(file=local_file_path)
  114. processed_file = wait_for_file_processing(uploaded_file)
  115. if not processed_file:
  116. raise Exception("文件处理失败")
  117. print("完成")
  118. return processed_file
  119. except Exception as e:
  120. print(f"\n❌ 上传失败: {e}")
  121. return None
  122. def analyze_video_outline(file_obj: "genai.types.File", video_title: str) -> Optional[dict]:
  123. """使用Gemini进行视频大纲理解"""
  124. try:
  125. system_prompt = """
  126. ## 角色
  127. 你是一个专业的视频内容分析助手。请分析视频内容,提取出视频的大纲信息,需要有明确的时间段分割和汇总。
  128. ## 任务
  129. 逐帧分析视频内容,提取出视频的大纲信息,需要有明确的时间段分割和汇总,每个时间段需要有内容摘要和最核心的内容点20个字概括,以及口播语音(需要有时间戳)。
  130. ## 输出格式
  131. {
  132. "title": "视频标题/主题",
  133. "summary": "视频内容摘要(100-200字)",
  134. "words": "视频口播文案",
  135. "summaries": {
  136. "时间段1": {
  137. "summary": "时间段1的内容摘要和最核心的内容点20个字概括",
  138. "words": "视频时间段1的口播语音(需要有时间戳)"
  139. },
  140. "时间段2": {
  141. "summary": "时间段1的内容摘要和最核心的内容点20个字概括",
  142. "words": "视频时间段2的口播语音(需要有时间戳)"
  143. }
  144. },
  145. "duration_estimate": "视频时长估算(如果可识别)",
  146. "category": "视频类别/分类"
  147. }
  148. """
  149. print("分析视频大纲...", end=" ", flush=True)
  150. response = client.models.generate_content(model="gemini-2.5-flash", contents=[file_obj, system_prompt])
  151. if hasattr(response, 'text'):
  152. content = response.text.strip()
  153. elif hasattr(response, 'content'):
  154. if isinstance(response.content, str):
  155. content = response.content.strip()
  156. elif hasattr(response.content, 'text'):
  157. content = response.content.text.strip()
  158. else:
  159. content = str(response.content).strip()
  160. else:
  161. content = str(response).strip()
  162. print("完成")
  163. try:
  164. json_match = re.search(r'\{[\s\S]*\}', content)
  165. if json_match:
  166. return json.loads(json_match.group())
  167. else:
  168. return {"raw_response": content, "parsed": False}
  169. except json.JSONDecodeError:
  170. return {"raw_response": content, "parsed": False}
  171. except Exception as e:
  172. print(f"\n❌ 分析失败: {e}")
  173. return None
  174. def main():
  175. """主函数"""
  176. directory = "53009047"
  177. current_dir = Path(__file__).parent
  178. # 读取视频详情
  179. try:
  180. video_data = load_video_data(directory)
  181. video_url = video_data.get("video", "")
  182. title = video_data.get("title", "")
  183. if not video_url:
  184. print(f"❌ 视频详情文件中未找到video字段")
  185. return
  186. except Exception as e:
  187. print(f"❌ 读取视频详情失败: {e}")
  188. return
  189. # 下载视频
  190. video_filename = f"{directory}.mp4"
  191. video_path = download_video(video_url, current_dir, video_filename)
  192. if not video_path:
  193. return
  194. # 上传到Gemini
  195. file_obj = upload_to_gemini(video_path)
  196. if not file_obj:
  197. return
  198. # 分析视频大纲
  199. analysis_result = analyze_video_outline(file_obj, title)
  200. # 保存结果
  201. if analysis_result:
  202. output_path = current_dir / f"{directory}_analysis_result.json"
  203. with open(output_path, "w", encoding="utf-8") as f:
  204. json.dump(analysis_result, f, ensure_ascii=False, indent=2)
  205. print(f"结果已保存: {output_path}")
  206. else:
  207. print("❌ 未能获取分析结果")
  208. if __name__ == "__main__":
  209. main()