"""Transcribe the spoken content of a remote video with Google Gemini.

Importing this module sets the ``http_proxy`` / ``https_proxy`` environment
variables to ``PROXY_ADDR``.
"""
import os
import time
import uuid
from typing import Optional, Tuple

import cv2
import google.generativeai as genai
import orjson
import requests
from google.generativeai.types import (HarmBlockThreshold, HarmCategory)
from loguru import logger

CACHE_DIR = '/Users/z/Downloads/'
PROXY_ADDR = 'http://localhost:1081'

# Number of download attempts and the chunk size used when streaming to disk.
_DOWNLOAD_ATTEMPTS = 3
_DOWNLOAD_CHUNK_SIZE = 1024 * 1024

os.environ['http_proxy'] = PROXY_ADDR
os.environ['https_proxy'] = PROXY_ADDR


class GoogleAI(object):
    """Download a video, upload it to Gemini and extract its spoken text."""

    @classmethod
    def get_video_duration(cls, video_link: str) -> int:
        """Return the video duration in whole seconds.

        Args:
            video_link: Path or URL handed to ``cv2.VideoCapture``.

        Returns:
            ``frame_count / fps`` truncated to an int, or ``0`` when the
            source cannot be opened or reports a non-positive FPS or frame
            count.
        """
        cap = cv2.VideoCapture(video_link)
        try:
            if not cap.isOpened():
                return 0
            fps = cap.get(cv2.CAP_PROP_FPS)
            frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
            # Guard against a zero FPS, which would otherwise raise
            # ZeroDivisionError.
            if fps <= 0 or frame_count <= 0:
                return 0
            return int(frame_count / fps)
        finally:
            # Always release the capture handle, even on early return.
            cap.release()

    @staticmethod
    def _remove_local_file(path: Optional[str]) -> None:
        """Delete ``path`` if it exists; never raise."""
        if not path:
            return
        try:
            os.remove(path)
        except FileNotFoundError:
            return
        except OSError as exc:
            logger.warning(f'[内容分析] 文件删除失败: {path}, 异常信息{exc}')
            return
        logger.info(f"[内容分析] 文件已删除: {path}")

    @classmethod
    def download_video(cls, video_link: str) -> Optional[str]:
        """Download ``video_link`` into ``CACHE_DIR``.

        The body is streamed to disk in chunks instead of being buffered in
        memory. Up to ``_DOWNLOAD_ATTEMPTS`` attempts are made, pausing one
        second between attempts.

        Args:
            video_link: HTTP(S) URL of the video.

        Returns:
            The path of the saved ``.mp4`` file, or ``None`` if every attempt
            failed. This method does not raise on download errors.
        """
        file_path = os.path.join(CACHE_DIR, f'{uuid.uuid4()}.mp4')
        for attempt in range(1, _DOWNLOAD_ATTEMPTS + 1):
            try:
                with requests.get(url=video_link, timeout=60, stream=True) as response:
                    if response.status_code == 200:
                        with open(file_path, 'wb') as f:
                            for chunk in response.iter_content(chunk_size=_DOWNLOAD_CHUNK_SIZE):
                                f.write(chunk)
                        logger.info(f'[内容分析] 视频链接: {video_link}, 存储地址: {file_path}')
                        return file_path
                    logger.warning(
                        f'[内容分析] 视频下载失败, 状态码: {response.status_code}, 第{attempt}次尝试'
                    )
            except Exception as exc:
                # Deliberately broad: the download is best-effort and callers
                # rely on a None return rather than an exception.
                logger.warning(f'[内容分析] 视频下载异常, 第{attempt}次尝试, 异常信息{exc}')
                # Do not leave a truncated file behind.
                cls._remove_local_file(file_path)
            if attempt < _DOWNLOAD_ATTEMPTS:
                time.sleep(1)
        return None

    @classmethod
    def _cleanup(cls, remote_file, local_path: Optional[str]) -> None:
        """Best-effort removal of the uploaded file and the local copy."""
        if remote_file is not None:
            try:
                genai.delete_file(name=remote_file.name)
            except Exception as exc:
                logger.warning(f'[内容分析] 远程文件删除失败, 异常信息{exc}')
        cls._remove_local_file(local_path)

    @classmethod
    def run(cls, api_key: str, video_url: str) -> Optional[str]:
        """Transcribe the speech of the video at ``video_url``.

        Args:
            api_key: API key passed to ``genai.configure``.
            video_url: HTTP(S) URL of the video to analyse.

        Returns:
            The ``text`` field of the model's JSON answer on success;
            ``"视频下载失败"`` when the video could not be downloaded;
            ``None`` when the uploaded file does not become ``ACTIVE`` or any
            other error occurs (the error is logged).
        """
        video_path = None
        video = None
        try:
            genai.configure(api_key=api_key)

            video_path = cls.download_video(video_link=video_url)
            if not video_path:
                logger.error('[内容分析] 视频下载失败, 跳过任务')
                return "视频下载失败"

            video = genai.upload_file(path=video_path, mime_type='video/mp4')
            # Poll until the service has finished processing the upload.
            # NOTE(review): there is no upper bound on this wait.
            while video.state.name == 'PROCESSING':
                time.sleep(1)
                video = genai.get_file(name=video.name)
            if video.state.name != 'ACTIVE':
                return None

            model = genai.GenerativeModel(
                model_name='gemini-1.5-flash',
                generation_config=genai.GenerationConfig(response_mime_type='application/json'),
                safety_settings={
                    HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_NONE,
                },
            )
            response = model.generate_content(
                contents=[
                    video,
                    "你是一名专业的短视频分析师,请你输出这个视频的完整口播,只输出文字即可。使用一下JSON格式输出:{'text': string}",
                ],
                stream=False,
                request_options={
                    'timeout': 600,
                },
            )
            return orjson.loads(response.text.strip())['text']
        except Exception as e:
            logger.error(f"[内容分析] 处理异常,异常信息{e}")
            return None
        finally:
            # Runs on every exit path so neither the local download nor the
            # uploaded file is leaked, and a cleanup failure cannot discard
            # an already obtained transcription.
            cls._cleanup(video, video_path)


if __name__ == '__main__':
    # SECURITY: never hard-code the API key in source; read it from the
    # environment instead.
    _api_key = os.environ.get('GOOGLE_API_KEY')
    if not _api_key:
        raise SystemExit('GOOGLE_API_KEY environment variable is not set')
    GoogleAI.run(_api_key, "http://rescdn.yishihui.com/jq_oss/video/2025012215472528213")