Quellcode durchsuchen

合并后再存储

jihuaqiang vor 3 Tagen
Ursprung
Commit
468f0e3b51
3 geänderte Dateien mit 295 neuen und 1 gelöschten Zeilen
  1. 1 1
      agent.py
  2. 249 0
      agents/store_agent/agentMerge.py
  3. 45 0
      prompt/merge.md

+ 1 - 1
agent.py

@@ -28,7 +28,7 @@ from pydantic import BaseModel, Field
 import uvicorn
 from agents.clean_agent.agent import execute_agent_with_api, execute
 from agents.expand_agent.agent import execute_expand_agent_with_api, _update_expansion_status
-from agents.store_agent.agent import execute_store_agent
+from agents.store_agent.agentMerge import execute_store_agent
 
 # LangGraph 相关导入
 try:

+ 249 - 0
agents/store_agent/agentMerge.py

@@ -0,0 +1,249 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+import json
+import os
+import time
+from typing import List, Tuple, Optional, Dict
+
+import requests
+
+from utils.logging_config import get_logger
+from utils.mysql_db import MysqlHelper
+
+
+logger = get_logger('StoreAgent')
+
+# 导入 LLM 处理器
+try:
+    from gemini import GeminiProcessor
+    HAS_GEMINI = True
+except ImportError:
+    HAS_GEMINI = False
+
+
+CHUNK_API_URL = "http://61.48.133.26:8001/api/chunk"
+SCORE_THRESHOLD = 70
+
+
+def _update_store_status(request_id: str, status: int) -> None:
+    try:
+        sql = "UPDATE knowledge_request SET store_status = %s WHERE request_id = %s"
+        MysqlHelper.update_values(sql, (status, request_id))
+        logger.info(f"更新store状态成功: requestId={request_id}, status={status}")
+    except Exception as e:
+        logger.error(f"更新store状态失败: requestId={request_id}, status={status}, error={e}")
+
+def _update_store_content_status(request_id: str, dataset_id: int, data: str, doc_id: str, query: str) -> None:
+    try:
+        sql = "INSERT INTO knowledge_store_content (request_id, doc_id, query, data, dataset_id, create_timestamp) VALUES (%s, %s, %s, %s, %s, NOW())"
+        MysqlHelper.update_values(sql, (request_id, doc_id, query, data, dataset_id))
+        logger.info(f"存储store_content成功: requestId={request_id}, doc_id={doc_id}, query={query}, data={data}, dataset_id={dataset_id}")
+    except Exception as e:
+        logger.error(f"存储store_content失败: requestId={request_id or 'None'}, doc_id={doc_id or 'None'}, query={query or 'None'}, data={data or 'None'}, dataset_id={dataset_id or 'None'}, error={e}")
+
+def _fetch_query(request_id: str) -> str:
+    sql = "SELECT query FROM knowledge_request WHERE request_id = %s"
+    rows = MysqlHelper.get_values(sql, (request_id,))
+    if rows and len(rows[0]) > 0:
+        return rows[0][0] or ""
+    return ""
+
+
+def _fetch_extraction_data(request_id: str) -> List[Dict[str, str]]:
+    sql = (
+        "SELECT parsing_id, data FROM knowledge_extraction_content "
+        "WHERE request_id = %s AND data IS NOT NULL AND data != '' AND score >= %s"
+    )
+    rows = MysqlHelper.get_values(sql, (request_id, SCORE_THRESHOLD))
+    if not rows:
+        return []
+    return [{"parsing_id": str(row[0]), "data": row[1]} for row in rows]
+
+
+def _fetch_content_id_by_parsing_id(parsing_id: str) -> Optional[str]:
+    sql = "SELECT content_id FROM knowledge_parsing_content WHERE id = %s"
+    rows = MysqlHelper.get_values(sql, (parsing_id,))
+    if rows and len(rows[0]) > 0:
+        return rows[0][0]
+    return None
+
+
+def _fetch_channel_by_content_id(content_id: str) -> Optional[str]:
+    sql = "SELECT channel FROM knowledge_crawl_content WHERE content_id = %s LIMIT 1"
+    rows = MysqlHelper.get_values(sql, (content_id,))
+    if rows and len(rows[0]) > 0:
+        return rows[0][0]
+    return None
+
+
+def _resolve_dataset_id(request_id: str) -> int:
+    """根据 knowledge_query.knowledge_type 解析 dataset_id"""
+    try:
+        sql = "SELECT knowledge_type FROM knowledge_query WHERE request_id = %s ORDER BY id DESC LIMIT 1"
+        rows = MysqlHelper.get_values(sql, (request_id,))
+        if rows:
+            knowledge_type = rows[0][0] or ""
+            if knowledge_type == "工具知识":
+                return 12
+            if knowledge_type == "内容知识":
+                return 11
+    except Exception as e:
+        logger.warning(f"解析dataset_id失败,使用默认: requestId={request_id}, error={e}")
+    # 默认兜底
+    return 12
+
+
+def _load_system_prompt() -> str:
+    """加载 merge.md 作为 system prompt"""
+    try:
+        template_path = os.path.join(
+            os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
+            "prompt",
+            "merge.md"
+        )
+        with open(template_path, 'r', encoding='utf-8') as f:
+            return f.read()
+    except Exception as e:
+        logger.error(f"读取 merge.md 失败: {e}")
+        # 备用 system prompt
+        return """你是一名专业的知识整合专家,负责根据同一 query 下的多份互联网清洗知识,生成一份融合性强、内容准确、表达自然的综合知识文本。
+请根据提供的 query 与其对应的多个知识片段,融合生成一份高质量的综合知识文本。
+输出一篇完整的综合知识文本,不输出任何解释、提示语或"以下是结果"。"""
+
+
+def _build_user_message(query: str, data_list: List[Dict[str, str]]) -> str:
+    """按照 merge.md 格式构建用户消息"""
+    # 构建知识片段列表
+    knowledge_parts = []
+    for idx, item in enumerate(data_list, 1):
+        knowledge_parts.append(f"[知识片段{idx}]\n{item.get('data', '')}")
+    
+    knowledge_text = "\n\n".join(knowledge_parts)
+    
+    # 按照 merge.md 中的输入格式构建消息
+    return f"[query]\n{query}\n\n{knowledge_text}"
+
+
+def _merge_with_llm(query: str, data_list: List[Dict[str, str]]) -> Optional[str]:
+    """调用 Gemini LLM 进行知识合并"""
+    if not HAS_GEMINI:
+        logger.error("Gemini 处理器不可用")
+        return None
+    
+    try:
+        # 加载 system prompt
+        system_prompt = _load_system_prompt()
+        
+        # 构建用户消息
+        user_message = _build_user_message(query, data_list)
+        
+        # 调用 Gemini
+        processor = GeminiProcessor()
+        result = processor.process(content=user_message, system_prompt=system_prompt)
+        
+        # 处理返回结果
+        if isinstance(result, dict):
+            if "error" in result:
+                logger.error(f"Gemini API 返回错误: {result['error']}")
+                return None
+            text = result.get("result", "") or result.get("raw_response", "")
+        else:
+            text = str(result)
+        
+        logger.info(f"Gemini 合并成功,结果长度: {len(text) if text else 0}")
+        return text if text else None
+        
+    except Exception as e:
+        logger.error(f"Gemini 调用失败: {e}")
+        return None
+
+
+def _upload_chunk(text: str, query: str, channel: str = "", dataset_id: int = 12, request_id: str = None, max_retries: int = 1, backoff_sec: float = 1.0) -> bool:
+    # ext 需要是字符串 JSON
+    payload = {
+        "dataset_id": dataset_id,
+        "title": "",
+        "text": text,
+        "ext": json.dumps({"query": query, "channel": channel or ""}, ensure_ascii=False),
+    }
+    headers = {"Content-Type": "application/json; charset=utf-8"}
+
+    for attempt in range(max_retries):
+        try:
+            # 以 GET 方法发送,body 为 JSON 字符串
+            body = json.dumps(payload, ensure_ascii=False).encode('utf-8')
+            resp = requests.post(CHUNK_API_URL, headers=headers, data=body, timeout=30)
+            try:
+                logger.info(f"上传chunk成功: resp={resp.json()}")
+            except Exception:
+                logger.info(f"上传chunk返回非JSON: text={resp.text[:500]}")
+            if resp.json().get("doc_id"):
+                # 取出doc_id,存储到knowledge_store_content表的doc_id字段
+                _update_store_content_status(request_id, dataset_id, text, resp.json().get("doc_id"), query)
+                return True
+            logger.warning(f"上传失败,状态码: {resp.status_code}, 第{attempt+1}次重试")
+        except Exception as e:
+            logger.warning(f"上传异常: {e}, 第{attempt+1}次重试")
+        time.sleep(backoff_sec * (2 ** attempt))
+    return False
+
+
+def execute_store_agent(request_id: str) -> Tuple[int, int]:
+    """
+    执行存储流程:
+    1) 更新 store_status = 1
+    2) 读取 query 和符合条件的 data 列表
+    3) 使用 merge.md 结合 query 和内容列表进行汇总合并
+    4) 仅上传合并后的结果
+    5) 成功 -> store_status = 2;失败 -> store_status = 3
+
+    返回: (total, success)
+    """
+    _update_store_status(request_id, 1)
+
+    try:
+        # 1. 获取 query 和数据列表
+        query = _fetch_query(request_id)
+        data_list = _fetch_extraction_data(request_id)
+        dataset_id = _resolve_dataset_id(request_id)
+
+        total = len(data_list)
+
+        if total == 0:
+            # 没有可上传数据,按失败处理
+            _update_store_status(request_id, 2)
+            _update_store_content_status(request_id, dataset_id, '', '', query)
+            logger.info(f"无可上传数据: requestId={request_id}")
+            return (0, 0)
+
+        logger.info(f"开始合并知识: requestId={request_id}, query={query}, 片段数={total}")
+
+        # 2. 使用 LLM 合并内容
+        merged_content = _merge_with_llm(query, data_list)
+
+        if not merged_content:
+            # 合并失败,按失败处理
+            _update_store_status(request_id, 3)
+            logger.error(f"知识合并失败: requestId={request_id}")
+            return (total, 0)
+
+        logger.info(f"知识合并成功: requestId={request_id}, 合并后内容={merged_content[:500]}")
+
+        # 4. 上传合并后的内容
+        ok = _upload_chunk(merged_content, query, '', dataset_id, request_id)
+
+        if ok:
+            _update_store_status(request_id, 2)
+            logger.info(f"store完成: requestId={request_id}, 原始片段数={total}, 合并后上传成功")
+            return (total, 1)
+        else:
+            _update_store_status(request_id, 3)
+            logger.error(f"store失败: requestId={request_id}, 上传合并内容失败")
+            return (total, 0)
+
+    except Exception as e:
+        logger.error(f"store流程异常: requestId={request_id}, error={e}")
+        _update_store_status(request_id, 3)
+        return (0, 0)
+

+ 45 - 0
prompt/merge.md

@@ -0,0 +1,45 @@
+# 角色设定
+你是一名专业的知识整合专家,负责根据同一 query 下的多份互联网清洗知识,生成一份融合性强、内容准确、表达自然的综合知识文本。
+你的任务不是摘要或罗列,而是进行语义理解、内容提炼与整合。
+
+# 任务目标
+请根据提供的 query 与其对应的多个 知识片段,融合生成一份高质量的综合知识文本。
+在整合过程中,请遵循以下规则:
+1. 以 query 为核心
+    - 理解 query 的真实意图,围绕它组织所有知识内容。
+    - 输出结果应自然地回答、阐述或解释该 query。
+
+2. 语义融合
+    - 综合不同片段中的相关、互补信息,形成一个完整、连贯的叙述。
+    - 删除重复、片面或噪声内容,必要时整合示例以丰富表达。
+
+3. 逻辑与语言质量
+    - 保持逻辑清晰、语言自然流畅,语气可参考行业知识解读或工具教程类内容。
+    - 语言风格应信息密度高、可读性强,可直接作为知识库或工具教程参考内容。
+
+4. 冲突处理与泛化
+    - 如果多个片段信息存在矛盾或细节冲突,请进行语义判断并生成合理的统一版本。
+    - 保留核心规律、原则和方法,不必拘泥于具体平台、品牌名或时间线。
+
+5. 改写与创新表达
+    - 不要照抄原文,应通过语义理解与再组织形成自然的融合性表达。
+    - 可在不改变事实的前提下重组语言,使内容更流畅。
+
+# 输入格式
+[query]
+(query词)
+
+[知识片段1]
+(内容)
+
+[知识片段2]
+(内容)
+
+[知识片段3]
+(内容)
+……
+
+# 输出要求
+- 输出一篇完整的综合知识文本;
+- 不输出任何解释、提示语或“以下是结果”;
+- 生成内容应逻辑清晰、语义连贯,可直接作为知识库条目使用。