#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Test script: download a video, upload it to Gemini, and analyze it.
"""
import json
import os
import re
import sys
import time
from pathlib import Path
from typing import Optional
from urllib.parse import urlparse

import requests
# Add the project root (two directory levels above this file) to the front of
# sys.path.
# NOTE(review): presumably so that project-level modules can be imported when
# this script is run directly — confirm.
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
# Load the .env file manually.
def load_env_file(env_path):
    """Load ``KEY=VALUE`` pairs from a .env file into ``os.environ``.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    skipped. Keys and values are stripped of surrounding whitespace, and a
    value wrapped in one matching pair of single or double quotes is
    unquoted. Variables already present in the environment are overwritten.

    Args:
        env_path: Location of the .env file (``str`` or ``pathlib.Path``).

    Returns:
        True if the file exists and was read, False if it does not exist.
    """
    env_path = Path(env_path)
    if not env_path.exists():
        return False

    # Decode as UTF-8 explicitly; the platform default encoding may not be
    # able to read non-ASCII values.
    with open(env_path, "r", encoding="utf-8") as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue

            key, value = line.split("=", 1)
            key = key.strip()
            value = value.strip()
            if not key:
                # "=value" has no variable name, and an empty name is not a
                # valid environment key.
                continue

            # KEY="value" / KEY='value' -> value
            if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
                value = value[1:-1]

            os.environ[key] = value
    return True
# Load variables from the project-root .env file into os.environ, then create
# the module-level Gemini client used by the functions below.
# NOTE(review): presumably genai.Client() picks up its credentials from the
# environment, which is why it is created only after the .env load — confirm.
env_path = project_root / ".env"
load_env_file(env_path)
from google import genai
client = genai.Client()
# Constants for Gemini file processing.
FILE_PROCESS_TIMEOUT = 300  # seconds to wait for file processing before giving up
API_RETRY_INTERVAL = 5  # seconds slept between two file-status polls
def load_video_data(directory: str, base_dir: Optional[Path] = None) -> dict:
    """Load the ``视频详情.json`` (video details) file of a video directory.

    Args:
        directory: Name of the sub-directory holding the file
            (e.g. "53009047").
        base_dir: Directory that contains ``directory``. Defaults to the
            directory of this script.

    Returns:
        The parsed JSON content of the video details file.

    Raises:
        FileNotFoundError: If the video details file does not exist.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    root = Path(__file__).parent if base_dir is None else Path(base_dir)
    video_data_path = root / directory / "视频详情.json"
    if not video_data_path.exists():
        raise FileNotFoundError(f"未找到视频详情文件:{video_data_path}")

    with open(video_data_path, "r", encoding="utf-8") as f:
        return json.load(f)
def download_video(video_url: str, output_dir: Path, filename: Optional[str] = None) -> Optional[str]:
    """Download a video over HTTP into ``output_dir``.

    The data is streamed into a temporary ``<name>.part`` file that is renamed
    to the final name only after the download completed, so a failed download
    never leaves a truncated file under the final name.

    Args:
        video_url: URL of the video to download.
        output_dir: Directory to save into; created if it does not exist.
        filename: Target file name. When omitted, the last segment of the URL
            path is used if it ends with ".mp4", otherwise "video.mp4".

    Returns:
        The path of the downloaded file as a string, or None if the download
        failed (the error is printed).
    """
    part_path = None
    try:
        output_dir.mkdir(parents=True, exist_ok=True)

        if not filename:
            # Use the URL *path* only, so a "/" inside the query string or
            # fragment cannot be mistaken for a path separator.
            filename = urlparse(video_url).path.rsplit("/", 1)[-1]
            if not filename.endswith(".mp4"):
                filename = "video.mp4"

        output_path = output_dir / filename
        part_path = Path(str(output_path) + ".part")
        print(f"下载视频: {output_path.name}")

        # The context manager releases the streamed connection even when
        # writing the file fails part-way.
        with requests.get(video_url, stream=True, timeout=120) as response:
            response.raise_for_status()
            with open(part_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)

        os.replace(part_path, output_path)
        return str(output_path)
    except (requests.RequestException, OSError) as e:
        print(f"❌ 下载视频失败: {e}")
        if part_path is not None:
            try:
                part_path.unlink()
            except OSError:
                # Nothing was written yet, or the partial file cannot be
                # removed; either way there is nothing more to clean up.
                pass
        return None
# State names that mean the uploaded file can be used.
_READY_STATES = ("ACTIVE", "COMPLETE", "READY")


def _file_state_name(file_obj):
    """Return the state of ``file_obj`` as a name: ``state.name`` if present, else ``str(state)``."""
    state = file_obj.state
    return state.name if hasattr(state, "name") else str(state)


def wait_for_file_processing(
    uploaded_file: "genai.types.File",
    timeout: Optional[float] = None,
    poll_interval: Optional[float] = None,
) -> Optional["genai.types.File"]:
    """Wait until Gemini has finished processing an uploaded file.

    The file is re-fetched through the module-level ``client`` every
    ``poll_interval`` seconds until its state is one of ACTIVE / COMPLETE /
    READY, it is FAILED, or ``timeout`` seconds have passed.

    Args:
        uploaded_file: File object returned by the upload call.
        timeout: Maximum number of seconds to wait. Defaults to
            ``FILE_PROCESS_TIMEOUT``.
        poll_interval: Seconds to sleep between two status polls. Defaults to
            ``API_RETRY_INTERVAL``.

    Returns:
        The (possibly refreshed) file object once it is usable. An object
        without a ``state`` attribute is returned as-is. None is returned when
        processing failed, timed out, the status could not be fetched, or the
        file has no ``name`` to poll with.
    """
    current = uploaded_file

    if not hasattr(current, "state"):
        return current

    state_name = _file_state_name(current)
    if state_name == "FAILED":
        print(f"❌ 文件处理失败: {current.state}")
        return None
    if state_name in _READY_STATES:
        return current

    # Any other state (typically "PROCESSING") is polled below.
    if timeout is None:
        timeout = FILE_PROCESS_TIMEOUT
    if poll_interval is None:
        poll_interval = API_RETRY_INTERVAL

    print("等待文件处理...", end="", flush=True)

    # Monotonic clock: the timeout must not be affected by wall-clock changes.
    deadline = time.monotonic() + timeout

    while True:
        if time.monotonic() > deadline:
            print(f"\n❌ 文件处理超时(超过 {timeout} 秒)")
            return None

        if not hasattr(current, "name"):
            # Without a resource name the file cannot be re-fetched, so there
            # is no point in sleeping first.
            print("\n❌ 文件对象缺少 name 属性,无法查询处理状态")
            return None

        time.sleep(poll_interval)

        try:
            current = client.files.get(name=current.name)
        except Exception as e:
            print(f"\n❌ 获取文件状态失败: {e}")
            return None

        if not hasattr(current, "state"):
            break

        state_name = _file_state_name(current)
        if state_name == "FAILED":
            print(f"\n❌ 文件处理失败: {current.state}")
            return None
        if state_name in _READY_STATES:
            break

    print(" 完成")
    return current
def upload_to_gemini(local_file_path: str) -> Optional["genai.types.File"]:
    """Upload a local video file to Gemini and wait for it to be processed.

    Args:
        local_file_path: Path of the file to upload.

    Returns:
        The processed file object, or None if the upload or the processing
        failed (the error is printed).
    """
    ready_file = None
    try:
        print("上传视频到Gemini...", end=" ", flush=True)
        ready_file = wait_for_file_processing(client.files.upload(file=local_file_path))
        if ready_file:
            print("完成")
        else:
            # Routed through the handler below so every failure is reported
            # in the same way.
            raise Exception("文件处理失败")
    except Exception as err:
        print(f"\n❌ 上传失败: {err}")
        return None
    return ready_file
# Prompt sent together with the uploaded video; asks for a JSON outline.
_OUTLINE_PROMPT = """
## 角色
你是一个专业的视频内容分析助手。请分析视频内容,提取出视频的大纲信息,需要有明确的时间段分割和汇总。
## 任务
逐帧分析视频内容,提取出视频的大纲信息,需要有明确的时间段分割和汇总,每个时间段需要有内容摘要和最核心的内容点20个字概括,以及口播语音(需要有时间戳)。
## 输出格式
{
"title": "视频标题/主题",
"summary": "视频内容摘要(100-200字)",
"words": "视频口播文案",
"summaries": {
"时间段1": {
"summary": "时间段1的内容摘要和最核心的内容点20个字概括",
"words": "视频时间段1的口播语音(需要有时间戳)"
},
"时间段2": {
"summary": "时间段2的内容摘要和最核心的内容点20个字概括",
"words": "视频时间段2的口播语音(需要有时间戳)"
}
},
"duration_estimate": "视频时长估算(如果可识别)",
"category": "视频类别/分类"
}
"""


def _response_text(response):
    """Return the text carried by a generate_content response, or None if its ``text`` is None."""
    if hasattr(response, "text"):
        return response.text
    if hasattr(response, "content"):
        body = response.content
        if isinstance(body, str):
            return body
        if hasattr(body, "text"):
            return body.text
        return str(body)
    return str(response)


def _extract_json_object(text: str) -> dict:
    """Parse the span from the first ``{`` to the last ``}`` of ``text`` as JSON.

    Falls back to ``{"raw_response": text, "parsed": False}`` when there is no
    such span or it is not valid JSON.
    """
    fallback = {"raw_response": text, "parsed": False}
    match = re.search(r'\{[\s\S]*\}', text)
    if not match:
        return fallback
    try:
        return json.loads(match.group())
    except json.JSONDecodeError:
        return fallback


def analyze_video_outline(file_obj: "genai.types.File", video_title: str) -> Optional[dict]:
    """Ask Gemini for a time-segmented outline of an uploaded video.

    Args:
        file_obj: Processed file object of the uploaded video.
        video_title: Title of the video. Currently not used by the request;
            kept so existing callers do not need to change.

    Returns:
        The JSON object parsed from the model response; a dict
        ``{"raw_response": <text>, "parsed": False}`` when the response
        contains no parsable JSON object; or None when the request failed or
        the model returned no text (the error is printed).
    """
    try:
        print("分析视频大纲...", end=" ", flush=True)
        response = client.models.generate_content(
            model="gemini-2.5-flash",
            contents=[file_obj, _OUTLINE_PROMPT],
        )
        text = _response_text(response)
    except Exception as e:
        print(f"\n❌ 分析失败: {e}")
        return None

    if text is None:
        # Report an empty response explicitly instead of failing on
        # ``None.strip()`` with an opaque AttributeError message.
        print("\n❌ 分析失败: 模型未返回文本内容")
        return None

    print("完成")
    return _extract_json_object(text.strip())
def main(directory: str = "53009047") -> None:
    """Run the whole pipeline for one video directory.

    Steps: read the video details, download the video next to this script,
    upload it to Gemini, analyze its outline, and save the result as
    ``<directory>_analysis_result.json``. Each failure is printed and ends
    the run early.

    Args:
        directory: Name of the sub-directory (next to this script) that holds
            ``视频详情.json``; also used to name the downloaded video and the
            result file.
    """
    current_dir = Path(__file__).parent

    # Read the video details.
    try:
        video_data = load_video_data(directory)
        video_url = video_data.get("video", "")
        title = video_data.get("title", "")
    except Exception as e:
        print(f"❌ 读取视频详情失败: {e}")
        return

    if not video_url:
        print("❌ 视频详情文件中未找到video字段")
        return

    # Download the video.
    video_filename = f"{directory}.mp4"
    video_path = download_video(video_url, current_dir, video_filename)
    if not video_path:
        return

    # Upload to Gemini.
    file_obj = upload_to_gemini(video_path)
    if not file_obj:
        return

    # Analyze the video outline.
    analysis_result = analyze_video_outline(file_obj, title)
    if not analysis_result:
        print("❌ 未能获取分析结果")
        return

    # Save the result.
    output_path = current_dir / f"{directory}_analysis_result.json"
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(analysis_result, f, ensure_ascii=False, indent=2)
    print(f"结果已保存: {output_path}")


if __name__ == "__main__":
    # An optional first command-line argument selects the video directory;
    # without it the built-in default is used.
    if len(sys.argv) > 1:
        main(sys.argv[1])
    else:
        main()