#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 视频识别脚本 主要功能:使用 Gemini API 分析视频内容 """ import os import json import time import sys from typing import Dict, Any, List, Optional from dotenv import load_dotenv # 导入自定义模块 sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from gemini import GeminiProcessor class VideoIdentifier: def __init__(self): # 加载环境变量 load_dotenv() # 初始化Gemini客户端 self.gemini = GeminiProcessor() # 系统提示词 self.video_system_prompt = """你是一个专业的视频内容分析专家。请分析视频中的内容,包括: 1. 视频的主要内容和主题 2. 视频中的文字内容(如果有) 3. 视频的风格和特点 4. 视频可能表达的情感或意图 5. 视频的背景音乐或语音内容(如果有) 请用简洁、准确的语言描述视频内容,重点关注文字内容和主要视觉元素。""" def extract_video_urls(self, formatted_content: Dict[str, Any]) -> List[Dict[str, Any]]: """提取视频URL列表""" video_data = [] video_url_list = formatted_content.get('video_url_list', []) for video_item in video_url_list: if isinstance(video_item, dict) and 'video_url' in video_item: video_data.append({ 'url': video_item['video_url'], 'duration': video_item.get('video_duration', 0) }) return video_data def analyze_videos_with_gemini(self, video_data: List[Dict[str, Any]]) -> Dict[str, Any]: """使用Gemini API分析视频内容""" try: if not video_data: return {"videos_comprehension": [], "error": "没有视频需要分析"} print(f"正在使用Gemini API分析 {len(video_data)} 个视频...") videos_comprehension = [] for i, video in enumerate(video_data): print(f" 分析视频 {i+1}/{len(video_data)}: {video['url'][:50]}...") # 构建分析提示 prompt = f"""请分析以下视频内容: 视频时长: {video['duration']}秒 视频链接: {video['url']} 请从以下角度分析视频内容: 1. 视频的主要内容和主题 2. 视频中的文字内容(如果有) 3. 视频的风格和特点 4. 视频可能表达的情感或意图 5. 视频的背景音乐或语音内容(如果有) 请用简洁、准确的语言描述视频内容。""" # 调用Gemini API try: response = self.gemini.process( content=prompt, system_prompt=self.video_system_prompt, model_name="gemini-2.5-flash" ) if response: videos_comprehension.append({ 'video_url': video['url'], 'duration': video['duration'], 'comprehension': response, 'analysis_timestamp': int(time.time() * 1000) }) else: videos_comprehension.append({ 'video_url': video['url'], 'duration': video['duration'], 'comprehension': 'Gemini API分析失败', 'analysis_timestamp': int(time.time() * 1000) }) # 添加延迟避免API限制 time.sleep(1) except Exception as e: print(f" 视频 {i+1} 分析失败: {e}") videos_comprehension.append({ 'video_url': video['url'], 'duration': video['duration'], 'comprehension': f'分析失败: {str(e)}', 'analysis_timestamp': int(time.time() * 1000) }) return {"videos_comprehension": videos_comprehension} except Exception as e: print(f"Gemini API调用失败: {e}") return {"videos_comprehension": [], "error": f"Gemini API调用失败: {str(e)}"} def process_videos(self, formatted_content: Dict[str, Any]) -> Dict[str, Any]: """处理视频识别的主函数""" print("开始视频识别处理...") # 提取视频URL video_data = self.extract_video_urls(formatted_content) print(f"提取到 {len(video_data)} 个视频") if not video_data: print("没有视频需要分析") return {"videos_comprehension": [], "error": "没有视频需要分析"} # 分析视频 result = self.analyze_videos_with_gemini(video_data) if result.get("videos_comprehension"): print(f"视频识别完成,共分析 {len(result['videos_comprehension'])} 个视频") else: print("视频识别失败") return result def main(): """测试函数""" # 模拟数据 test_content = { "video_url_list": [ { "video_url": "http://example.com/video1.mp4", "video_duration": 30 } ] } identifier = VideoIdentifier() result = identifier.process_videos( test_content["title"], test_content["body_text"], test_content ) print(f"识别结果: {json.dumps(result, ensure_ascii=False, indent=2)}") if __name__ == '__main__': main()