Browse Source

增加存储任务

jihuaqiang 3 tuần trước cách đây
mục cha
commit
79aeda14e5
2 tập tin đã thay đổi với 134 bổ sung và 0 xóa
  1. 24 0
      agent.py
  2. 110 0
      agents/store_agent/agent.py

+ 24 - 0
agent.py

@@ -28,6 +28,7 @@ from pydantic import BaseModel, Field
 import uvicorn
 from agents.clean_agent.agent import execute_agent_with_api, execute
 from agents.expand_agent.agent import execute_expand_agent_with_api, _update_expansion_status
+from agents.store_agent.agent import execute_store_agent
 
 # LangGraph 相关导入
 try:
@@ -75,6 +76,9 @@ class ExtractRequest(BaseModel):
     requestId: str = Field(..., description="请求ID")
     query: str = Field(..., description="查询词")
 
+class StoreRequest(BaseModel):
+    """Request body accepted by the ``/store`` endpoint."""
+    # Required request identifier (the Field description means "request ID").
+    requestId: str = Field(..., description="请求ID")
+
 # 全局变量
 identify_tool = None
 # 全局线程池
@@ -916,6 +920,26 @@ def update_extract_status(request_id: str, status: int):
     except Exception as e:
         logger.error(f"更新请求状态异常: requestId={request_id}, status={status}, error={e}")
 
+@app.post("/store")
+async def store(request: StoreRequest):
+    try:
+        requestId = request.requestId
+        logger.info(f"收到存储请求: requestId={requestId}")
+
+        # 在线程池中异步执行存储任务,不阻塞接口返回
+        def _execute_store_sync():
+            try:
+                total, success = execute_store_agent(requestId)
+                logger.info(f"异步存储任务完成: requestId={requestId}, total={total}, success={success}")
+            except Exception as e:
+                logger.error(f"异步存储任务失败: requestId={requestId}, error={e}")
+
+        THREAD_POOL.submit(_execute_store_sync)
+
+        return {"status": 1, "requestId": requestId, "message": "存储任务已启动并在后台处理"}
+    except Exception as e:
+        logger.error(f"启动存储任务失败: requestId={getattr(request, 'requestId', '')}, error={e}")
+        raise HTTPException(status_code=500, detail=f"启动存储任务失败: {str(e)}")
 if __name__ == "__main__":
     # 从环境变量获取配置
     import os

+ 110 - 0
agents/store_agent/agent.py

@@ -0,0 +1,110 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+import json
+import time
+from typing import List, Tuple
+
+import requests
+
+from utils.logging_config import get_logger
+from utils.mysql_db import MysqlHelper
+
+
+logger = get_logger('StoreAgent')
+
+
+CHUNK_API_URL = "http://61.48.133.26:8001/api/chunk"
+DATASET_ID = 14
+SCORE_THRESHOLD = 70
+
+
+def _update_store_status(request_id: str, status: int) -> None:
+    try:
+        sql = "UPDATE knowledge_request SET store_status = %s WHERE request_id = %s"
+        MysqlHelper.update_values(sql, (status, request_id))
+        logger.info(f"更新store状态成功: requestId={request_id}, status={status}")
+    except Exception as e:
+        logger.error(f"更新store状态失败: requestId={request_id}, status={status}, error={e}")
+
+
+def _fetch_query(request_id: str) -> str:
+    sql = "SELECT query FROM knowledge_request WHERE request_id = %s"
+    rows = MysqlHelper.get_values(sql, (request_id,))
+    if rows and len(rows[0]) > 0:
+        return rows[0][0] or ""
+    return ""
+
+
+def _fetch_extraction_data(request_id: str) -> List[str]:
+    sql = (
+        "SELECT data FROM knowledge_extraction_content "
+        "WHERE request_id = %s AND data IS NOT NULL AND data != '' AND score >= %s"
+    )
+    rows = MysqlHelper.get_values(sql, (request_id, SCORE_THRESHOLD))
+    return [row[0] for row in rows] if rows else []
+
+
+def _upload_chunk(text: str, query: str, max_retries: int = 3, backoff_sec: float = 1.0) -> bool:
+    payload = {
+        "dataset_id": DATASET_ID,
+        "title": "",
+        "text": text,
+        "ext": {"query": query, "channel": ""},
+    }
+    headers = {"Content-Type": "application/json"}
+
+    for attempt in range(max_retries):
+        try:
+            resp = requests.post(CHUNK_API_URL, headers=headers, json=payload, timeout=15)
+            if resp.status_code >= 200 and resp.status_code < 300:
+                return True
+            logger.warning(f"上传失败,状态码: {resp.status_code}, 第{attempt+1}次重试")
+        except Exception as e:
+            logger.warning(f"上传异常: {e}, 第{attempt+1}次重试")
+        time.sleep(backoff_sec * (2 ** attempt))
+    return False
+
+
+def execute_store_agent(request_id: str) -> Tuple[int, int]:
+    """
+    执行存储流程:
+    1) 更新 store_status = 1
+    2) 读取 query 和符合条件的 data 列表
+    3) 逐条上传到外部接口
+    4) 全部成功 -> store_status = 2;否则 -> store_status = 3
+
+    返回: (total, success)
+    """
+    _update_store_status(request_id, 1)
+
+    try:
+        query = _fetch_query(request_id)
+        data_list = _fetch_extraction_data(request_id)
+
+        total = len(data_list)
+        success = 0
+
+        if total == 0:
+            # 没有可上传数据,按失败处理
+            _update_store_status(request_id, 3)
+            logger.info(f"无可上传数据: requestId={request_id}")
+            return (0, 0)
+
+        for text in data_list:
+            ok = _upload_chunk(text, query)
+            success += 1 if ok else 0
+
+        if success == total:
+            _update_store_status(request_id, 2)
+        else:
+            _update_store_status(request_id, 3)
+
+        logger.info(f"store完成: requestId={request_id}, total={total}, success={success}")
+        return (total, success)
+    except Exception as e:
+        logger.error(f"store流程异常: requestId={request_id}, error={e}")
+        _update_store_status(request_id, 3)
+        return (0, 0)
+
+