Procházet zdrojové kódy

增加音频处理

jihuaqiang před 2 týdny
rodič
revize
c06863506d

+ 0 - 4
.env

@@ -6,10 +6,6 @@ FEISHU_FILE_TOKEN=VEBsbCfaWa3gF3slQILc6Rybnde
 # # 飞书表格配置
 # FEISHU_TABLE_ID=tblNdje7z6Cf3hax
 
-# 扣子
-COZE_API_KEY=pat_pClXS15hyuqohC9TK58vU7130Hp6QmmHlnyW2TjFpKVWKsW2B1VniFwdXkY3eRNB
-COZE_BOT_ID=7537570163895812146
-
 # Gemini
 GEMINI_API_KEY_1=AIzaSyAkt1l9Kw1CQgHFzTpla0vgt0OE53fr-BI
 GEMINI_API_KEY_2=AIzaSyAkt1l9Kw1CQgHFzTpla0vgt0OE53fr-BI

+ 2 - 2
prompt/structure.md

@@ -17,7 +17,7 @@
 1.  **识别主标题:** 使用JSON中的 `title` 字段作为一级标题 (`#`)。
 2.  **处理引言:** 将 `body_text` 的内容作为文章的引言或开场白。如果内容不完整,忠实呈现原文即可。
 3.  **分析并整合核心内容(核心任务):**
-    -   通读 `images_comprehension` 和 `videos_comprehension` 数组中的所有文本,理解其整体内容结构。判断这是“步骤式教程”、“对比清单”还是其他类型。
+    -   通读 `images_comprehension` 和 `videos_comprehension`, `audios_comprehension` 数组中的所有文本,理解其整体内容结构。判断这是“步骤式教程”、“对比清单”还是其他类型。
     -   识别出核心的类别或步骤标题(如“第一步”、“喵星人”、“汪星人”等)。
     -   遍历所有输入,将所有相关的信息点(包括其详细描述)归类到相应的主标题之下。确保将分散在多处的内容合并到一起。
     -   对于重复出现的主标题(如“屁股社交”),如果其描述性内容不同,则应作为独立条目保留,以确保信息的完整性。
@@ -27,4 +27,4 @@
 5.  **处理结尾和标签(如果存在):** 如果输入内容包含明确的结尾或 `#话题标签`,则将它们放在文档的末尾。
 
 ## 输入
-用户将提供一个包含 `title`、`body_text` 和 `images_comprehension`, `videos_comprehension` 的JSON对象。
+用户将提供一个包含 `title`、`body_text` 和 `images_comprehension`, `videos_comprehension`, `audios_comprehension` 的JSON对象。

+ 4 - 1
tools/agent_tools.py

@@ -208,6 +208,7 @@ class IdentifyTool:
                 'content': content,
                 'images': recognition_result.get('image_analysis', {}).get('images_comprehension', []),
                 'videos': recognition_result.get('video_analysis', {}),
+                'audios': recognition_result.get('audio_analysis', []),
                 'meta': {
                     'author': author,
                     'like_count': like_count,
@@ -393,6 +394,7 @@ class StructureTool:
                 'content': str,
                 'images': List[str],
                 'videos': Dict,
+                'audios': List[str],
                 'meta': Dict
             }
             
@@ -405,7 +407,8 @@ class StructureTool:
                 "title": content_data.get('title', ''),
                 "body_text": content_data.get('content', ''),
                 "images_comprehension": content_data.get('images', []),
-                "videos_comprehension": content_data.get('videos', [])
+                "videos_comprehension": content_data.get('videos', []),
+                "audios_comprehension": content_data.get('audios', []),
             }
             
             # 调用结构化处理器

+ 361 - 0
tools/indentify/audio_identifier.py

@@ -0,0 +1,361 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+音频识别脚本
+主要功能:将音频转文字(ASR),参考视频识别模块的结构实现
+支持从 formatted_content 中提取音频 URL,下载后上传至 Gemini 进行转写。
+"""
+
+import os
+import json
+import time
+import sys
+import uuid
+import requests
+from typing import Dict, Any, List, Optional
+from dotenv import load_dotenv
+
+# 导入自定义模块
+sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
+from utils.logging_config import get_logger
+
+# 创建 logger
+logger = get_logger('AudioIdentifier')
+
+# 导入Google Generative AI
+import google.generativeai as genai
+from google.generativeai.types import HarmCategory, HarmBlockThreshold
+
+# 缓存目录配置
+CACHE_DIR = os.path.join(os.path.dirname(__file__), 'cache')
+# 缓存文件最大保留时间(秒)
+CACHE_MAX_AGE = 3600  # 1小时
+
+
+class AudioIdentifier:
+    def __init__(self):
+        # 加载环境变量
+        load_dotenv()
+
+        # 延迟配置Gemini,在真正使用时再设置
+        self._configured = False
+        self.model = None
+        self.api_key = None
+
+        # 初始化缓存清理时间
+        self.last_cache_cleanup = time.time()
+
+        # 系统提示词:仅做语音转文字
+        self.system_prompt = (
+            "你是一个专业的音频转写助手。请严格将音频中的语音内容转换为文字,不要添加任何分析、解释或评论。"
+        )
+
+    def _ensure_configured(self):
+        """确保Gemini已配置"""
+        if not self._configured:
+            # 与图片模块保持一致读取 GEMINI_API_KEY_1,若无则回退 GEMINI_API_KEY
+            self.api_key = os.getenv('GEMINI_API_KEY_1') or os.getenv('GEMINI_API_KEY')
+            if not self.api_key:
+                raise ValueError('请在环境变量中设置 GEMINI_API_KEY_1 或 GEMINI_API_KEY')
+            genai.configure(api_key=self.api_key)
+            # 使用通用多模态模型进行音频理解
+            self.model = genai.GenerativeModel(
+                model_name='gemini-2.5-flash',
+                generation_config=genai.GenerationConfig(
+                    response_mime_type='text/plain',
+                    temperature=0.2,
+                    max_output_tokens=40960
+                ),
+                safety_settings={
+                    HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE,
+                    HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE,
+                }
+            )
+            self._configured = True
+
+    def cleanup_cache(self):
+        """清理过期的缓存文件"""
+        try:
+            current_time = time.time()
+            if current_time - self.last_cache_cleanup < 3600:
+                return
+            if not os.path.exists(CACHE_DIR):
+                return
+            cleaned_count = 0
+            for filename in os.listdir(CACHE_DIR):
+                file_path = os.path.join(CACHE_DIR, filename)
+                if os.path.isfile(file_path):
+                    file_age = current_time - os.path.getmtime(file_path)
+                    if file_age > CACHE_MAX_AGE:
+                        try:
+                            os.remove(file_path)
+                            cleaned_count += 1
+                        except Exception as e:
+                            logger.warning(f'清理缓存文件失败: {file_path}, 错误: {e}')
+            if cleaned_count > 0:
+                logger.info(f'已清理 {cleaned_count} 个过期缓存文件')
+            self.last_cache_cleanup = current_time
+        except Exception as e:
+            logger.error(f'清理缓存失败: {e}')
+
+    def download_audio(self, audio_url: str) -> Optional[str]:
+        """下载音频到本地缓存并返回路径"""
+        # 猜测常见音频类型,后续统一按 mp3 保存
+        file_path = os.path.join(CACHE_DIR, f'{str(uuid.uuid4())}.mp3')
+        try:
+            os.makedirs(CACHE_DIR, exist_ok=True)
+        except Exception as e:
+            logger.error(f'创建缓存目录失败: {e}')
+            return None
+
+        try:
+            for attempt in range(3):
+                try:
+                    response = requests.get(url=audio_url, timeout=60)
+                    if response.status_code == 200:
+                        try:
+                            with open(file_path, 'wb') as f:
+                                f.write(response.content)
+                            return file_path
+                        except Exception as e:
+                            logger.error(f'音频保存失败: {e}')
+                            if os.path.exists(file_path):
+                                try:
+                                    os.remove(file_path)
+                                except Exception:
+                                    pass
+                            return None
+                    else:
+                        logger.warning(f'音频下载失败,状态码: {response.status_code}')
+                        if attempt == 2:
+                            return None
+                except Exception as e:
+                    logger.warning(f'下载尝试 {attempt + 1} 失败: {e}')
+                    if attempt < 2:
+                        time.sleep(1)
+                        continue
+                    return None
+        except Exception as e:
+            logger.error(f'下载过程异常: {e}')
+            return None
+
+        return None
+
+    def upload_audio_to_gemini(self, audio_path: str) -> Optional[Any]:
+        """上传音频至 Gemini,返回文件对象"""
+        self._ensure_configured()
+        max_retries = 3
+        retry_delay = 5
+        for attempt in range(max_retries):
+            try:
+                if not os.path.exists(audio_path):
+                    logger.error('错误: 文件不存在')
+                    return None
+                file_size = os.path.getsize(audio_path)
+                if file_size == 0:
+                    logger.error('错误: 文件大小为0')
+                    return None
+                try:
+                    with open(audio_path, 'rb') as f:
+                        f.read(1024)
+                except Exception as e:
+                    logger.error(f'错误: 文件无法读取 - {e}')
+                    return None
+
+                try:
+                    # 使用常见音频 MIME 类型。若后续需要可根据扩展名判断
+                    audio_file = genai.upload_file(path=audio_path, mime_type='audio/mpeg')
+                except Exception as e:
+                    msg = str(e)
+                    logger.error(f'错误: 文件上传请求失败 - {msg}')
+                    if any(k in msg.lower() for k in ['broken pipe', 'connection', 'timeout', 'network']):
+                        if attempt < max_retries - 1:
+                            time.sleep(retry_delay)
+                            retry_delay *= 2
+                            continue
+                        return None
+                    return None
+
+                # 等待处理
+                max_wait_time = 120
+                waited = 0
+                while getattr(audio_file, 'state', None) and getattr(audio_file.state, 'name', '') == 'PROCESSING' and waited < max_wait_time:
+                    time.sleep(2)
+                    waited += 2
+                    try:
+                        audio_file = genai.get_file(name=audio_file.name)
+                        if audio_file.state.name in ['FAILED', 'ERROR', 'INVALID']:
+                            if attempt < max_retries - 1:
+                                time.sleep(retry_delay)
+                                retry_delay *= 2
+                                break
+                            return None
+                    except Exception as e:
+                        logger.warning(f'获取文件状态失败: {e}')
+                        if waited <= 60:
+                            return None
+                        continue
+
+                if getattr(audio_file, 'state', None) and audio_file.state.name == 'ACTIVE':
+                    logger.info(f'音频上传成功: {audio_file.name}')
+                    return audio_file
+                else:
+                    if attempt < max_retries - 1:
+                        time.sleep(retry_delay)
+                        retry_delay *= 2
+                        continue
+                    return None
+            except Exception as e:
+                msg = str(e)
+                if any(k in msg.lower() for k in ['broken pipe', 'connection', 'timeout', 'network']):
+                    if attempt < max_retries - 1:
+                        time.sleep(retry_delay)
+                        retry_delay *= 2
+                        continue
+                    return None
+                logger.error(f'音频上传异常: {msg}')
+                return None
+        return None
+
+    def extract_audio_urls(self, formatted_content: Dict[str, Any]) -> List[str]:
+        """从 formatted_content 中提取音频 URL 列表
+        兼容以下结构:
+        - audio_url_list: [{"audio_url": "..."}, ...]
+        - voice_data: {"url": "..."} 或 [{"url": "..."}, ...]
+        - bgm_data: {"url": "..."}
+        """
+        urls: List[str] = []
+        # audio_url_list
+        for item in (formatted_content.get('audio_url_list') or []):
+            if isinstance(item, dict) and item.get('audio_url'):
+                urls.append(item['audio_url'])
+            elif isinstance(item, str):
+                urls.append(item)
+        # voice_data
+        voice_data = formatted_content.get('voice_data')
+        if isinstance(voice_data, dict) and voice_data.get('url'):
+            urls.append(voice_data['url'])
+        elif isinstance(voice_data, list):
+            for v in voice_data:
+                if isinstance(v, dict) and v.get('url'):
+                    urls.append(v['url'])
+                elif isinstance(v, str):
+                    urls.append(v)
+        # bgm_data
+        bgm_data = formatted_content.get('bgm_data')
+        if isinstance(bgm_data, dict) and bgm_data.get('url'):
+            urls.append(bgm_data['url'])
+
+        # 去重并保持顺序
+        seen = set()
+        deduped: List[str] = []
+        for u in urls:
+            if u and u not in seen:
+                seen.add(u)
+                deduped.append(u)
+        return deduped
+
+    def analyze_audios_with_gemini(self, audio_urls: List[str]) -> List[Dict[str, Any]]:
+        """将音频上传到 Gemini 并进行转写,返回按输入顺序的结果列表"""
+        if not audio_urls:
+            return []
+
+        results: List[Dict[str, Any]] = [{} for _ in range(len(audio_urls))]
+
+        def process_one(idx_and_url) -> Dict[str, Any]:
+            idx, url = idx_and_url
+            audio_file = None
+            local_path: Optional[str] = None
+            try:
+                self._ensure_configured()
+                logger.info(f"配置Gemini: {self.api_key}")
+
+                # 1. 下载
+                local_path = self.download_audio(url)
+                if not local_path:
+                    return {"url": url, "asr_content": "音频下载失败"}
+
+                # 2. 上传
+                audio_file = self.upload_audio_to_gemini(local_path)
+
+                # 清理本地文件
+                try:
+                    if local_path and os.path.exists(local_path):
+                        os.remove(local_path)
+                except Exception:
+                    pass
+
+                if not audio_file:
+                    return {"url": url, "asr_content": "音频上传失败"}
+
+                # 3. 生成
+                response = self.model.generate_content(
+                    contents=[self.system_prompt, audio_file],
+                    request_options={'timeout': 500}
+                )
+
+                # 尝试读取文本
+                try:
+                    text_out = ''
+                    # 优先从 candidates 结构提取,避免某些情况下 .text 不可用
+                    candidates = getattr(response, 'candidates', None)
+                    if candidates and len(candidates) > 0:
+                        first = candidates[0]
+                        content = getattr(first, 'content', None)
+                        parts = getattr(content, 'parts', None) if content else None
+                        if parts and len(parts) > 0:
+                            part0 = parts[0]
+                            text_out = getattr(part0, 'text', None) if hasattr(part0, 'text') else part0.get('text') if isinstance(part0, dict) else ''
+                    if not text_out and hasattr(response, 'text') and isinstance(response.text, str):
+                        text_out = response.text
+                    text_out = (text_out or '').strip()
+                    if not text_out:
+                        return {"url": url, "asr_content": "ASR分析失败:无内容"}
+                    return {"url": url, "asr_content": text_out}
+                except Exception as e:
+                    return {"url": url, "asr_content": f"ASR分析失败:{str(e)}"}
+            except Exception as e:
+                return {"url": url, "asr_content": f"处理失败: {str(e)}"}
+            finally:
+                # 4. 清理远端文件
+                if audio_file and hasattr(audio_file, 'name'):
+                    try:
+                        genai.delete_file(name=audio_file.name)
+                    except Exception:
+                        pass
+
+        # 顺序处理,保持简单稳妥
+        for idx, url in enumerate(audio_urls):
+            result = process_one((idx, url))
+            results[idx] = result
+
+        return results
+
+    def process_audios(self, formatted_content: Dict[str, Any]) -> List[Dict[str, Any]]:
+        """处理音频识别的主函数,返回 [{url, asr_content}]"""
+        try:
+            audio_urls = self.extract_audio_urls(formatted_content)
+            if not audio_urls:
+                return []
+            return self.analyze_audios_with_gemini(audio_urls)
+        finally:
+            # 触发一次缓存清理(若到时间)
+            self.cleanup_cache()
+
+
+def main():
+    """测试函数"""
+    test_content = {
+        "audio_url_list": [
+            {"audio_url": "http://rescdn.yishihui.com/pipeline/audio/09417cf6-60ec-4b62-8ee1-06f9268b13b1.mp3"}
+        ]
+    }
+    identifier = AudioIdentifier()
+    result = identifier.process_audios(test_content)
+    print(json.dumps(result, ensure_ascii=False, indent=2))
+
+
+if __name__ == '__main__':
+    main()
+
+

+ 10 - 1
tools/indentify/indentify.py

@@ -24,6 +24,7 @@ sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
 from utils.mysql_db import MysqlHelper
 from indentify.image_identifier import ImageIdentifier
 from indentify.video_identifier import VideoIdentifier
+from indentify.audio_identifier import AudioIdentifier
 from utils.logging_config import get_logger
 
 
@@ -41,6 +42,7 @@ class ContentIdentifier:
         # 延迟初始化识别模块,确保在需要时使用正确的环境变量
         self.image_identifier = None
         self.video_identifier = None
+        self.audio_identifier = None
     
 
     def get_unprocessed_record(self) -> Optional[Dict[str, Any]]:
@@ -122,18 +124,24 @@ class ContentIdentifier:
             self.image_identifier = ImageIdentifier()
         if self.video_identifier is None:
             self.video_identifier = VideoIdentifier()
+        if self.audio_identifier is None:
+            self.audio_identifier = AudioIdentifier()
         
         # 图片识别
         image_result = self.image_identifier.process_images(formatted_content)
         
         # 视频识别
         video_result = self.video_identifier.process_videos(formatted_content)
+
+        # 音频识别(仅ASR)
+        audio_result = self.audio_identifier.process_audios(formatted_content)
         
 
         # 整合结果
         recognition_result = {
             'image_analysis': image_result,
-            'video_analysis': video_result
+            'video_analysis': video_result,
+            'audio_analysis': audio_result
         }
         
         return recognition_result
@@ -227,6 +235,7 @@ class ContentIdentifier:
                 'content': content,
                 'images': recognition_result.get('image_analysis', {}).get('images_comprehension', []),
                 'videos': recognition_result.get('video_analysis', {}),
+                'audios': recognition_result.get('audio_analysis', []),
                 'meta': {
                     'author': author,
                     'like_count': like_count,