123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- import json
- import os
- import time
- from typing import List, Tuple, Optional, Dict
- import requests
- from utils.logging_config import get_logger
- from utils.mysql_db import MysqlHelper
# Module-level logger shared by every helper in this file.
logger = get_logger('StoreAgent')

# Import the LLM processor. The import may fail, so record its availability
# in HAS_GEMINI; _merge_with_llm checks this flag before using the processor.
try:
    from gemini import GeminiProcessor
    HAS_GEMINI = True
except ImportError:
    HAS_GEMINI = False

# HTTP endpoint that _upload_chunk posts the merged text to.
CHUNK_API_URL = "http://61.48.133.26:8001/api/chunk"
# Minimum `score` an extraction row needs to be selected by
# _fetch_extraction_data.
SCORE_THRESHOLD = 70
def _update_store_status(request_id: str, status: int) -> None:
    """Set ``knowledge_request.store_status`` for the given request.

    Best-effort: an exception raised while updating (or while writing the
    success log) is caught and reported through the logger, never propagated.

    Args:
        request_id: Value matched against the ``request_id`` column.
        status: New value written to the ``store_status`` column.
    """
    update_sql = "UPDATE knowledge_request SET store_status = %s WHERE request_id = %s"
    params = (status, request_id)
    try:
        MysqlHelper.update_values(update_sql, params)
        logger.info(f"更新store状态成功: requestId={request_id}, status={status}")
    except Exception as exc:
        logger.error(f"更新store状态失败: requestId={request_id}, status={status}, error={exc}")
def _update_store_content_status(request_id: str, dataset_id: int, data: str, doc_id: str, query: str) -> None:
    """Insert one row into ``knowledge_store_content``.

    Despite the name, this performs an INSERT (with ``create_timestamp`` set
    to ``NOW()``), not an UPDATE. Failures are logged and swallowed.

    Args:
        request_id: Request the stored content belongs to.
        dataset_id: Dataset the content was uploaded to.
        data: The stored text.
        doc_id: Document id associated with the stored text.
        query: Query associated with the request.
    """
    insert_sql = "INSERT INTO knowledge_store_content (request_id, doc_id, query, data, dataset_id, create_timestamp) VALUES (%s, %s, %s, %s, %s, NOW())"
    # Parameter order follows the column list in the statement above.
    row_values = (request_id, doc_id, query, data, dataset_id)
    try:
        MysqlHelper.update_values(insert_sql, row_values)
        logger.info(f"存储store_content成功: requestId={request_id}, doc_id={doc_id}, query={query}, data={data}, dataset_id={dataset_id}")
    except Exception as exc:
        logger.error(f"存储store_content失败: requestId={request_id or 'None'}, doc_id={doc_id or 'None'}, query={query or 'None'}, data={data or 'None'}, dataset_id={dataset_id or 'None'}, error={exc}")
def _fetch_query(request_id: str) -> str:
    """Return the ``query`` column of ``knowledge_request`` for a request.

    Args:
        request_id: Value matched against the ``request_id`` column.

    Returns:
        The first column of the first matching row, or ``""`` when there is
        no row, the first row is empty, or the stored value is falsy.
    """
    result = MysqlHelper.get_values(
        "SELECT query FROM knowledge_request WHERE request_id = %s",
        (request_id,),
    )
    if not result or len(result[0]) == 0:
        return ""
    return result[0][0] or ""
def _fetch_extraction_data(request_id: str) -> List[Dict[str, str]]:
    """Load the extraction rows of a request that qualify for merging.

    Selects rows from ``knowledge_extraction_content`` whose ``data`` is
    neither NULL nor empty and whose ``score`` is at least SCORE_THRESHOLD.

    Args:
        request_id: Value matched against the ``request_id`` column.

    Returns:
        One dict per row with keys ``"parsing_id"`` (stringified) and
        ``"data"``; an empty list when nothing matches.
    """
    select_sql = (
        "SELECT parsing_id, data FROM knowledge_extraction_content "
        "WHERE request_id = %s AND data IS NOT NULL AND data != '' AND score >= %s"
    )
    matches = MysqlHelper.get_values(select_sql, (request_id, SCORE_THRESHOLD))

    records = []
    # A falsy result (None / empty) simply yields no records.
    for row in matches or ():
        records.append({"parsing_id": str(row[0]), "data": row[1]})
    return records
def _fetch_content_id_by_parsing_id(parsing_id: str) -> Optional[str]:
    """Look up ``content_id`` in ``knowledge_parsing_content`` by row id.

    Args:
        parsing_id: Value matched against the ``id`` column.

    Returns:
        The ``content_id`` of the first matching row, or ``None`` when no
        row is returned or the first row is empty.
    """
    found = MysqlHelper.get_values(
        "SELECT content_id FROM knowledge_parsing_content WHERE id = %s",
        (parsing_id,),
    )
    has_value = bool(found) and len(found[0]) > 0
    return found[0][0] if has_value else None
def _fetch_channel_by_content_id(content_id: str) -> Optional[str]:
    """Look up ``channel`` in ``knowledge_crawl_content`` by content id.

    Args:
        content_id: Value matched against the ``content_id`` column.

    Returns:
        The ``channel`` of the single row fetched (the query uses LIMIT 1),
        or ``None`` when no row is returned or the row is empty.
    """
    lookup_sql = "SELECT channel FROM knowledge_crawl_content WHERE content_id = %s LIMIT 1"
    found = MysqlHelper.get_values(lookup_sql, (content_id,))
    if not found:
        return None
    first_row = found[0]
    if len(first_row) > 0:
        return first_row[0]
    return None
def _resolve_dataset_id(request_id: str) -> int:
    """Resolve the target dataset id from ``knowledge_query.knowledge_type``.

    Reads the ``knowledge_type`` of the newest ``knowledge_query`` row
    (highest ``id``) for the request and maps it to a dataset id.

    Args:
        request_id: Value matched against the ``request_id`` column.

    Returns:
        12 for "工具知识", 11 for "内容知识", and 12 as the fallback when
        there is no row, the type is unrecognised, or the lookup raises.
    """
    # (knowledge_type label, dataset id) pairs, checked in order.
    type_to_dataset = (
        ("工具知识", 12),
        ("内容知识", 11),
    )
    try:
        latest = MysqlHelper.get_values(
            "SELECT knowledge_type FROM knowledge_query WHERE request_id = %s ORDER BY id DESC LIMIT 1",
            (request_id,),
        )
        if latest:
            knowledge_type = latest[0][0] or ""
            for label, dataset in type_to_dataset:
                if knowledge_type == label:
                    return dataset
    except Exception as exc:
        logger.warning(f"解析dataset_id失败,使用默认: requestId={request_id}, error={exc}")
    # Default fallback dataset.
    return 12
def _load_system_prompt() -> str:
    """Load ``prompt/merge.md`` to use as the LLM system prompt.

    The file is resolved relative to the directory three levels above this
    module's absolute path.

    Returns:
        The file's text, or a built-in fallback prompt when the file cannot
        be read (the failure is logged).
    """
    try:
        base_dir = os.path.abspath(__file__)
        # Climb three directory levels from this file's absolute path.
        for _ in range(3):
            base_dir = os.path.dirname(base_dir)
        prompt_path = os.path.join(base_dir, "prompt", "merge.md")
        with open(prompt_path, 'r', encoding='utf-8') as prompt_file:
            return prompt_file.read()
    except Exception as exc:
        logger.error(f"读取 merge.md 失败: {exc}")
    # Fallback system prompt used when merge.md is unavailable.
    return """你是一名专业的知识整合专家,负责根据同一 query 下的多份互联网清洗知识,生成一份融合性强、内容准确、表达自然的综合知识文本。
请根据提供的 query 与其对应的多个知识片段,融合生成一份高质量的综合知识文本。
输出一篇完整的综合知识文本,不输出任何解释、提示语或"以下是结果"。"""
- def _build_user_message(query: str, data_list: List[Dict[str, str]]) -> str:
- """按照 merge.md 格式构建用户消息"""
- # 构建知识片段列表
- knowledge_parts = []
- for idx, item in enumerate(data_list, 1):
- knowledge_parts.append(f"[知识片段{idx}]\n{item.get('data', '')}")
-
- knowledge_text = "\n\n".join(knowledge_parts)
-
- # 按照 merge.md 中的输入格式构建消息
- return f"[query]\n{query}\n\n{knowledge_text}"
def _merge_with_llm(query: str, data_list: List[Dict[str, str]]) -> Optional[str]:
    """Merge the knowledge fragments for ``query`` by calling the Gemini LLM.

    Args:
        query: The query the fragments belong to.
        data_list: Fragment dicts; each item's ``"data"`` value is sent to
            the model.

    Returns:
        The merged text, or ``None`` when the Gemini processor is not
        available, the call raises, the processor reports an error, or the
        result is missing/empty.
    """
    if not HAS_GEMINI:
        logger.error("Gemini 处理器不可用")
        return None

    try:
        system_prompt = _load_system_prompt()
        user_message = _build_user_message(query, data_list)

        processor = GeminiProcessor()
        result = processor.process(content=user_message, system_prompt=system_prompt)

        # A missing result must not be stringified: str(None) would yield the
        # literal text "None", which would then be treated as merged content.
        if result is None:
            logger.error("Gemini 返回结果为空")
            return None

        if isinstance(result, dict):
            if "error" in result:
                logger.error(f"Gemini API 返回错误: {result['error']}")
                return None
            # Prefer the parsed result, falling back to the raw response.
            text = result.get("result", "") or result.get("raw_response", "")
        else:
            text = str(result)

        # Only report success when there is actually something to return.
        if not text:
            logger.error("Gemini 返回内容为空")
            return None

        logger.info(f"Gemini 合并成功,结果长度: {len(text)}")
        return text

    except Exception as e:
        logger.error(f"Gemini 调用失败: {e}")
        return None
def _upload_chunk(text: str, query: str, channel: str = "", dataset_id: int = 12, request_id: Optional[str] = None, max_retries: int = 1, backoff_sec: float = 1.0) -> bool:
    """Upload one text chunk to the chunk API and record the returned doc id.

    The payload is sent as a JSON body via HTTP POST to CHUNK_API_URL. An
    attempt counts as successful when the response is a JSON object with a
    truthy ``doc_id``; the HTTP status code is only used for logging. On
    success a row is stored through ``_update_store_content_status``.

    Args:
        text: Chunk text to upload.
        query: Query stored in the payload's ``ext`` field.
        channel: Channel stored in the payload's ``ext`` field.
        dataset_id: Target dataset id.
        request_id: Request id recorded together with the returned doc id.
        max_retries: Total number of attempts (the default makes exactly one).
        backoff_sec: Base delay in seconds; the wait doubles after each
            failed attempt.

    Returns:
        True once an attempt yields a ``doc_id``, otherwise False.
    """
    # `ext` must be a JSON-encoded string rather than a nested object.
    payload = {
        "dataset_id": dataset_id,
        "title": "",
        "text": text,
        "ext": json.dumps({"query": query, "channel": channel or ""}, ensure_ascii=False),
    }
    headers = {"Content-Type": "application/json; charset=utf-8"}

    # The body is identical for every attempt, so serialize it once. A
    # serialization failure cannot be fixed by retrying.
    try:
        body = json.dumps(payload, ensure_ascii=False).encode('utf-8')
    except (TypeError, ValueError) as e:
        logger.warning(f"上传异常: {e}, 请求体序列化失败")
        return False

    for attempt in range(max_retries):
        try:
            # Sent via POST with the JSON string as the request body.
            resp = requests.post(CHUNK_API_URL, headers=headers, data=body, timeout=30)

            # Parse the response once; a non-JSON body is a failed attempt.
            try:
                resp_data = resp.json()
            except ValueError:
                resp_data = None
                logger.info(f"上传chunk返回非JSON: text={resp.text[:500]}")
            else:
                logger.info(f"上传chunk响应: resp={resp_data}")

            # Guard against JSON bodies that are not objects (e.g. a list).
            doc_id = resp_data.get("doc_id") if isinstance(resp_data, dict) else None
            if doc_id:
                # Persist the doc_id in knowledge_store_content.
                _update_store_content_status(request_id, dataset_id, text, doc_id, query)
                logger.info(f"上传chunk成功: doc_id={doc_id}")
                return True
            logger.warning(f"上传失败,状态码: {resp.status_code}, 第{attempt+1}次重试")
        except Exception as e:
            logger.warning(f"上传异常: {e}, 第{attempt+1}次重试")

        # Back off only when another attempt will follow.
        if attempt < max_retries - 1:
            time.sleep(backoff_sec * (2 ** attempt))
    return False
def execute_store_agent(request_id: str) -> Tuple[int, int]:
    """Run the store pipeline for one request.

    Steps:
        1. Set ``store_status`` to 1.
        2. Load the query, the qualifying extraction fragments and the
           dataset id.
        3. Merge the fragments with the LLM (merge.md prompt).
        4. Upload only the merged text.
        5. Set ``store_status`` to 2 on success, 3 on failure.

    Args:
        request_id: Identifier of the request to process.

    Returns:
        ``(total, success)``: the number of source fragments and 1 if the
        merged text was uploaded, else 0. ``(0, 0)`` is returned when there
        are no fragments or when an unexpected exception occurs.
    """
    _update_store_status(request_id, 1)
    try:
        # Step 1: gather the inputs.
        query = _fetch_query(request_id)
        fragments = _fetch_extraction_data(request_id)
        dataset_id = _resolve_dataset_id(request_id)
        fragment_count = len(fragments)

        if not fragment_count:
            # Nothing to upload: status 2 is written and an empty content
            # row is stored.
            # NOTE(review): the original comment here said "treat as
            # failure", yet the status written is 2, not 3 — confirm intent.
            _update_store_status(request_id, 2)
            _update_store_content_status(request_id, dataset_id, '', '', query)
            logger.info(f"无可上传数据: requestId={request_id}")
            return (0, 0)

        logger.info(f"开始合并知识: requestId={request_id}, query={query}, 片段数={fragment_count}")

        # Step 2: merge the fragments with the LLM.
        merged_content = _merge_with_llm(query, fragments)
        if not merged_content:
            # Merge failed: mark the request as failed.
            _update_store_status(request_id, 3)
            logger.error(f"知识合并失败: requestId={request_id}")
            return (fragment_count, 0)

        logger.info(f"知识合并成功: requestId={request_id}, 合并后内容={merged_content[:500]}")

        # Step 3: upload the merged text (channel is left empty).
        uploaded = _upload_chunk(merged_content, query, '', dataset_id, request_id)
        if not uploaded:
            _update_store_status(request_id, 3)
            logger.error(f"store失败: requestId={request_id}, 上传合并内容失败")
            return (fragment_count, 0)

        _update_store_status(request_id, 2)
        logger.info(f"store完成: requestId={request_id}, 原始片段数={fragment_count}, 合并后上传成功")
        return (fragment_count, 1)
    except Exception as exc:
        logger.error(f"store流程异常: requestId={request_id}, error={exc}")
        _update_store_status(request_id, 3)
        return (0, 0)
|