"""Transcribe the spoken content of a short video with Google Gemini.

The module downloads a video into a local cache directory, uploads it through
the Gemini file API and asks the model for the spoken text as JSON.
"""
import os
import time
import traceback
import uuid
from typing import Optional

import google.generativeai as genai
import orjson
import requests
from google.generativeai.types import (HarmBlockThreshold, HarmCategory)
from loguru import logger

from utils.json_utils import parse_general_json

CACHE_DIR = '/app/cache/'
# CACHE_DIR = '/Users/z/Downloads/'

# PROXY_ADDR = 'http://localhost:1081'
# os.environ['http_proxy'] = PROXY_ADDR
# os.environ['https_proxy'] = PROXY_ADDR

# Number of download attempts before giving up.
DOWNLOAD_ATTEMPTS = 3
# Chunk size used when streaming the video to disk.
DOWNLOAD_CHUNK_BYTES = 1024 * 1024


class GoogleAI(object):
    """Helpers for downloading a video and transcribing it with Gemini."""

    @classmethod
    def download_video(cls, video_link: str) -> Optional[str]:
        """Download ``video_link`` into ``CACHE_DIR`` as a uniquely named .mp4.

        The download is attempted up to ``DOWNLOAD_ATTEMPTS`` times. The body
        is streamed to disk so a large video is never held in memory at once.

        Args:
            video_link: URL of the video to fetch.

        Returns:
            The local file path on success, or ``None`` when every attempt
            failed (non-200 status or any exception).
        """
        file_path = os.path.join(CACHE_DIR, f'{uuid.uuid4()}.mp4')
        for attempt in range(1, DOWNLOAD_ATTEMPTS + 1):
            try:
                with requests.get(url=video_link, timeout=60, stream=True) as response:
                    if response.status_code != 200:
                        logger.warning(
                            f'[内容分析] 视频下载失败, 视频链接: {video_link}, '
                            f'状态码: {response.status_code}, 第 {attempt} 次尝试'
                        )
                        continue
                    with open(file_path, 'wb') as f:
                        for chunk in response.iter_content(chunk_size=DOWNLOAD_CHUNK_BYTES):
                            f.write(chunk)
                logger.info(f'[内容分析] 视频链接: {video_link}, 存储地址: {file_path}')
                return file_path
            except Exception as e:
                logger.warning(
                    f'[内容分析] 视频下载异常, 视频链接: {video_link}, '
                    f'第 {attempt} 次尝试, 异常信息: {e}'
                )
                # A failure mid-stream can leave a truncated file behind.
                cls._remove_local_file(file_path)
                time.sleep(1)
        return None

    @staticmethod
    def _remove_local_file(file_path: str) -> None:
        """Delete ``file_path`` if it exists, ignoring filesystem errors."""
        try:
            os.remove(file_path)
        except OSError:
            pass

    @staticmethod
    def _delete_remote_file(name: str) -> None:
        """Best-effort deletion of an uploaded Gemini file; failures are logged."""
        try:
            genai.delete_file(name=name)
        except Exception as e:
            logger.warning(f'[内容分析] 删除远端文件失败: {name}, 异常信息: {e}')

    @classmethod
    def run(cls, api_key, video_path):
        """Upload a local video to Gemini and return its spoken text.

        Args:
            api_key: Google Generative AI API key.
            video_path: Path of a local video file, uploaded as ``video/mp4``.

        Returns:
            The value that ``parse_general_json`` extracts for the ``text``
            key of the model response, or ``None`` when the upload does not
            become ``ACTIVE``, parsing yields ``None``, or any exception is
            raised (exceptions are logged, not propagated).
        """
        video = None
        try:
            genai.configure(api_key=api_key)
            video = genai.upload_file(path=video_path, mime_type='video/mp4')
            # Poll once per second until the service leaves the PROCESSING state.
            while video.state.name == 'PROCESSING':
                time.sleep(1)
                video = genai.get_file(name=video.name)
            if video.state.name != 'ACTIVE':
                return None

            model = genai.GenerativeModel(
                model_name='gemini-1.5-flash',
                generation_config=genai.GenerationConfig(response_mime_type='application/json'),
                safety_settings={
                    HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_NONE,
                },
            )
            response = model.generate_content(
                contents=[
                    video,
                    '你是一名专业的短视频分析师,请你输出这个视频的完整口播,只输出文字即可。使用以下JSON格式输出,不要包含任何额外的解释、注释或非JSON文本:{"text": string}',
                ],
                stream=False,
                request_options={
                    'timeout': 600,
                },
            )
            # Log the raw response for debugging.
            logger.info(f"[+] 响应内容: {response.text}")
            # Parse the response with the shared JSON helper; None means no usable text.
            return parse_general_json(response.text, key='text')
        except Exception as e:
            logger.error(f"[内容分析] 处理异常, \n异常类型: {type(e).__name__}, 异常信息: {e}\n{traceback.format_exc()}")
            return None
        finally:
            # Always release the uploaded file, including on the failure paths.
            if video is not None:
                cls._delete_remote_file(video.name)


if __name__ == '__main__':
    # The API key must come from the environment; never commit it to source.
    demo_api_key = os.environ.get('GOOGLE_API_KEY')
    if not demo_api_key:
        raise SystemExit('GOOGLE_API_KEY environment variable is not set')

    # run() uploads a local file, so the remote video has to be fetched first.
    demo_video_path = GoogleAI.download_video(
        'http://rescdn.yishihui.com/jq_oss/video/2025012215472528213'
    )
    if demo_video_path is None:
        raise SystemExit('video download failed')
    try:
        GoogleAI.run(demo_api_key, demo_video_path)
    finally:
        os.remove(demo_video_path)