jihuaqiang 3 тижнів тому
батько
коміт
b8008676b2
1 змінених файлів з 39 додано та 8 видалено
  1. 39 8
      agents/store_agent/agent.py

+ 39 - 8
agents/store_agent/agent.py

@@ -3,7 +3,7 @@
 
 import json
 import time
-from typing import List, Tuple
+from typing import List, Tuple, Optional, Dict
 
 import requests
 
@@ -36,21 +36,39 @@ def _fetch_query(request_id: str) -> str:
     return ""
 
 
-def _fetch_extraction_data(request_id: str) -> List[str]:
+def _fetch_extraction_data(request_id: str) -> List[Dict[str, str]]:
     sql = (
-        "SELECT data FROM knowledge_extraction_content "
+        "SELECT parsing_id, data FROM knowledge_extraction_content "
         "WHERE request_id = %s AND data IS NOT NULL AND data != '' AND score >= %s"
     )
     rows = MysqlHelper.get_values(sql, (request_id, SCORE_THRESHOLD))
-    return [row[0] for row in rows] if rows else []
+    if not rows:
+        return []
+    return [{"parsing_id": str(row[0]), "data": row[1]} for row in rows]
 
 
-def _upload_chunk(text: str, query: str, max_retries: int = 3, backoff_sec: float = 1.0) -> bool:
+def _fetch_content_id_by_parsing_id(parsing_id: str) -> Optional[str]:
+    sql = "SELECT content_id FROM knowledge_parsing_content WHERE id = %s"
+    rows = MysqlHelper.get_values(sql, (parsing_id,))
+    if rows and len(rows[0]) > 0:
+        return rows[0][0]
+    return None
+
+
+def _fetch_channel_by_content_id(content_id: str) -> Optional[str]:
+    sql = "SELECT channel FROM knowledge_crawl_content WHERE content_id = %s LIMIT 1"
+    rows = MysqlHelper.get_values(sql, (content_id,))
+    if rows and len(rows[0]) > 0:
+        return rows[0][0]
+    return None
+
+
+def _upload_chunk(text: str, query: str, channel: str = "", max_retries: int = 3, backoff_sec: float = 1.0) -> bool:
     payload = {
         "dataset_id": DATASET_ID,
         "title": "",
         "text": text,
-        "ext": {"query": query, "channel": ""},
+        "ext": {"query": query, "channel": channel or ""},
     }
     headers = {"Content-Type": "application/json"}
 
@@ -91,8 +109,21 @@ def execute_store_agent(request_id: str) -> Tuple[int, int]:
             logger.info(f"无可上传数据: requestId={request_id}")
             return (0, 0)
 
-        for text in data_list:
-            ok = _upload_chunk(text, query)
+        for item in data_list:
+            text = item.get("data", "")
+            parsing_id = item.get("parsing_id")
+
+            channel = ""
+            try:
+                if parsing_id:
+                    content_id = _fetch_content_id_by_parsing_id(parsing_id)
+                    if content_id:
+                        channel_value = _fetch_channel_by_content_id(content_id)
+                        channel = channel_value or ""
+            except Exception as e:
+                logger.warning(f"获取channel失败: parsing_id={parsing_id}, error={e}")
+
+            ok = _upload_chunk(text, query, channel)
             success += 1 if ok else 0
 
         if success == total: