Browse Source

异步处理

jihuaqiang 1 month ago
parent
commit
f5d083076f
2 changed files with 37 additions and 33 deletions
  1. 13 9
      agent.py
  2. 24 24
      tools/indentify/video_identifier.py

+ 13 - 9
agent.py

@@ -11,6 +11,7 @@ import os
 import time
 import time
 from typing import Any, Dict, List, Optional, TypedDict, Annotated
 from typing import Any, Dict, List, Optional, TypedDict, Annotated
 from contextlib import asynccontextmanager
 from contextlib import asynccontextmanager
+import asyncio
 
 
 # 保证可以导入本项目模块
 # 保证可以导入本项目模块
 sys.path.append(os.path.dirname(os.path.abspath(__file__)))
 sys.path.append(os.path.dirname(os.path.abspath(__file__)))
@@ -195,7 +196,7 @@ def create_langgraph_workflow():
                 "index": current_index + 1,
                 "index": current_index + 1,
                 "dbInserted": ok,
                 "dbInserted": ok,
                 "identifyError": identify_result.get('error'),
                 "identifyError": identify_result.get('error'),
-                "status": "success" if ok else "failed"
+                "status": 2 if ok else 3
             }
             }
             state["details"].append(detail)
             state["details"].append(detail)
             
             
@@ -210,7 +211,7 @@ def create_langgraph_workflow():
                 "index": current_index + 1,
                 "index": current_index + 1,
                 "dbInserted": False,
                 "dbInserted": False,
                 "identifyError": str(e),
                 "identifyError": str(e),
-                "status": "error"
+                "status": 3
             }
             }
             state["details"].append(detail)
             state["details"].append(detail)
             state["status"] = "item_error"
             state["status"] = "item_error"
@@ -395,7 +396,7 @@ async def parse_processing(request: TriggerRequest, background_tasks: Background
                         "index": idx,
                         "index": idx,
                         "dbInserted": ok,
                         "dbInserted": ok,
                         "identifyError": identify_result.get('error'),
                         "identifyError": identify_result.get('error'),
-                        "status": "success" if ok else "failed"
+                        "status": 2 if ok else 3
                     })
                     })
                     
                     
                 except Exception as e:
                 except Exception as e:
@@ -404,7 +405,7 @@ async def parse_processing(request: TriggerRequest, background_tasks: Background
                         "index": idx,
                         "index": idx,
                         "dbInserted": False,
                         "dbInserted": False,
                         "identifyError": str(e),
                         "identifyError": str(e),
-                        "status": "error"
+                        "status": 3
                     })
                     })
 
 
             result = TriggerResponse(
             result = TriggerResponse(
@@ -426,23 +427,26 @@ async def parse_processing(request: TriggerRequest, background_tasks: Background
         update_request_status(request.requestId, 3)
         update_request_status(request.requestId, 3)
         raise HTTPException(status_code=500, detail=f"处理失败: {str(e)}")
         raise HTTPException(status_code=500, detail=f"处理失败: {str(e)}")
 
 
-@app.post("/parse/async")
+@app.post("/parse/async", status_code=200)
 async def parse_processing_async(request: TriggerRequest, background_tasks: BackgroundTasks):
 async def parse_processing_async(request: TriggerRequest, background_tasks: BackgroundTasks):
     """
     """
     异步解析内容处理(后台任务)
     异步解析内容处理(后台任务)
     
     
     - **requestId**: 请求ID,用于标识处理任务
     - **requestId**: 请求ID,用于标识处理任务
+    
+    行为:立即返回 200,并在后台继续处理任务。
     """
     """
     try:
     try:
         logger.info(f"收到异步解析请求: requestId={request.requestId}")
         logger.info(f"收到异步解析请求: requestId={request.requestId}")
         
         
-        # 添加后台任务
-        background_tasks.add_task(process_request_background, request.requestId)
+        # 直接使用 asyncio 创建后台任务(不阻塞当前请求返回)
+        asyncio.create_task(process_request_background(request.requestId))
         
         
+        # 立即返回(不阻塞)
         return {
         return {
             "requestId": request.requestId,
             "requestId": request.requestId,
-            "status": "processing",
-            "message": "任务已提交到后台处理",
+            "status": 1,
+            "message": "任务已进入队列并在后台处理",
             "langgraph_enabled": HAS_LANGGRAPH
             "langgraph_enabled": HAS_LANGGRAPH
         }
         }
         
         

+ 24 - 24
tools/indentify/video_identifier.py

@@ -95,7 +95,7 @@ class VideoIdentifier:
                         try:
                         try:
                             with open(file_path, 'wb') as f:
                             with open(file_path, 'wb') as f:
                                 f.write(response.content)
                                 f.write(response.content)
-                            print(f'视频下载成功: {video_url} -> {file_path}')
+                            # print(f'视频下载成功: {video_url} -> {file_path}')
                             return file_path
                             return file_path
                         except Exception as e:
                         except Exception as e:
                             print(f'视频保存失败: {e}')
                             print(f'视频保存失败: {e}')
@@ -165,8 +165,8 @@ class VideoIdentifier:
         
         
         for attempt in range(max_retries):
         for attempt in range(max_retries):
             try:
             try:
-                print(f"  开始上传视频到Gemini... (尝试 {attempt + 1}/{max_retries})")
-                print(f"    文件路径: {video_path}")
+                # print(f"  开始上传视频到Gemini... (尝试 {attempt + 1}/{max_retries})")
+                # print(f"    文件路径: {video_path}")
                 
                 
                 # 1. 文件检查
                 # 1. 文件检查
                 if not os.path.exists(video_path):
                 if not os.path.exists(video_path):
@@ -174,7 +174,7 @@ class VideoIdentifier:
                     return None
                     return None
                 
                 
                 file_size = os.path.getsize(video_path)
                 file_size = os.path.getsize(video_path)
-                print(f"    文件大小: {file_size / (1024*1024):.2f} MB")
+                # print(f"    文件大小: {file_size / (1024*1024):.2f} MB")
                 
                 
                 if file_size == 0:
                 if file_size == 0:
                     print(f"    错误: 文件大小为0")
                     print(f"    错误: 文件大小为0")
@@ -185,16 +185,16 @@ class VideoIdentifier:
                     with open(video_path, 'rb') as f:
                     with open(video_path, 'rb') as f:
                         # 尝试读取文件开头,检查是否可读
                         # 尝试读取文件开头,检查是否可读
                         f.read(1024)
                         f.read(1024)
-                    print(f"    文件权限: 可读")
+                    # print(f"    文件权限: 可读")
                 except Exception as e:
                 except Exception as e:
                     print(f"    错误: 文件无法读取 - {e}")
                     print(f"    错误: 文件无法读取 - {e}")
                     return None
                     return None
                 
                 
                 # 4. 尝试上传文件
                 # 4. 尝试上传文件
-                print(f"    开始上传文件...")
+                # print(f"    开始上传文件...")
                 try:
                 try:
                     video_file = genai.upload_file(path=video_path, mime_type='video/mp4')
                     video_file = genai.upload_file(path=video_path, mime_type='video/mp4')
-                    print(f"    文件上传请求已发送,文件ID: {video_file.name}")
+                    # print(f"    文件上传请求已发送,文件ID: {video_file.name}")
                 except Exception as e:
                 except Exception as e:
                     print(f"    错误: 文件上传请求失败 - {e}")
                     print(f"    错误: 文件上传请求失败 - {e}")
                     print(f"    错误类型: {type(e).__name__}")
                     print(f"    错误类型: {type(e).__name__}")
@@ -227,7 +227,7 @@ class VideoIdentifier:
                         # 获取最新状态
                         # 获取最新状态
                         video_file = genai.get_file(name=video_file.name)
                         video_file = genai.get_file(name=video_file.name)
                         current_state = video_file.state.name
                         current_state = video_file.state.name
-                        print(f"      状态: {current_state} ({wait_count}秒)")
+                        # print(f"      状态: {current_state} ({wait_count}秒)")
                         
                         
                         # 检查是否有错误状态
                         # 检查是否有错误状态
                         if current_state in ['FAILED', 'ERROR', 'INVALID']:
                         if current_state in ['FAILED', 'ERROR', 'INVALID']:
@@ -256,17 +256,17 @@ class VideoIdentifier:
                 # 6. 检查最终状态
                 # 6. 检查最终状态
                 if video_file.state.name == 'ACTIVE':
                 if video_file.state.name == 'ACTIVE':
                     print(f'    视频上传成功: {video_file.name}')
                     print(f'    视频上传成功: {video_file.name}')
-                    print(f"    最终状态: {video_file.state.name}")
+                    # print(f"    最终状态: {video_file.state.name}")
                     return video_file
                     return video_file
                 else:
                 else:
                     print(f'    错误: 视频文件上传失败')
                     print(f'    错误: 视频文件上传失败')
-                    print(f"    最终状态: {video_file.state.name}")
-                    print(f"    等待时间: {wait_count}秒")
+                    # print(f"    最终状态: {video_file.state.name}")
+                    # print(f"    等待时间: {wait_count}秒")
                     
                     
                     # 尝试获取更多错误信息
                     # 尝试获取更多错误信息
                     try:
                     try:
                         file_info = genai.get_file(name=video_file.name)
                         file_info = genai.get_file(name=video_file.name)
-                        print(f"    文件信息: {file_info}")
+                        # print(f"    文件信息: {file_info}")
                     except Exception as e:
                     except Exception as e:
                         print(f"    无法获取文件详细信息: {e}")
                         print(f"    无法获取文件详细信息: {e}")
                     
                     
@@ -348,7 +348,7 @@ class VideoIdentifier:
                 request_options={'timeout': 240}
                 request_options={'timeout': 240}
             )
             )
 
 
-            print(f"response: {response.text}")
+            # print(f"response: {response.text}")
             
             
             # 检查错误
             # 检查错误
             if hasattr(response, '_error') and response._error:
             if hasattr(response, '_error') and response._error:
@@ -357,7 +357,7 @@ class VideoIdentifier:
             # 解析JSON响应
             # 解析JSON响应
             try:
             try:
                 result = json.loads(response.text.strip())
                 result = json.loads(response.text.strip())
-                print(f"[视频分析] 响应: {result}")
+                # print(f"[视频分析] 响应: {result}")
                 
                 
                 if not isinstance(result, dict):
                 if not isinstance(result, dict):
                     raise ValueError("响应格式错误:非字典结构")
                     raise ValueError("响应格式错误:非字典结构")
@@ -428,16 +428,16 @@ class VideoIdentifier:
     
     
     def process_video_single(self, video_info: Dict[str, Any]) -> Dict[str, Any]:
     def process_video_single(self, video_info: Dict[str, Any]) -> Dict[str, Any]:
         """处理单个视频的完整流程"""
         """处理单个视频的完整流程"""
-        print(f"开始处理视频: {video_info['url'][:50]}...")
+        # print(f"开始处理视频: {video_info['url'][:50]}...")
         
         
         video_path = None
         video_path = None
         video_file = None
         video_file = None
         try:
         try:
             # 1. 下载视频
             # 1. 下载视频
-            print("  1. 下载视频...")
+            # print("  1. 下载视频...")
             video_path = self.download_video(video_info['url'])
             video_path = self.download_video(video_info['url'])
             if not video_path:
             if not video_path:
-                print("  视频下载失败")
+                # print("  视频下载失败")
                 return {
                 return {
                     'url': video_info['url'],
                     'url': video_info['url'],
                     'duration': video_info['duration'],
                     'duration': video_info['duration'],
@@ -451,15 +451,15 @@ class VideoIdentifier:
                 }
                 }
             
             
             # 2. 上传到Gemini
             # 2. 上传到Gemini
-            print("  2. 上传视频到Gemini...")
+            # print("  2. 上传视频到Gemini...")
             video_file = self.upload_video_to_gemini(video_path)
             video_file = self.upload_video_to_gemini(video_path)
             if not video_file:
             if not video_file:
-                print("  视频上传到Gemini失败")
+                # print("  视频上传到Gemini失败")
                 # 上传失败时也要清理缓存文件
                 # 上传失败时也要清理缓存文件
                 if video_path and os.path.exists(video_path):
                 if video_path and os.path.exists(video_path):
                     try:
                     try:
                         os.remove(video_path)
                         os.remove(video_path)
-                        print(f"  上传失败,缓存文件已清理: {video_path}")
+                        # print(f"  上传失败,缓存文件已清理: {video_path}")
                     except Exception as e:
                     except Exception as e:
                         print(f"  清理缓存文件失败: {e}")
                         print(f"  清理缓存文件失败: {e}")
                 
                 
@@ -476,7 +476,7 @@ class VideoIdentifier:
                 }
                 }
             
             
             # 3. 使用Gemini分析
             # 3. 使用Gemini分析
-            print("  3. 使用Gemini分析视频内容...")
+            # print("  3. 使用Gemini分析视频内容...")
             analysis_result = self.analyze_video_with_gemini(video_file, video_info)
             analysis_result = self.analyze_video_with_gemini(video_file, video_info)
             
             
             # 4. 组合结果
             # 4. 组合结果
@@ -487,7 +487,7 @@ class VideoIdentifier:
                 'iframe_details': analysis_result.get('iframe_details', '关键帧分析失败'),
                 'iframe_details': analysis_result.get('iframe_details', '关键帧分析失败'),
             }
             }
             
             
-            print("  视频分析完成")
+            # print("  视频分析完成")
             return final_result
             return final_result
             
             
         except Exception as e:
         except Exception as e:
@@ -525,7 +525,7 @@ class VideoIdentifier:
             if video_file and hasattr(video_file, 'name'):
             if video_file and hasattr(video_file, 'name'):
                 try:
                 try:
                     genai.delete_file(name=video_file.name)
                     genai.delete_file(name=video_file.name)
-                    print(f"  Gemini文件已清理: {video_file.name}")
+                    # print(f"  Gemini文件已清理: {video_file.name}")
                 except Exception as e:
                 except Exception as e:
                     print(f"  清理Gemini文件失败: {e}")
                     print(f"  清理Gemini文件失败: {e}")
     
     
@@ -569,7 +569,7 @@ def main():
     identifier = VideoIdentifier()
     identifier = VideoIdentifier()
     result = identifier.process_videos(test_content)
     result = identifier.process_videos(test_content)
     
     
-    print(f"识别结果: {json.dumps(result, ensure_ascii=False, indent=2)}")
+    # print(f"识别结果: {json.dumps(result, ensure_ascii=False, indent=2)}")
 
 
 
 
 if __name__ == '__main__':
 if __name__ == '__main__':