#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import os
import time
from typing import List, Tuple, Optional, Dict

import requests

from utils.logging_config import get_logger
from utils.mysql_db import MysqlHelper

logger = get_logger('StoreAgent')

# Import the LLM processor; degrade gracefully if it is unavailable.
try:
    from gemini import GeminiProcessor
    HAS_GEMINI = True
except ImportError:
    HAS_GEMINI = False

CHUNK_API_URL = "http://61.48.133.26:8001/api/chunk"
SCORE_THRESHOLD = 70

# Dataset ids resolved from knowledge_query.knowledge_type.
DATASET_ID_TOOL = 12      # "工具知识" (tool knowledge)
DATASET_ID_CONTENT = 11   # "内容知识" (content knowledge)


def _update_store_status(request_id: str, status: int) -> None:
    try:
        sql = "UPDATE knowledge_request SET store_status = %s WHERE request_id = %s"
        MysqlHelper.update_values(sql, (status, request_id))
        logger.info(f"store status updated: requestId={request_id}, status={status}")
    except Exception as e:
        logger.error(f"failed to update store status: requestId={request_id}, status={status}, error={e}")


def _update_store_content_status(request_id: str, dataset_id: int, data: str, doc_id: str, query: str) -> None:
    try:
        sql = (
            "INSERT INTO knowledge_store_content "
            "(request_id, doc_id, query, data, dataset_id, create_timestamp) "
            "VALUES (%s, %s, %s, %s, %s, NOW())"
        )
        MysqlHelper.update_values(sql, (request_id, doc_id, query, data, dataset_id))
        # Log the data length rather than the full payload to keep log lines bounded.
        logger.info(
            f"store_content saved: requestId={request_id}, doc_id={doc_id}, "
            f"query={query}, data_len={len(data) if data else 0}, dataset_id={dataset_id}"
        )
    except Exception as e:
        logger.error(
            f"failed to save store_content: requestId={request_id}, doc_id={doc_id}, "
            f"query={query}, dataset_id={dataset_id}, error={e}"
        )


def _fetch_query(request_id: str) -> str:
    sql = "SELECT query FROM knowledge_request WHERE request_id = %s"
    rows = MysqlHelper.get_values(sql, (request_id,))
    if rows and len(rows[0]) > 0:
        return rows[0][0] or ""
    return ""


def _fetch_extraction_data(request_id: str) -> List[Dict[str, str]]:
    sql = (
        "SELECT parsing_id, data FROM knowledge_extraction_content "
        "WHERE request_id = %s AND data IS NOT NULL AND data != '' AND score >= %s"
    )
    rows = MysqlHelper.get_values(sql, (request_id, SCORE_THRESHOLD))
    if not rows:
        return []
    return [{"parsing_id": str(row[0]), "data": row[1]} for row in rows]


def _fetch_content_id_by_parsing_id(parsing_id: str) -> Optional[str]:
    sql = "SELECT content_id FROM knowledge_parsing_content WHERE id = %s"
    rows = MysqlHelper.get_values(sql, (parsing_id,))
    if rows and len(rows[0]) > 0:
        return rows[0][0]
    return None


def _fetch_channel_by_content_id(content_id: str) -> Optional[str]:
    sql = "SELECT channel FROM knowledge_crawl_content WHERE content_id = %s LIMIT 1"
    rows = MysqlHelper.get_values(sql, (content_id,))
    if rows and len(rows[0]) > 0:
        return rows[0][0]
    return None


def _resolve_dataset_id(request_id: str) -> int:
    """Resolve dataset_id from knowledge_query.knowledge_type."""
    try:
        sql = "SELECT knowledge_type FROM knowledge_query WHERE request_id = %s ORDER BY id DESC LIMIT 1"
        rows = MysqlHelper.get_values(sql, (request_id,))
        if rows:
            knowledge_type = rows[0][0] or ""
            if knowledge_type == "工具知识":
                return DATASET_ID_TOOL
            if knowledge_type == "内容知识":
                return DATASET_ID_CONTENT
    except Exception as e:
        logger.warning(f"failed to resolve dataset_id, falling back to default: requestId={request_id}, error={e}")
    # Fallback default
    return DATASET_ID_TOOL


def _load_system_prompt() -> str:
    """Load merge.md as the system prompt."""
    try:
        template_path = os.path.join(
            os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
            "prompt", "merge.md"
        )
        with open(template_path, 'r', encoding='utf-8') as f:
            return f.read()
    except Exception as e:
        logger.error(f"failed to read merge.md: {e}")
        # Fallback system prompt (kept in Chinese to match merge.md's language)
        return """你是一名专业的知识整合专家,负责根据同一 query 下的多份互联网清洗知识,生成一份融合性强、内容准确、表达自然的综合知识文本。
请根据提供的 query 与其对应的多个知识片段,融合生成一份高质量的综合知识文本。
输出一篇完整的综合知识文本,不输出任何解释、提示语或"以下是结果"。"""


def _build_user_message(query: str, data_list: List[Dict[str, str]]) -> str:
    """Build the user message in the input format expected by merge.md."""
    # Build the list of knowledge snippets
    knowledge_parts = []
    for idx, item in enumerate(data_list, 1):
        knowledge_parts.append(f"[知识片段{idx}]\n{item.get('data', '')}")
    knowledge_text = "\n\n".join(knowledge_parts)
    # Assemble the message following the layout defined in merge.md
    return f"[query]\n{query}\n\n{knowledge_text}"
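# Illustrative example of what _build_user_message returns (the query and
# snippet texts below are hypothetical, only the markers come from the code):
#
#   [query]
#   如何提升短视频完播率
#
#   [知识片段1]
#   ...first cleaned snippet...
#
#   [知识片段2]
#   ...second cleaned snippet...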
def _merge_with_llm(query: str, data_list: List[Dict[str, str]]) -> Optional[str]:
    """Merge the knowledge snippets into a single text via the Gemini LLM."""
    if not HAS_GEMINI:
        logger.error("Gemini processor is not available")
        return None
    try:
        # Load the system prompt and build the user message
        system_prompt = _load_system_prompt()
        user_message = _build_user_message(query, data_list)

        # Call Gemini
        processor = GeminiProcessor()
        result = processor.process(content=user_message, system_prompt=system_prompt)

        # Normalize the response
        if isinstance(result, dict):
            if "error" in result:
                logger.error(f"Gemini API returned an error: {result['error']}")
                return None
            text = result.get("result", "") or result.get("raw_response", "")
        else:
            text = str(result)

        logger.info(f"Gemini merge succeeded, result length: {len(text) if text else 0}")
        return text if text else None
    except Exception as e:
        logger.error(f"Gemini call failed: {e}")
        return None


def _upload_chunk(text: str, query: str, channel: str = "", dataset_id: int = DATASET_ID_TOOL,
                  request_id: Optional[str] = None, max_retries: int = 1,
                  backoff_sec: float = 1.0) -> bool:
    # "ext" must be a JSON-encoded string
    payload = {
        "dataset_id": dataset_id,
        "title": "",
        "text": text,
        "ext": json.dumps({"query": query, "channel": channel or ""}, ensure_ascii=False),
    }
    headers = {"Content-Type": "application/json; charset=utf-8"}
    for attempt in range(max_retries):
        try:
            # POST the JSON-encoded payload
            body = json.dumps(payload, ensure_ascii=False).encode('utf-8')
            resp = requests.post(CHUNK_API_URL, headers=headers, data=body, timeout=30)
            # Parse the response exactly once; it may not be JSON
            try:
                resp_json = resp.json()
                logger.info(f"chunk upload response: resp={resp_json}")
            except Exception:
                resp_json = {}
                logger.info(f"chunk upload returned non-JSON: text={resp.text[:500]}")
            if resp_json.get("doc_id"):
                # Persist doc_id into the knowledge_store_content table
                _update_store_content_status(request_id, dataset_id, text, resp_json.get("doc_id"), query)
                return True
            logger.warning(f"upload failed, status code: {resp.status_code}, attempt {attempt + 1}")
        except Exception as e:
            logger.warning(f"upload exception: {e}, attempt {attempt + 1}")
        # Back off exponentially, but only if another attempt remains
        if attempt < max_retries - 1:
            time.sleep(backoff_sec * (2 ** attempt))
    return False
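# Illustrative request body sent to CHUNK_API_URL (values are hypothetical);
# note that "ext" is itself a JSON string, not a nested object:
#
#   {
#     "dataset_id": 12,
#     "title": "",
#     "text": "merged knowledge text...",
#     "ext": "{\"query\": \"...\", \"channel\": \"\"}"
#   }
#
# The service is assumed to reply with JSON carrying a "doc_id" on success.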
def execute_store_agent(request_id: str) -> Tuple[int, int]:
    """
    Run the store pipeline:
    1) Set store_status = 1
    2) Read the query and the list of qualifying data rows
    3) Merge the query and contents into one text using merge.md
    4) Upload only the merged result
    5) Success -> store_status = 2; failure -> store_status = 3
    Returns: (total, success)
    """
    _update_store_status(request_id, 1)
    try:
        # Step 2: fetch the query, the data list, and the dataset id
        query = _fetch_query(request_id)
        data_list = _fetch_extraction_data(request_id)
        dataset_id = _resolve_dataset_id(request_id)
        total = len(data_list)
        if total == 0:
            # Nothing to upload: record an empty row and mark the request done
            _update_store_status(request_id, 2)
            _update_store_content_status(request_id, dataset_id, '', '', query)
            logger.info(f"no data to upload: requestId={request_id}")
            return (0, 0)

        logger.info(f"starting knowledge merge: requestId={request_id}, query={query}, snippets={total}")

        # Step 3: merge the contents with the LLM
        merged_content = _merge_with_llm(query, data_list)
        if not merged_content:
            # Merge failed: treat as failure
            _update_store_status(request_id, 3)
            logger.error(f"knowledge merge failed: requestId={request_id}")
            return (total, 0)

        logger.info(f"knowledge merge succeeded: requestId={request_id}, merged content={merged_content[:500]}")

        # Step 4: upload the merged content
        ok = _upload_chunk(merged_content, query, '', dataset_id, request_id)
        if ok:
            _update_store_status(request_id, 2)
            logger.info(f"store finished: requestId={request_id}, source snippets={total}, merged upload succeeded")
            return (total, 1)
        else:
            _update_store_status(request_id, 3)
            logger.error(f"store failed: requestId={request_id}, failed to upload merged content")
            return (total, 0)
    except Exception as e:
        logger.error(f"store pipeline error: requestId={request_id}, error={e}")
        _update_store_status(request_id, 3)
        return (0, 0)
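if __name__ == "__main__":
    # Minimal smoke-test sketch. The request_id below is hypothetical; a real
    # run requires reachable MySQL, the Gemini processor, and the chunk API.
    total, success = execute_store_agent("demo-request-id")
    print(f"store agent finished: total={total}, success={success}")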