#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Video recognition script.

Uses the Gemini API to analyse video content along three dimensions:

1. ASR (Automatic Speech Recognition) - transcribe spoken audio to text.
2. OCR - recognise text shown in the video frames.
3. Key-frame extraction and description - split the video into key segments
   and describe each one.

Pipeline per video: download to a local cache -> upload to the Gemini Files
API -> analyse with a single JSON-producing prompt -> delete the local and
remote copies.
"""

import os
import json
import time
import sys
import uuid
import requests
from typing import Dict, Any, List, Optional, Tuple
from dotenv import load_dotenv

# Put the project root on sys.path so the project-local imports below resolve.
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))

from utils.logging_config import get_logger

# Module-level logger
logger = get_logger('VideoIdentifier')

# NOTE(review): not referenced in this file; kept so import-time behaviour is unchanged.
from llm.openrouter import OpenRouterProcessor, OpenRouterModel

# Google Generative AI SDK
import google.generativeai as genai
from google.generativeai.types import HarmCategory, HarmBlockThreshold

# Directory where downloaded videos are cached before upload.
CACHE_DIR = os.path.join(os.path.dirname(__file__), 'cache')
# Maximum age (seconds) of a cached file before cleanup_cache() deletes it.
CACHE_MAX_AGE = 3600  # 1 hour

# Lower-cased substrings that mark an upload error as a transient network
# problem that is worth retrying.
_NETWORK_ERROR_KEYWORDS = ('broken pipe', 'connection', 'timeout', 'network')


def _mask_secret(secret: Optional[str]) -> str:
    """Return a log-safe form of *secret*; at most its last 4 characters are shown."""
    if not secret:
        return 'None'
    if len(secret) <= 8:
        return '*' * len(secret)
    return f'***{secret[-4:]}'


def _is_network_error(error: Exception) -> bool:
    """Heuristically decide whether *error* looks like a transient network failure."""
    message = str(error).lower()
    return any(keyword in message for keyword in _NETWORK_ERROR_KEYWORDS)


class VideoIdentifier:
    """Analyse videos with Gemini: speech transcript plus per-segment frame description and OCR."""

    def __init__(self):
        # Load environment variables (e.g. GEMINI_API_KEY) from a .env file.
        load_dotenv()

        # Gemini is configured lazily, on first use (see _ensure_configured).
        self._configured = False
        # Filled in by _ensure_configured; initialised so the attribute always exists.
        self.api_key: Optional[str] = None

        # Time of the last cache sweep; cleanup_cache() runs at most once an hour.
        self.last_cache_cleanup = time.time()

        # Single prompt covering every analysis dimension; the model must answer in JSON.
        self.unified_system_prompt = """你是一个专业的视频内容分析专家。请从以下两个维度分析视频内容,并以JSON格式输出结果:

1. ASR (Automatic Speech Recognition) - 语音转文字:
   - 仅提取视频中的语音内容,转换为文字
   - 保持原始语音的准确性和完整性
   - 不要添加分析、解释或评论

2. 关键帧提取与描述(包含OCR文字识别):
   - 将视频按照画面场景变化分解为多个关键时间片段
   - 对每个时间片段进行以下分析:
     * 画面的主要视觉元素和内容, 20个字以内
     * 画面中出现的所有文字内容(OCR识别),**注意忽略语音的字幕**
   - 每个时间片段应包含:
     * content: 画面内容的详细描述,15个字以内
     * ocr_content: 该时间段画面中出现的文字内容,仅做画面内文字提取,不要提取字幕文字,不要做任何解释或总结

请严格按照以下JSON格式输出,使用中文输出,不要添加任何其他文字:
{
    "asr_content": "提取的语音文字内容",
    "iframe_details": [
        {
            "time_start": "开始时间(秒)",
            "time_end": "结束时间(秒)",
            "content": "该时间段画面内容的详细描述",
            "ocr_content": "该时间段画面中出现的文字内容"
        }
    ]
}"""

    # ------------------------------------------------------------------ setup

    def _ensure_configured(self):
        """Configure the Gemini SDK once, using GEMINI_API_KEY or GEMINI_API_KEY_1.

        The key itself is never printed; only a masked form is shown.
        """
        if self._configured:
            return
        self.api_key = os.getenv('GEMINI_API_KEY') or os.getenv('GEMINI_API_KEY_1')
        print(f"配置Gemini: {_mask_secret(self.api_key)}")
        genai.configure(api_key=self.api_key)
        self._configured = True

    # ------------------------------------------------------------ local files

    @staticmethod
    def _remove_file_quietly(path: Optional[str]) -> None:
        """Best-effort delete of a local file; failures are reported, not raised."""
        if not path or not os.path.exists(path):
            return
        try:
            os.remove(path)
        except OSError as e:
            print(f'清理本地文件失败: {path}, 错误: {e}')

    def download_video(self, video_url: str) -> Optional[str]:
        """Download *video_url* into CACHE_DIR under a random ``.mp4`` file name.

        Up to 3 attempts are made, 1 s apart. A non-200 status, a network error,
        or a local write error each count as a failed attempt. The body is
        streamed to disk in chunks so large videos are not held in memory, and
        a partially written file is removed after a failed attempt.

        Returns:
            The local file path on success, otherwise ``None``. Never raises.
        """
        max_attempts = 3
        try:
            os.makedirs(CACHE_DIR, exist_ok=True)
        except OSError as e:
            print(f'创建缓存目录失败: {e}')
            return None

        file_path = os.path.join(CACHE_DIR, f'{uuid.uuid4()}.mp4')
        for attempt in range(max_attempts):
            try:
                with requests.get(url=video_url, timeout=600, stream=True) as response:
                    if response.status_code == 200:
                        with open(file_path, 'wb') as f:
                            for chunk in response.iter_content(chunk_size=1024 * 1024):
                                if chunk:
                                    f.write(chunk)
                        return file_path
                    print(f'视频下载失败,状态码: {response.status_code}')
            except Exception as e:  # public contract: report and return None, never raise
                print(f'下载尝试 {attempt + 1} 失败: {e}')
                self._remove_file_quietly(file_path)
            if attempt < max_attempts - 1:
                time.sleep(1)

        print('所有下载尝试都失败了')
        return None

    def cleanup_cache(self):
        """Delete files in CACHE_DIR whose modification time is older than CACHE_MAX_AGE.

        Throttled: it does nothing if the previous sweep (or construction of
        this instance) was less than an hour ago. Per-file failures are
        reported and skipped.

        NOTE(review): nothing in this file calls this method.
        """
        current_time = time.time()
        if current_time - self.last_cache_cleanup < 3600:
            return
        if not os.path.isdir(CACHE_DIR):
            return

        try:
            filenames = os.listdir(CACHE_DIR)
        except OSError as e:
            print(f'清理缓存失败: {e}')
            return

        cleaned_count = 0
        for filename in filenames:
            file_path = os.path.join(CACHE_DIR, filename)
            try:
                if not os.path.isfile(file_path):
                    continue
                if current_time - os.path.getmtime(file_path) > CACHE_MAX_AGE:
                    os.remove(file_path)
                    cleaned_count += 1
            except OSError as e:
                print(f'清理缓存文件失败: {file_path}, 错误: {e}')

        if cleaned_count > 0:
            print(f'已清理 {cleaned_count} 个过期缓存文件')
        self.last_cache_cleanup = current_time

    # ----------------------------------------------------------- Gemini files

    @staticmethod
    def _check_local_video(video_path: str) -> bool:
        """Return True if *video_path* exists, is non-empty and readable; report why not otherwise."""
        try:
            if not os.path.exists(video_path):
                print("    错误: 文件不存在")
                return False
            if os.path.getsize(video_path) == 0:
                print("    错误: 文件大小为0")
                return False
            with open(video_path, 'rb') as f:
                # Read the first bytes to confirm the file is actually readable.
                f.read(1024)
        except OSError as e:
            print(f"    错误: 文件无法读取 - {e}")
            return False
        return True

    @staticmethod
    def _delete_remote_file(video_file: Any) -> None:
        """Best-effort deletion of a Gemini file; failures are reported, not raised."""
        name = getattr(video_file, 'name', None)
        if not name:
            return
        try:
            genai.delete_file(name=name)
        except Exception as e:
            print(f'    删除Gemini文件失败: {name}, 错误: {e}')

    @staticmethod
    def _wait_until_processed(video_file: Any, max_wait_time: int = 120, poll_interval: int = 2) -> Any:
        """Poll Gemini until *video_file* leaves PROCESSING or *max_wait_time* seconds elapse.

        A failed status poll is reported and polling continues until the
        deadline. Returns the most recently fetched file object; its state may
        still be PROCESSING if the deadline was reached.
        """
        print("    等待文件处理完成...")
        waited = 0
        while video_file.state.name == 'PROCESSING' and waited < max_wait_time:
            time.sleep(poll_interval)
            waited += poll_interval
            try:
                video_file = genai.get_file(name=video_file.name)
            except Exception as e:
                print(f"      警告: 获取文件状态失败 - {e}")

        state = video_file.state.name
        if state in ('FAILED', 'ERROR', 'INVALID'):
            print(f"    错误: 文件处理失败,状态: {state}")
            error = getattr(video_file, 'error', None)
            if error:
                print(f"    错误详情: {error}")
        return video_file

    @staticmethod
    def _report_upload_error(error: Exception) -> None:
        """Print an upload error together with a short diagnosis for common failure types."""
        error_msg = str(error)
        print('    错误: 视频上传到Gemini失败')
        print(f"    错误类型: {type(error).__name__}")
        print(f"    错误信息: {error_msg}")

        if "Broken pipe" in error_msg:
            print("    诊断: Broken pipe 错误通常表示:")
            print("      - 网络连接不稳定")
            print("      - 服务器连接中断")
            print("      - 防火墙或代理问题")
            print("    建议:")
            print("      - 检查网络连接")
            print("      - 尝试使用VPN或更换网络")
            print("      - 检查防火墙设置")
        elif "Connection" in error_msg:
            print("    诊断: 连接错误")
            print("    建议: 检查网络连接和API密钥")
        elif "Timeout" in error_msg:
            print("    诊断: 超时错误")
            print("    建议: 网络较慢,可以增加超时时间")
        elif "Permission" in error_msg:
            print("    诊断: 权限错误")
            print("    建议: 检查API密钥和权限设置")

    def upload_video_to_gemini(self, video_path: str) -> Optional[Any]:
        """Upload a local video to the Gemini Files API and wait until it is ACTIVE.

        Up to 3 attempts are made, with exponential backoff (5 s, 10 s, ...).
        An attempt is retried when it fails with a network-looking exception,
        or when server-side processing fails or does not finish in time. Other
        exceptions stop immediately. A remote file from a failed attempt is
        deleted so it does not linger in Gemini file storage.

        Returns:
            The ACTIVE Gemini file object, or ``None`` on failure.
        """
        max_retries = 3
        retry_delay = 5

        # Problems with the local file are not fixed by retrying.
        if not self._check_local_video(video_path):
            return None

        for attempt in range(max_retries):
            video_file = None
            try:
                video_file = genai.upload_file(path=video_path, mime_type='video/mp4')
                video_file = self._wait_until_processed(video_file)
                if video_file.state.name == 'ACTIVE':
                    print(f'    视频上传成功: {video_file.name}')
                    return video_file
                print('    错误: 视频文件上传失败')
                # Failed or timed-out server-side processing is worth another attempt.
                retryable = True
            except Exception as e:
                self._report_upload_error(e)
                retryable = _is_network_error(e)

            # Never leave a failed upload behind in Gemini file storage.
            if video_file is not None:
                self._delete_remote_file(video_file)

            if not retryable:
                print("    非网络错误,不进行重试")
                return None
            if attempt == max_retries - 1:
                print("    所有重试都失败了")
                return None
            print(f"    上传失败,等待 {retry_delay} 秒后重试...")
            time.sleep(retry_delay)
            retry_delay *= 2  # exponential backoff

        return None

    # --------------------------------------------------------------- analysis

    def extract_video_urls(self, formatted_content: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Collect ``{'url', 'duration'}`` entries from ``formatted_content['video_url_list']``.

        Entries that are not dicts or lack ``video_url`` are skipped; a missing
        ``video_duration`` defaults to 0.
        """
        return [
            {'url': video_item['video_url'], 'duration': video_item.get('video_duration', 0)}
            for video_item in formatted_content.get('video_url_list', [])
            if isinstance(video_item, dict) and 'video_url' in video_item
        ]

    def process_videos(self, formatted_content: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Main entry point: analyse every video listed in *formatted_content*.

        Returns one result dict per valid entry of ``video_url_list``, in
        input order, or ``[]`` when there is none.
        """
        video_data = self.extract_video_urls(formatted_content)
        if not video_data:
            return []
        return self.analyze_videos_with_openrouter(video_data)

    @staticmethod
    def _failure_result(url: str, duration: Any, asr_message: str,
                        frame_message: Optional[str] = None) -> Dict[str, Any]:
        """Build a result dict for a video that could not be analysed.

        With *frame_message*, ``iframe_details`` holds one zero-length
        placeholder segment carrying that message; otherwise it is empty.
        """
        frames: List[Dict[str, Any]] = []
        if frame_message is not None:
            frames = [{
                'time_start': 0, 'time_end': 0,
                'content': frame_message,
                'ocr_content': frame_message,
            }]
        return {'url': url, 'duration': duration, 'asr_content': asr_message, 'iframe_details': frames}

    @staticmethod
    def _build_model() -> Any:
        """Create the Gemini model used for analysis (JSON output, relaxed safety filters)."""
        return genai.GenerativeModel(
            model_name='gemini-2.5-flash',
            generation_config=genai.GenerationConfig(
                response_mime_type='application/json',
                temperature=0.3,
                # NOTE(review): this appears to exceed gemini-2.5-flash's output
                # token limit; confirm the API clamps it rather than rejecting it.
                max_output_tokens=409600,
            ),
            safety_settings={
                HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE,
                HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE,
            },
        )

    @staticmethod
    def _extract_response_text(resp: Any) -> str:
        """Return the model's text output without blindly touching ``resp.text``.

        ``response.text`` raises when a candidate has no parts (for example,
        blocked or truncated output). So the first candidate's parts are
        inspected, and the text of every part is concatenated.

        Raises:
            ValueError: if no non-empty text is available. The message carries
                the candidate's ``finish_reason`` when one exists.
        """
        candidates = getattr(resp, 'candidates', None)
        if candidates:
            first = candidates[0]
            finish_reason = getattr(first, 'finish_reason', None)
            content = getattr(first, 'content', None)
            parts = getattr(content, 'parts', None) if content is not None else None

            texts = []
            for part in parts or []:
                text = part.get('text') if isinstance(part, dict) else getattr(part, 'text', None)
                if isinstance(text, str):
                    texts.append(text)
            joined = ''.join(texts)
            if joined.strip():
                return joined

            reason = str(finish_reason) if finish_reason is not None else 'unknown'
            raise ValueError(f"无有效输出内容,finish_reason={reason}")

        # Fallback for response objects without candidates (may itself raise).
        text = getattr(resp, 'text', None)
        if isinstance(text, str) and text.strip():
            return text
        raise ValueError("响应中没有可用的文本内容")

    @staticmethod
    def _parse_analysis(text_payload: str) -> Tuple[Any, List[Any]]:
        """Parse the model's JSON answer into ``(asr_content, iframe_details)``.

        Missing top-level fields are replaced with "analysis failed"
        placeholders. A non-list ``iframe_details`` becomes ``[]``.

        Raises:
            json.JSONDecodeError: if *text_payload* is not valid JSON.
            ValueError: if the JSON value is not an object.
        """
        parsed = json.loads(text_payload)
        if not isinstance(parsed, dict):
            raise ValueError("响应格式错误:非字典结构")

        asr = parsed.get('asr_content', 'asr_content分析失败')
        frames = parsed.get('iframe_details', [{
            'time_start': 0,
            'time_end': 0,
            'content': 'iframe_details分析失败',
            'ocr_content': 'iframe_details分析失败',
        }])
        if not isinstance(frames, list):
            frames = []
        return asr, frames

    def _process_single_video(self, item: Dict[str, Any], system_prompt: str) -> Dict[str, Any]:
        """Run download -> upload -> analyse -> clean-up for one video.

        Any error becomes a failure result; this method does not raise once
        *item* has been read.
        """
        print(f"开始处理视频: {item}")
        url = item.get('url', '')
        duration = item.get('duration', 0)
        video_file = None
        try:
            self._ensure_configured()
            logger.info(f"配置Gemini: {_mask_secret(self.api_key)}")

            # 1. Download the video locally.
            video_path = self.download_video(url)
            if not video_path:
                return self._failure_result(url, duration, '视频下载失败')

            # 2. Upload to Gemini. The local copy is removed whether or not the upload worked.
            try:
                video_file = self.upload_video_to_gemini(video_path)
            finally:
                self._remove_file_quietly(video_path)
            if not video_file:
                return self._failure_result(url, duration, '视频上传失败')

            # 3. Let Gemini analyse the uploaded file directly.
            response = self._build_model().generate_content(
                contents=[video_file, system_prompt],
                request_options={'timeout': 2000},
            )
            if getattr(response, '_error', None):
                raise RuntimeError(f"生成错误: {response._error}")

            try:
                text_payload = self._extract_response_text(response).strip()
                asr, frames = self._parse_analysis(text_payload)
            except json.JSONDecodeError as e:
                print(f"JSON解析失败: {e}")
                return self._failure_result(
                    url, duration,
                    'ASR分析失败:JSON解析错误',
                    '关键帧分析失败:JSON解析错误',
                )
            return {'url': url, 'duration': duration, 'asr_content': asr, 'iframe_details': frames}

        except Exception as e:
            # Covers configuration, generation, empty/blocked output and malformed JSON.
            message = f'处理失败: {e}'
            return self._failure_result(url, duration, message, message)

        finally:
            # 4. Delete the remote Gemini file (best effort).
            if video_file is not None:
                self._delete_remote_file(video_file)

    def analyze_videos_with_openrouter(self, video_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Analyse each video with Gemini, one at a time, returning results in input order.

        Despite its name (kept for backward compatibility), this method calls
        Gemini directly rather than OpenRouter. Videos are processed
        sequentially. A failure on one video yields a failure entry for that
        video only and never discards the results of the others.

        Args:
            video_data: ``{'url': str, 'duration': ...}`` dicts, as produced by
                extract_video_urls.

        Returns:
            One dict per input item with ``url``, ``duration``,
            ``asr_content`` and ``iframe_details``.
        """
        system_prompt = self.unified_system_prompt
        results: List[Dict[str, Any]] = []
        for item in video_data:
            try:
                results.append(self._process_single_video(item, system_prompt))
            except Exception as e:
                # Last-resort guard (e.g. a malformed item); per-video errors are
                # normally handled inside _process_single_video.
                print(f"视频分析失败: {e}")
                results.append({
                    'url': item.get('url', '') if isinstance(item, dict) else '',
                    'duration': item.get('duration', 0) if isinstance(item, dict) else 0,
                    'asr_content': f'处理失败: {e}',
                    'iframe_details': [],
                })
        return results


def main():
    """Manual smoke test: analyse the sample videos below and print the result."""
    # Sample input
    test_content = {
        "video_url_list": [
            {
                "video_url": "http://rescdn.yishihui.com/pipeline/video/489e7c31-4e7c-44cc-872d-b1b1dd42b12d.mp4",
                "video_duration": 187
            },
            {
                "video_url": "http://temp.yishihui.com/pipeline/video/43d11b20-6273-4ece-a146-94f63a3992a8.mp4",
                "video_duration": 100
            },
            # {
            #     "video_url": "http://temp.yishihui.com/longvideo/transcode/video/vpc/20250731/57463792ND5eu5PAj95sVLi2gB.mp4",
            #     "video_duration": 100
            # },
            # {
            #     "video_url": "http://temp.yishihui.com/longvideo/transcode/crawler_local/video/prod/20250912/2c278614bd39fc2668f210d752141cb678956536.mp4",
            #     "video_duration": 100
            # },
            # {
            #     "video_url": "http://temp.yishihui.com/longvideo/transcode/video/vpc/20250809/5870d4dc9ba18ce57e5af27b81ff1398.mp4",
            #     "video_duration": 100
            # },
            # {
            #     "video_url": "http://temp.yishihui.com/pipeline/video/202769cd-68a5-41a2-82d9-620d2c72a225.mp4",
            #     "video_duration": 100
            # }
        ]
    }

    identifier = VideoIdentifier()
    result = identifier.process_videos(test_content)

    print(f"识别结果: {json.dumps(result, ensure_ascii=False, indent=2)}")


if __name__ == '__main__':
    main()