| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- """
- 视频上传Function
- 功能: 下载视频到本地并上传至Gemini,保存上传链接到state
- """
- import os
- import time
- import tempfile
- import requests
- from pathlib import Path
- from urllib.parse import urlparse
- from typing import Dict, Any, Optional, Tuple
- from google import genai
- from src.components.functions.base import BaseFunction
- from src.states.what_deconstruction_state import WhatDeconstructionState
- from src.utils.logger import get_logger
- from src.utils.llm_invoker import LLMInvoker
- logger = get_logger(__name__)
class VideoUploadFunction(BaseFunction[Dict[str, Any], Dict[str, Any]]):
    """Download a video and upload it to the Gemini File API.

    Workflow:
    - Reuse ``examples/videos/{channel_content_id}.mp4`` when it already
      exists; otherwise download the video from its URL into that location.
    - Upload the local file through ``LLMInvoker.upload_video_to_gemini``.
    - Return a copy of the input state extended with the upload result.
    """

    # Total number of download attempts (the first try plus the retries).
    _MAX_DOWNLOAD_ATTEMPTS = 3
    # Seconds; ``requests`` applies a single number to both the connect and
    # the read timeout.
    _DOWNLOAD_TIMEOUT = 120
    # Size in bytes of each chunk streamed from the HTTP response.
    _DOWNLOAD_CHUNK_SIZE = 8192
    _USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"

    def __init__(
        self,
        name: str = "video_upload_function",
        description: str = "下载视频并上传至Gemini,保存上传链接"
    ):
        """Register the function under ``name`` / ``description``."""
        super().__init__(name, description)

    def execute(
        self,
        input_data: Dict[str, Any],
        context: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Download the video referenced by the state and upload it to Gemini.

        Args:
            input_data: State dictionary. ``"video"`` holds the video URL and
                ``"channel_content_id"`` (optional) names the cached file
                under ``examples/videos``.
            context: Extra context; not used by this function.

        Returns:
            A new dictionary containing every key of ``input_data`` plus
            ``video_uploaded_uri``, ``video_upload_error`` and, except on the
            two early-exit failure paths, ``video_file_name``. This method
            never raises: any exception is logged and reported through
            ``video_upload_error``.
        """
        try:
            video_url = input_data.get("video", "")

            if not video_url:
                logger.warning("未提供视频URL,跳过上传")
                return {
                    **input_data,
                    "video_uploaded_uri": None,
                    "video_upload_error": "未提供视频URL"
                }

            logger.info(f"开始下载视频: {video_url}")

            # 1. Obtain a local copy: either the file already present under
            #    examples/videos or a fresh download saved there.
            channel_content_id = input_data.get("channel_content_id", "")
            local_video_path, _is_temp_file = self._download_video(
                video_url, channel_content_id
            )

            if not local_video_path:
                return {
                    **input_data,
                    "video_uploaded_uri": None,
                    "video_upload_error": "视频下载失败"
                }

            logger.info(f"视频文件路径: {local_video_path}")

            # 2. Upload the local file to Gemini.
            video_file = LLMInvoker.upload_video_to_gemini(local_video_path)

            # 3. No cleanup step: every successful download is stored under
            #    examples/videos so later runs can reuse it, and
            #    _download_video never reports a temporary file on success.

            if not video_file:
                return {
                    **input_data,
                    "video_uploaded_uri": None,
                    "video_file_name": None,
                    "video_upload_error": "视频上传到Gemini失败"
                }

            # Read both attributes independently. Previously the name was
            # only read when the object had no ``uri`` attribute, so
            # ``video_file_name`` stayed None for objects exposing both.
            file_name = getattr(video_file, "name", None)
            file_uri = getattr(video_file, "uri", None)
            if not file_uri and file_name:
                # NOTE(review): if ``name`` already carries a "files/" prefix
                # this URL repeats that segment - confirm against the
                # uploader's return value.
                file_uri = f"https://generativelanguage.googleapis.com/v1beta/files/{file_name}"

            logger.info(f"视频上传成功,文件名称: {file_name}")

            # 4. Report the result through the state.
            return {
                **input_data,
                "video_uploaded_uri": file_uri,  # kept for older consumers
                "video_file_name": file_name,  # used to fetch the file object
                "video_upload_error": None
            }

        except Exception as e:
            logger.error(f"视频上传失败: {e}", exc_info=True)
            return {
                **input_data,
                "video_uploaded_uri": None,
                "video_file_name": None,
                "video_upload_error": str(e)
            }

    def _download_video(self, video_url: str, channel_content_id: str = "") -> Tuple[Optional[str], bool]:
        """Return a local path for the video, downloading it when necessary.

        Args:
            video_url: URL the video is fetched from.
            channel_content_id: Identifier used as the file stem under
                ``examples/videos``.

        Returns:
            ``(path, False)`` when an existing or freshly downloaded file
            under ``examples/videos`` is available, ``(None, True)`` when no
            ``channel_content_id`` was supplied or the download failed.
            Exceptions are logged and converted into ``(None, True)``.
        """
        try:
            # 1. Prefer a file that is already present under examples/videos.
            existing_file = self._check_examples_directory(channel_content_id)
            if existing_file:
                logger.info(f"在examples/videos目录下找到现有文件,直接使用: {existing_file}")
                return existing_file, False

            # 2. Without an id there is no file name to save the download as.
            if not channel_content_id:
                logger.warning("未提供channel_content_id,无法保存到examples/videos目录")
                return None, True

            logger.info("未在examples/videos目录下找到同名文件,开始下载...")

            videos_dir = self._videos_dir()
            videos_dir.mkdir(parents=True, exist_ok=True)
            target_path = videos_dir / f"{channel_content_id}.mp4"

            # Another worker may have published the file in the meantime.
            if target_path.exists():
                logger.info(f"文件已存在: {target_path}")
                return str(target_path), False

            max_attempts = self._MAX_DOWNLOAD_ATTEMPTS
            for attempt in range(1, max_attempts + 1):
                if attempt > 1:
                    logger.info(f"重试下载视频 (第 {attempt - 1}/{max_attempts - 1} 次)...")
                try:
                    file_size = self._fetch_to_file(video_url, target_path)
                except requests.exceptions.RequestException as e:
                    # Only HTTP/network failures are retried; anything else
                    # (empty file, disk errors) propagates to the outer
                    # handler immediately.
                    if attempt >= max_attempts:
                        logger.error(f"下载失败,已重试 {max_attempts} 次: {e}")
                        raise
                    wait_time = attempt * 2  # linear back-off: 2s, 4s, ...
                    logger.warning(f"下载失败 (尝试 {attempt}/{max_attempts}): {e}")
                    logger.info(f"等待 {wait_time} 秒后重试...")
                    time.sleep(wait_time)
                else:
                    logger.info(f"视频下载完成,大小: {file_size / 1024 / 1024:.2f} MB,保存到: {target_path}")
                    return str(target_path), False

            # Only reachable when the attempt budget is not positive.
            return None, True

        except Exception as e:
            logger.error(f"下载视频失败: {e}", exc_info=True)
            return None, True

    def _fetch_to_file(self, video_url: str, target_path: Path) -> int:
        """Stream ``video_url`` to ``target_path`` and return the byte count.

        The data is written to a temporary ``.part`` file next to the target
        and moved into place with ``os.replace`` only after the download has
        finished and is non-empty, so ``target_path`` never holds a partial
        video.

        Raises:
            requests.exceptions.RequestException: On HTTP or network failure.
            ValueError: If the downloaded file is empty.
            OSError: If the file cannot be written or moved.
        """
        # mkstemp creates the file with owner-only permissions, which the
        # published video keeps after os.replace.
        fd, tmp_name = tempfile.mkstemp(
            dir=str(target_path.parent),
            prefix=f"{target_path.stem}.",
            suffix=".part",
        )
        try:
            with os.fdopen(fd, "wb") as out_file:
                # Context managers close the session and release the streamed
                # connection even when the transfer fails midway.
                with requests.Session() as session:
                    session.headers.update({"User-Agent": self._USER_AGENT})
                    with session.get(
                        video_url,
                        timeout=self._DOWNLOAD_TIMEOUT,
                        stream=True
                    ) as response:
                        response.raise_for_status()
                        for chunk in response.iter_content(chunk_size=self._DOWNLOAD_CHUNK_SIZE):
                            if chunk:
                                out_file.write(chunk)

            file_size = os.path.getsize(tmp_name)
            if file_size == 0:
                raise ValueError("下载的文件大小为0")

            os.replace(tmp_name, target_path)
            return file_size
        finally:
            # After a successful os.replace the temporary name no longer
            # exists and the helper ignores that case.
            self._remove_partial_file(tmp_name)

    @staticmethod
    def _remove_partial_file(path: str) -> None:
        """Delete ``path`` if present; log instead of raising on failure."""
        try:
            os.remove(path)
        except FileNotFoundError:
            pass
        except OSError as e:
            logger.warning(f"删除临时文件失败: {e}")

    @staticmethod
    def _videos_dir() -> Path:
        """Return ``<project root>/examples/videos``."""
        # This module is expected at
        # src/components/functions/video_upload_function.py, so the project
        # root is four ``parent`` hops up.
        project_root = Path(__file__).parent.parent.parent.parent
        return project_root / "examples" / "videos"

    def _check_examples_directory(self, channel_content_id: str) -> Optional[str]:
        """Look for ``examples/videos/{channel_content_id}.mp4``.

        Args:
            channel_content_id: Identifier used as the file stem.

        Returns:
            The path of the file as a string when it exists and is a regular
            file; ``None`` when the id is empty, the directory or file is
            missing, or the check itself raises.
        """
        try:
            if not channel_content_id:
                logger.info("未提供channel_content_id,跳过examples/videos目录检查")
                return None

            videos_dir = self._videos_dir()
            if not videos_dir.exists():
                logger.info(f"examples/videos目录不存在: {videos_dir}")
                return None

            mp4_file = videos_dir / f"{channel_content_id}.mp4"
            logger.info(f"构建文件路径: {mp4_file}")

            if mp4_file.exists() and mp4_file.is_file():
                logger.info(f"在examples/videos目录下找到文件: {mp4_file}")
                return str(mp4_file)

            logger.debug(f"在examples/videos目录下未找到文件: {mp4_file}")
            return None

        except Exception as e:
            logger.warning(f"检查examples/videos目录时出错: {e}", exc_info=True)
            return None
|