#!/usr/bin/env python # -*- coding: utf-8 -*- """ 测试脚本:视频下载、上传到Gemini并进行分析 """ import json import sys import os import time import requests import re from pathlib import Path from typing import Optional # 添加项目根目录到路径 project_root = Path(__file__).parent.parent sys.path.insert(0, str(project_root)) # 手动加载.env文件 def load_env_file(env_path): """手动加载.env文件""" if not env_path.exists(): return False with open(env_path, 'r') as f: for line in f: line = line.strip() if not line or line.startswith('#'): continue if '=' in line: key, value = line.split('=', 1) os.environ[key.strip()] = value.strip() return True env_path = project_root / ".env" load_env_file(env_path) from google import genai client = genai.Client() # Gemini 文件处理相关常量 FILE_PROCESS_TIMEOUT = 300 API_RETRY_INTERVAL = 5 def load_video_data(directory: str) -> dict: """加载视频详情.json文件 Args: directory: 目录名(如"53009047") Returns: 视频详情字典 """ video_data_path = Path(__file__).parent / directory / "视频详情.json" if not video_data_path.exists(): raise FileNotFoundError(f"未找到视频详情文件:{video_data_path}") with open(video_data_path, "r", encoding="utf-8") as f: data = json.load(f) return data def download_video(video_url: str, output_dir: Path, filename: str = None) -> Optional[str]: """下载视频到指定目录""" try: output_dir.mkdir(parents=True, exist_ok=True) if not filename: filename = video_url.split('/')[-1].split('?')[0] if not filename.endswith('.mp4'): filename = 'video.mp4' output_path = output_dir / filename print(f"下载视频: {output_path.name}") response = requests.get(video_url, stream=True, timeout=120) response.raise_for_status() with open(output_path, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): if chunk: f.write(chunk) return str(output_path) except Exception as e: print(f"❌ 下载视频失败: {e}") return None def wait_for_file_processing(uploaded_file: "genai.types.File") -> Optional["genai.types.File"]: """等待 Gemini 完成文件处理""" start = time.time() current = uploaded_file if not hasattr(current, 'state'): return current state_name = getattr(current.state, 'name', None) if hasattr(current.state, 'name') else str(current.state) if state_name != "PROCESSING": if state_name == "FAILED": print(f"❌ 文件处理失败: {current.state}") return None elif state_name in ["ACTIVE", "COMPLETE", "READY"]: return current print("等待文件处理...", end="", flush=True) while True: elapsed = time.time() - start if elapsed > FILE_PROCESS_TIMEOUT: print(f"\n❌ 文件处理超时(超过 {FILE_PROCESS_TIMEOUT} 秒)") return None time.sleep(API_RETRY_INTERVAL) if hasattr(current, 'name'): try: current = client.files.get(name=current.name) if not hasattr(current, 'state'): break state_name = getattr(current.state, 'name', None) if hasattr(current.state, 'name') else str(current.state) if state_name == "FAILED": print(f"\n❌ 文件处理失败: {current.state}") return None elif state_name in ["ACTIVE", "COMPLETE", "READY"]: break except Exception as e: print(f"\n❌ 获取文件状态失败: {e}") return None else: return None print(" 完成") return current def upload_to_gemini(local_file_path: str) -> Optional["genai.types.File"]: """上传视频到Gemini""" try: print("上传视频到Gemini...", end=" ", flush=True) uploaded_file = client.files.upload(file=local_file_path) processed_file = wait_for_file_processing(uploaded_file) if not processed_file: raise Exception("文件处理失败") print("完成") return processed_file except Exception as e: print(f"\n❌ 上传失败: {e}") return None def analyze_video_outline(file_obj: "genai.types.File", video_title: str) -> Optional[dict]: """使用Gemini进行视频大纲理解""" try: system_prompt = """ ## 角色 你是一个专业的视频内容分析助手。请分析视频内容,提取出视频的大纲信息,需要有明确的时间段分割和汇总。 ## 任务 逐帧分析视频内容,提取出视频的大纲信息,需要有明确的时间段分割和汇总,每个时间段需要有内容摘要和最核心的内容点20个字概括,以及口播语音(需要有时间戳)。 ## 输出格式 { "title": "视频标题/主题", "summary": "视频内容摘要(100-200字)", "words": "视频口播文案", "summaries": { "时间段1": { "summary": "时间段1的内容摘要和最核心的内容点20个字概括", "words": "视频时间段1的口播语音(需要有时间戳)" }, "时间段2": { "summary": "时间段1的内容摘要和最核心的内容点20个字概括", "words": "视频时间段2的口播语音(需要有时间戳)" } }, "duration_estimate": "视频时长估算(如果可识别)", "category": "视频类别/分类" } """ print("分析视频大纲...", end=" ", flush=True) response = client.models.generate_content(model="gemini-2.5-flash", contents=[file_obj, system_prompt]) if hasattr(response, 'text'): content = response.text.strip() elif hasattr(response, 'content'): if isinstance(response.content, str): content = response.content.strip() elif hasattr(response.content, 'text'): content = response.content.text.strip() else: content = str(response.content).strip() else: content = str(response).strip() print("完成") try: json_match = re.search(r'\{[\s\S]*\}', content) if json_match: return json.loads(json_match.group()) else: return {"raw_response": content, "parsed": False} except json.JSONDecodeError: return {"raw_response": content, "parsed": False} except Exception as e: print(f"\n❌ 分析失败: {e}") return None def main(): """主函数""" directory = "53009047" current_dir = Path(__file__).parent # 读取视频详情 try: video_data = load_video_data(directory) video_url = video_data.get("video", "") title = video_data.get("title", "") if not video_url: print(f"❌ 视频详情文件中未找到video字段") return except Exception as e: print(f"❌ 读取视频详情失败: {e}") return # 下载视频 video_filename = f"{directory}.mp4" video_path = download_video(video_url, current_dir, video_filename) if not video_path: return # 上传到Gemini file_obj = upload_to_gemini(video_path) if not file_obj: return # 分析视频大纲 analysis_result = analyze_video_outline(file_obj, title) # 保存结果 if analysis_result: output_path = current_dir / f"{directory}_analysis_result.json" with open(output_path, "w", encoding="utf-8") as f: json.dump(analysis_result, f, ensure_ascii=False, indent=2) print(f"结果已保存: {output_path}") else: print("❌ 未能获取分析结果") if __name__ == "__main__": main()