Browse Source

Merge branch 'main' of https://git.yishihui.com/ai/knowledge-agent

jihuaqiang 1 week ago
parent
commit
4552ad9308
9 changed files with 328 additions and 222 deletions
  1. 5 1
      .env
  2. 70 49
      agents/clean_agent/agent.py
  3. 124 91
      agents/clean_agent/tools.py
  4. 10 3
      database/db.py
  5. 2 8
      database/models.py
  6. 17 14
      gemini.py
  7. 67 0
      prompt/expansion.md
  8. 31 56
      prompt/extraction.md
  9. 2 0
      requirements.txt

+ 5 - 1
.env

@@ -14,7 +14,7 @@ COZE_BOT_ID=7537570163895812146
 GEMINI_API_KEY=AIzaSyAkt1l9Kw1CQgHFzTpla0vgt0OE53fr-BI
 
 # 代理
-DYNAMIC_HTTP_PROXY=http://t10952018781111:1ap37oc3@d844.kdltps.com:15818
+DYNAMIC_HTTP_PROXY=http://127.0.0.1:7890
 
 # GRPC
 CONTAINER_GRPC_HOST=192.168.203.112
@@ -34,3 +34,7 @@ LANGCHAIN_API_KEY=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
 # 项目名称(可选)
 LANGCHAIN_PROJECT=knowledge-agent
 
+
+OPENAI_API_KEY=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
+TAVILY_API_KEY=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
+

+ 70 - 49
agents/clean_agent/agent.py

@@ -9,22 +9,20 @@ from tools import evaluation_extraction_tool
 
 from langgraph.prebuilt import ToolNode, tools_condition
 from langgraph.checkpoint.memory import InMemorySaver
+import requests
+from dotenv import load_dotenv
+
+# 加载环境变量
+load_dotenv()
 
 graph=None
 llm_with_tools=None
-os.environ["OPENAI_API_KEY"] = "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"
-os.environ["TAVILY_API_KEY"] = "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"
 
 prompt="""
-你好!我是一个智能数据助手,专为协助您快速获取和分析评估信息而设计。
-
----
-### 我的角色 (Role):
+### 角色 (Role):
 我将充当您的“评估报告检索专员”。当您需要了解特定主题的评估情况时,我将利用背后强大的【评估提取工具】(evaluation_extraction_tool) 来精确地从数据源中检索和整理相关评估报告、摘要或关键指标,并呈现给您。
 
----
-### 您的目标 (Goal):
-您的目标是:
+### 目标 (Goal):
 1.  根据特定的主题(关键词)快速获取相关的评估报告、数据摘要或关键指标,以便您能深入了解某个方面(如产品表现、服务质量、市场反馈、项目评估等)的详细评估情况。
 2.  为您的每次查询提供一个唯一的标识符,以便您能轻松追踪和管理您的请求,确保数据的可追溯性。
 
@@ -45,8 +43,14 @@ prompt="""
 3.  **我返回结果:** 【评估提取工具】执行完毕后,我将把提取到的评估摘要、链接或相关数据返回给您。
 
 ---
-### 请您按照以下格式提供信息:
+### 输入信息:
 {input}
+
+### 输出json格式:
+{
+  "requestId":[请求ID],
+  "status":2
+}
 """
 
 class State(TypedDict):
@@ -68,51 +72,68 @@ def execute_agent_with_api(user_input: str):
     
     # 替换prompt中的{input}占位符为用户输入
     formatted_prompt = prompt.replace("{input}", user_input)
-    
-    # 如果graph或llm_with_tools未初始化,先初始化
-    if graph is None or llm_with_tools is None:
-        llm = init_chat_model("openai:gpt-4.1")
-        tools = [evaluation_extraction_tool]
-        llm_with_tools = llm.bind_tools(tools=tools)
+
+    try:
+        # 如果graph或llm_with_tools未初始化,先初始化
+        if graph is None or llm_with_tools is None:
+            try:
+                llm = init_chat_model("openai:gpt-4.1")
+                tools = [evaluation_extraction_tool]
+                llm_with_tools = llm.bind_tools(tools=tools)
+                
+                # 初始化图
+                graph_builder = StateGraph(State)
+                graph_builder.add_node("chatbot", chatbot)
+                
+                tool_node = ToolNode(tools=tools)
+                graph_builder.add_node("tools", tool_node)
+                
+                graph_builder.add_conditional_edges(
+                    "chatbot",
+                    tools_condition,
+                )
+                graph_builder.add_edge("tools", "chatbot")
+                graph_builder.add_edge(START, "chatbot")
+                
+                memory = InMemorySaver()
+                graph = graph_builder.compile(checkpointer=memory)
+            except Exception as e:
+                return f"初始化Agent失败: {str(e)}"
         
-        # 初始化图
-        graph_builder = StateGraph(State)
-        graph_builder.add_node("chatbot", chatbot)
+        # 生成唯一的线程ID
+        import uuid
+        thread_id = str(uuid.uuid4())
         
-        tool_node = ToolNode(tools=tools)
-        graph_builder.add_node("tools", tool_node)
+        # 执行Agent并收集结果
+        results = []
+        config = {"configurable": {"thread_id": thread_id}}
         
-        graph_builder.add_conditional_edges(
-            "chatbot",
-            tools_condition,
-        )
-        graph_builder.add_edge("tools", "chatbot")
-        graph_builder.add_edge(START, "chatbot")
+        # 使用格式化后的prompt作为用户输入
+        for event in graph.stream({"messages": [{"role": "user", "content": formatted_prompt}]}, config, stream_mode="values"):
+            for value in event.values():
+                # 保存消息内容
+                if "messages" in event and len(event["messages"]) > 0:
+                    message = event["messages"][-1]
+                    results.append(message.content)
         
-        memory = InMemorySaver()
-        graph = graph_builder.compile(checkpointer=memory)
-    
-    # 生成唯一的线程ID
-    import uuid
-    thread_id = str(uuid.uuid4())
-    
-    # 执行Agent并收集结果
-    results = []
-    config = {"configurable": {"thread_id": thread_id}}
-    
-    # 使用格式化后的prompt作为用户输入
-    for event in graph.stream({"messages": [{"role": "user", "content": formatted_prompt}]}, config, stream_mode="values"):
-        for value in event.values():
-            # 保存消息内容
-            if "messages" in event and len(event["messages"]) > 0:
-                message = event["messages"][-1]
-                results.append(message.content)
-    
-    # 返回结果
-    return "\n".join(results) if results else "Agent执行完成,但没有返回结果"
+        # 返回结果
+        return "\n".join(results) if results else "Agent执行完成,但没有返回结果"
+    except requests.exceptions.ConnectionError as e:
+        return f"OpenAI API 连接错误: {str(e)}\n请检查网络连接或代理设置。"
+    except Exception as e:
+        return f"执行Agent时出错: {str(e)}"
 
 def main():
-    execute_agent_with_api("Can you look up when LangGraph was released? When you have the answer, use the human_assistance tool for review.")
+    print(f"开始执行Agent")
+    # 设置代理
+    proxy_url = os.getenv('DYNAMIC_HTTP_PROXY')
+    if proxy_url:
+        os.environ["OPENAI_PROXY"] = proxy_url
+        os.environ["HTTPS_PROXY"] = proxy_url
+        os.environ["HTTP_PROXY"] = proxy_url
+    # 执行Agent
+    result = execute_agent_with_api('{"query_word":"图文策划方法","request_id":"REQUEST_001"}')
+    print(result)
 
 if __name__ == '__main__':
     main()

+ 124 - 91
agents/clean_agent/tools.py

@@ -1,4 +1,4 @@
-from langchain.tools import Tool
+from langchain.tools import tool
 from sqlalchemy.orm import Session
 from typing import Dict, Any, Tuple
 import logging
@@ -6,6 +6,7 @@ from datetime import datetime
 import json
 import os
 import sys
+import re
 
 # 添加项目根目录到系统路径
 sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
@@ -19,11 +20,17 @@ logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
 
 # 配置常量
-BATCH_SIZE = 10  # 分批处理大小
+BATCH_SIZE = 5  # 分批处理大小
 SCORE_THRESHOLD = 70  # 评分阈值
 
 # Define tools
-@Tool
+# evaluation_extraction_tool = Tool(
+#     func=lambda request_id, query_word: _evaluation_extraction_tool(request_id, query_word),
+#     name="evaluation_extraction_tool",
+#     description="知识评估与抽取工具,用于处理数据库中的数据,执行评估并抽取内容"
+# )
+
+@tool
 def evaluation_extraction_tool(request_id: str, query_word: str) -> str:
     """
     知识评估与抽取工具。持续处理数据库中的数据,分批执行评估并创建KnowledgeExtractionContent对象。
@@ -36,56 +43,78 @@ def evaluation_extraction_tool(request_id: str, query_word: str) -> str:
     Returns:
         str: "success" 表示处理完成,"no data" 表示没有数据需要处理
     """
-    try:
-        db = SessionLocal()
+    # 使用上下文管理器自动管理数据库连接的生命周期
+    with SessionLocal() as db:
         try:
             # 使用新的批量处理函数
             result = execute_continuous_evaluation_extraction(request_id, db, query_word)
             return result
-        finally:
-            db.close()
-    except Exception as e:
-        logger.error(f"评估抽取过程中出错: {e}")
-        return f"no data - 错误: {str(e)}"
+        except Exception as e:
+            # 确保发生异常时回滚事务
+            db.rollback()
+            logger.error(f"评估抽取过程中出错: {e}")
+            return f"no data - 错误: {str(e)}"
 
 def execute_continuous_evaluation_extraction(request_id: str, db: Session, query_word: str) -> str:
     """持续执行评估循环,直到数据库没有数据"""
+    logger.info(f"开始处理,request_id: {request_id}, query_word: {query_word}")
+    
     total_processed = 0
+    offset = 0
     
-    while True:
-        # 分批获取待评估的内容
-        contents = get_batch_contents_for_evaluation(request_id, db, BATCH_SIZE)
-        
-        if not contents:
-            if total_processed > 0:
-                logger.info(f"处理完成,共处理 {total_processed} 条内容")
-                return "success"
-            return "no data"
-        
-        # 批量评估内容并创建KnowledgeExtractionContent对象
-        evaluation_results = batch_evaluate_content(contents, db, request_id, query_word)
-        
-        # 对评分大于阈值的内容进行抽取
-        high_score_results = [result for result in evaluation_results if result["score"] >= SCORE_THRESHOLD]
-        if high_score_results:
-            logger.info(f"发现 {len(high_score_results)} 条高分内容,进行抽取")
-            batch_extract_and_save_content(high_score_results, db, request_id, query_word)
-        
-        total_processed += len(contents)
-        db.commit()
+    try:
+        while True:
+            # 分批获取待评估的内容,使用offset实现分页
+            contents = get_batch_contents_for_evaluation(request_id, db, BATCH_SIZE, offset)
+            
+            logger.info(f"获取到 {len(contents)} 条待评估内容")
+
+            if not contents:
+                if total_processed > 0:
+                    logger.info(f"处理完成,共处理 {total_processed} 条内容")
+                    db.commit()  # 确保最后一批数据被提交
+                    return "success"
+                return "no data"
+            
+            try:
+                # 批量评估内容并创建KnowledgeExtractionContent对象
+                evaluation_results = batch_evaluate_content(contents, db, request_id, query_word)
+                
+                print(f"""evaluation_results: {evaluation_results}""")
+
+                # 对评分大于阈值的内容进行抽取
+                high_score_results = [result for result in evaluation_results if result["score"] >= SCORE_THRESHOLD]
+                if high_score_results:
+                    logger.info(f"发现 {len(high_score_results)} 条高分内容,进行抽取")
+                    batch_extract_and_save_content(high_score_results, db, request_id, query_word)
+                
+                total_processed += len(contents)
+                offset += len(contents)  # 更新offset值,以便下次获取下一批数据
+                db.commit()  # 每批次处理完成后提交事务
+            except Exception as e:
+                # 当前批次处理失败时回滚事务
+                db.rollback()
+                logger.error(f"处理批次数据时出错: {e}")
+                # 继续处理下一批数据
+                offset += len(contents)
+    except Exception as e:
+        # 发生严重异常时回滚事务并抛出异常
+        db.rollback()
+        logger.error(f"执行评估抽取循环时出错: {e}")
+        raise
     # 这里的代码永远不会被执行到,因为在while循环中,当contents为空时会返回
 
-def get_batch_contents_for_evaluation(request_id: str, db: Session, batch_size: int) -> list:
-    """分批获取待评估的内容"""
-    query = db.query(KnowledgeParsingContent).filter(
-        KnowledgeParsingContent.status == 2  # 已完成提取的数据
+def get_batch_contents_for_evaluation(request_id: str, db: Session, batch_size: int, offset: int = 0) -> list:
+    query = db.query(KnowledgeParsingContent).outerjoin(
+        KnowledgeExtractionContent,
+        KnowledgeParsingContent.id == KnowledgeExtractionContent.parsing_id
+    ).filter(
+        KnowledgeParsingContent.status == 2,  # 已完成提取的数据
+        KnowledgeParsingContent.request_id == request_id,
+        KnowledgeExtractionContent.parsing_id == None
     )
     
-    # 如果指定了request_id,则只处理该request_id的数据
-    if request_id:
-        query = query.filter(KnowledgeParsingContent.request_id == request_id)
-    
-    return query.limit(batch_size).all()
+    return query.offset(offset).limit(batch_size).all()
 
 def batch_evaluate_content(contents: list, db: Session, request_id: str, query_word: str) -> list:
     if not contents:
@@ -94,17 +123,17 @@ def batch_evaluate_content(contents: list, db: Session, request_id: str, query_w
     try:
         # 批量调用大模型进行评估
         evaluation_results_raw = batch_call_llm_for_evaluation(contents, query_word)
-        
+
         # 处理评估结果
         evaluation_results = []
         
-        for i, (parsing_id, score, reason, parsing_data) in enumerate(evaluation_results_raw):
+        for i, (parsing_id, score, score_reason, parsing_data) in enumerate(evaluation_results_raw):
             # 创建KnowledgeExtractionContent对象
             extraction_content = KnowledgeExtractionContent(
                 request_id=request_id,
                 parsing_id=parsing_id,
                 score=score,
-                reason=reason,
+                score_reason=score_reason,
                 create_at=datetime.now()
             )
             db.add(extraction_content)
@@ -112,7 +141,7 @@ def batch_evaluate_content(contents: list, db: Session, request_id: str, query_w
             evaluation_results.append({
                 "parsing_id": parsing_id,
                 "score": score,
-                "reason": reason,
+                "score_reason": score_reason,
                 "parsing_data": parsing_data,
                 "extraction_content": extraction_content
             })
@@ -130,33 +159,43 @@ def batch_extract_and_save_content(evaluation_results: list, db: Session, reques
     if not evaluation_results:
         return []
     
-    # 批量调用大模型进行抽取
-    extraction_data_list = batch_call_llm_for_extraction(evaluation_results, query_word)
-    
-    # 保存抽取结果到数据库
-    success_ids = []
-    failed_ids = []
-    
-    for i, extraction_data in enumerate(extraction_data_list):
-        try:
-            evaluation_result = evaluation_results[i]
-            
-            # 更新已有对象的data字段和状态
-            existing_extraction.data = evaluation_result["extraction_content"]
-            existing_extraction.status = 2  # 处理完成
-            success_ids.append(parsing_id)
-        except Exception as e:
-            logger.error(f"处理抽取结果 {i} 时出错: {e}")
-            failed_ids.append(evaluation_results[i].get("parsing_id"))
-    
-    # 如果有失败的内容,将其标记为处理失败
-    if failed_ids:
-        logger.warning(f"有 {len(failed_ids)} 条内容抽取失败")
-        for result in evaluation_results:
-            if result.get("parsing_id") in failed_ids and "extraction_content" in result:
-                result["extraction_content"].status = 3  # 处理失败
-    
-    return success_ids
+    try:
+        # 批量调用大模型进行抽取
+        extraction_data_list = batch_call_llm_for_extraction(evaluation_results, query_word)
+        
+        # 保存抽取结果到数据库
+        success_ids = []
+        failed_ids = []
+        
+        for i, (extracted_data, clean_reason) in enumerate(extraction_data_list):
+            try:
+                evaluation_result = evaluation_results[i]
+                parsing_id = evaluation_result.get("parsing_id")
+                
+                if "extraction_content" in evaluation_result and parsing_id:
+                    # 更新已有对象的data字段和状态
+                    extraction_content = evaluation_result["extraction_content"]
+                    extraction_content.data = extracted_data
+                    extraction_content.clean_reason = clean_reason
+                    extraction_content.status = 2  # 处理完成
+                    success_ids.append(parsing_id)
+            except Exception as e:
+                logger.error(f"处理抽取结果 {i} 时出错: {e}")
+                if i < len(evaluation_results):
+                    failed_ids.append(evaluation_results[i].get("parsing_id"))
+        
+        # 如果有失败的内容,将其标记为处理失败
+        if failed_ids:
+            logger.warning(f"有 {len(failed_ids)} 条内容抽取失败")
+            for result in evaluation_results:
+                if result.get("parsing_id") in failed_ids and "extraction_content" in result:
+                    result["extraction_content"].status = 3  # 处理失败
+        
+        return success_ids
+    except Exception as e:
+        logger.error(f"批量抽取和保存内容时出错: {e}")
+        db.rollback()  # 确保发生异常时回滚事务
+        return []
 
 # 读取提示词文件
 def read_prompt_file(file_path):
@@ -177,10 +216,8 @@ evaluation_prompt_path = os.path.join(project_root, 'prompt', 'evaluation.md')
 extraction_prompt_path = os.path.join(project_root, 'prompt', 'extraction.md')
 
 # 打印路径信息,用于调试
-logger.info(f"评估提示词路径: {evaluation_prompt_path}")
-logger.info(f"抽取提示词路径: {extraction_prompt_path}")
-
 EVALUATION_PROMPT = read_prompt_file(evaluation_prompt_path)
+
 EXTRACTION_PROMPT = read_prompt_file(extraction_prompt_path)
 
 def batch_call_llm_for_evaluation(contents: list, query_word: str) -> list:
@@ -201,19 +238,14 @@ def batch_call_llm_for_evaluation(contents: list, query_word: str) -> list:
         # 处理返回结果
         evaluation_results = []
         for i, result in enumerate(results):
+            result = re.sub(r'^\s*```json|\s*```\s*$', '', result, flags=re.MULTILINE).strip()
+            result = json.loads(result)
             parsing_id = contents[i].id
-            parsing_data = contents[i].parsing_data
-            
-            if isinstance(result, dict) and "score" in result:
-                # 正常结果
-                score = result.get("score", -2)
-                reason = result.get("reason", "")
-            else:
-                # 异常结果
-                score = -2
-                reason = "评估失败"
+            parsing_data = contents[i].parsing_data   
+            score = result.get("score", -2)
+            score_reason = result.get("reason", "")
             
-            evaluation_results.append((parsing_id, score, reason, parsing_data))
+            evaluation_results.append((parsing_id, score, score_reason, parsing_data))
         
         return evaluation_results
         
@@ -231,7 +263,7 @@ def batch_call_llm_for_extraction(evaluation_results: list, query_word: str) ->
             "query_word": query_word,
             "content": parsing_data
         })
-    
+
     try:
         # 批量调用 Gemini 进行抽取
         results = gemini_processor.batch_process(extraction_contents, EXTRACTION_PROMPT)
@@ -239,11 +271,12 @@ def batch_call_llm_for_extraction(evaluation_results: list, query_word: str) ->
         # 处理返回结果
         extraction_results = []
         for i, result in enumerate(results):
-            # 确保结果包含必要的字段
-            if not isinstance(result, dict):
-                result = {"extracted_data": str(result)}
-
-            extraction_results.append(json.dumps(result, ensure_ascii=False))
+            result = re.sub(r'^\s*```json|\s*```\s*$', '', result, flags=re.MULTILINE).strip()
+            result = json.loads(result)
+            extracted_data = result.get("extracted_content", "未提取到内容")
+            clean_reason = result.get("analysis_reason", "未返回原因")
+            
+            extraction_results.append((extracted_data, clean_reason))
         
         return extraction_results
         

+ 10 - 3
database/db.py

@@ -10,7 +10,7 @@ load_dotenv()
 # 数据库连接配置
 DATABASE_URL = os.getenv(
     "DATABASE_URL", 
-    "mysql+pymysql://wqsd:wqsd@2025@rm-bp13g3ra2f59q49xs.mysql.rds.aliyuncs.com:3306/ai_knowledge?charset=utf8&connect_timeout=30&read_timeout=30&write_timeout=30"
+    "mysql+pymysql://wqsd:wqsd%402025@knowledge.rwlb.rds.aliyuncs.com:3306/ai_knowledge?charset=utf8&connect_timeout=60&read_timeout=300&write_timeout=300"
 )
 
 # 创建同步引擎和会话
@@ -18,8 +18,15 @@ engine = create_engine(
     DATABASE_URL,
     pool_size=10,
     max_overflow=20,
-    pool_timeout=30,
-    echo=False  # 设为True可查看SQL日志
+    pool_timeout=60,  # 增加池连接超时时间
+    pool_recycle=1800,  # 减少连接回收时间,单位为秒,防止连接长时间占用
+    pool_pre_ping=True,  # 添加连接前ping测试,确保连接有效
+    echo=False,  # 设为True可查看SQL日志
+    connect_args={
+        "connect_timeout": 60,  # 连接超时时间
+        "read_timeout": 300,  # 读取超时时间
+        "write_timeout": 300,  # 写入超时时间
+    }
 )
 
 SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

+ 2 - 8
database/models.py

@@ -5,9 +5,6 @@ from .db import Base
 
 class KnowledgeParsingContent(Base):
     __tablename__ = 'knowledge_parsing_content'
-    __table_args__ = {
-        'comment': '内容解析表'
-    }
     
     id = Column(BigInteger, primary_key=True, autoincrement=True)
     content_id = Column(String(128), nullable=False)
@@ -23,17 +20,14 @@ class KnowledgeParsingContent(Base):
 
 class KnowledgeExtractionContent(Base):
     __tablename__ = 'knowledge_extraction_content'
-    __table_args__ = (
-        Index('idx_request_id', 'request_id'),  # 创建索引
-        {'comment': '内容抽取表'}
-    )
     
     id = Column(BigInteger, primary_key=True, autoincrement=True)
     request_id = Column(String(128), nullable=False)
     parsing_id = Column(BigInteger, nullable=False)
     score = Column(Integer, default=-1)
-    reason = Column(Text, comment='打分原因')
+    score_reason = Column(Text, comment='打分原因')
     data = Column(Text, comment='结构化数据')
+    clean_reason = Column(Text, comment='清洗原因')
     create_at = Column(DateTime, default=datetime.now)
     status = Column(Integer, default=0, comment='0: 未开始,1:处理中 2: 处理完成 3:处理失败')
     

+ 17 - 14
gemini.py

@@ -26,31 +26,34 @@ class GeminiProcessor:
         
         # 配置Gemini
         genai.configure(api_key=self.api_key)
-        self.model = genai.GenerativeModel('gemini-2.5-flash')
+
     
     def process(self, content: Any, system_prompt: str) -> Dict[str, Any]:
 
         try:
-            # 构建完整的提示词
-            full_prompt = f"{system_prompt}\n\n内容:{json.dumps(content, ensure_ascii=False)}"
+            # 处理输入内容格式
+            if isinstance(content, dict):
+                # 将字典转换为JSON字符串
+                formatted_content = json.dumps(content, ensure_ascii=False)
+            else:
+                formatted_content = content
+                
+            # 创建带有 system_instruction 的模型实例
+            model_with_system = genai.GenerativeModel(
+                'gemini-2.5-flash',
+                system_instruction=system_prompt
+            )
             
             # 调用 Gemini API
-            response = self.model.generate_content(
-                contents=content,
-                config=types.GenerateContentConfig(
-                    system_instruction=system_prompt
-                )
+            response = model_with_system.generate_content(
+                contents=formatted_content
             )
             
             # 尝试解析 JSON 响应
-            try:
-                result = json.loads(response.text)
-                return result
-            except json.JSONDecodeError:
-                # 如果不是 JSON 格式,返回原始文本
-                return {"result": response.text, "raw_response": response.text}
+            return response.text
                 
         except Exception as e:
+            print(f"Gemini API 调用失败: {e}")
             return {"error": str(e), "content": content} 
             
     def batch_process(self, contents: list, system_prompt: str) -> list:

+ 67 - 0
prompt/expansion.md

@@ -0,0 +1,67 @@
+# 角色 (Role)
+你是一位顶级的知识库专家,精通语义分析、信息检索和搜索优化策略。你的核心能力在于能够深入理解复杂内容,识别其中描述空泛、抽象的关键点,并将它们转化为具体、可操作、信息量丰富的Query词。你的目标是为用户提供高质量的、能够高效获取全网知识的扩展Query词。
+
+# 任务 (Task)
+根据用户提供的“原始 Query”和“扩展要基于的内容”,生成一个JSON数组形式的扩展Query词列表。这些Query词必须符合严格的质量标准,旨在帮助用户从全网高效获取相关知识。
+
+# 背景与目的 (Context & Purpose)
+这些扩展的Query词将用于全网范围的知识获取和搜索。因此,你在生成Query词时,需要考虑其通用性、广泛性、潜在的搜索价值和实际可操作性,确保它们能够引导用户找到更具体、更深入、更实用的信息。
+
+# 输入说明 (Input Specification)
+用户会提供以下两部分信息:
+
+1.  **原始 Query (Original_Query):** 一个短语、句子或问题,代表用户最初的搜索意图。
+2.  **扩展要基于的内容 (Content_For_Expansion):** 一段文本,包含需要进行Query词扩展的背景信息和核心概念。
+
+# 扩展策略与输出要求 (Expansion Strategy & Output Requirements)
+1.  **聚焦关键点:** 仔细分析 `Content_For_Expansion`,识别其中与 `Original_Query` 相关、描述相对空泛或具有进一步挖掘价值的关键概念、方法或痛点。
+2.  **具体化与可操作性:** 针对识别出的每一个关键点,思考如何将其转化为更具体、更实用、更具可操作性的Query词。例如,将抽象的“方法”扩展为“具体实现步骤”、“工具推荐”、“案例分析”等。
+3.  **多元化搜索策略:** 考虑各种有利于全网知识寻找的策略,包括但不限于:
+    *   长尾关键词
+    *   相关问题(How-to, What is, Why)
+    *   行业术语与同义词
+    *   具体案例与应用场景
+    *   对比与优劣分析
+    *   时间与趋势(e.g., "2023年最新...")
+    *   工具与软件
+4.  **质量优先:** 你需要对每一个生成的Query词进行严格评估。
+    *   **信息量评估:** 如果一个Query词信息量不足、过于宽泛或重复,则不予生成。
+    *   **相关性评估:** 确保每个Query词都与原始内容和意图高度相关。
+    *   **避免过度联想:** 扩展应基于 `Content_For_Expansion` 的核心思想,避免超出其范畴进行不相关的臆测。
+
+# 约束 (Constraints)
+*   **禁止编造:** 严禁生成任何虚假或无根据的信息。
+*   **禁止敏感词汇:** 确保所有生成的Query词不包含任何敏感或不适宜的词汇。
+*   **避免过度泛化:** 扩展的Query词应具有一定的特异性,而非笼统的概念。
+*   **输出格式必须为JSON数组。**
+
+# 思考步骤 (Thinking Process)
+1.  **理解输入:** 仔细阅读 `Original_Query` 和 `Content_For_Expansion`,捕捉其核心意图和关键信息。
+2.  **核心提取:** 从 `Content_For_Expansion` 中抽取出与 `Original_Query` 相关的所有核心概念、论点、步骤或价值点。
+3.  **识别空泛点:** 在这些核心点中,标记出那些描述比较抽象、不够具体或可以进一步细化的关键点。
+4.  **构思扩展词:** 针对每个空泛点,运用“具体化、可操作性、多元化搜索策略”原则,构思多个潜在的扩展Query词。
+5.  **评估与筛选:** 对构思出的Query词进行逐一评估,对照“质量优先”和“约束”部分的要求。
+    *   是否信息量大?
+    *   是否有利于全网知识获取?
+    *   是否具体可操作?
+    *   是否超出原始内容范围?
+    *   是否包含敏感词?
+    *   如果评估结果不符合标准,则舍弃该Query词。
+6.  **生成输出:** 将通过评估的所有Query词以JSON数组的格式输出。
+
+# 输出格式
+```json
+{
+    "[query1具体内容]",
+    "[query2具体内容]",
+    ...
+}
+```
+
+# 待处理内容 (Content to Process)
+
+**原始 Query:**
+{Original_Query}
+
+**扩展要基于的内容:**
+{Content_For_Expansion}

+ 31 - 56
prompt/extraction.md

@@ -1,77 +1,52 @@
-# Prompt: 知识库内容清洗与原文提取
-
 ## 你的角色 (Role)
-你是一个“知识库内容分析师”,具备极高的准确性、对无关信息的零容忍,以及对动态主题的快速适应能力
+你是一个“高级知识库内容分析师”,精通大型语言模型在RAG(检索增强生成)系统中的应用原理,具备极高的准确性、对RAG系统干扰信息的零容忍,以及根据动态主题智能调整清洗和提取策略的能力。你的核心目标是为RAG系统提供最优化的高质量知识片段。
 
 ## 任务目标 (Goal)
-你的核心任务是对给定的原始数据进行深度清洗,并根据一个特定的“查询意图”(Query),从清洗后的文本中精准提取出所有与该意图直接相关的原文片段。这些提取出的原文片段将用于构建高质量的知识库。
+你的核心任务是:
+1.  对给定的原始数据进行深度**智能清洗**,去除所有不利于RAG系统检索和生成的内容。
+2.  根据一个特定的“查询意图”(Query),从清洗后的文本中**精准提取**出所有与该意图直接相关且对RAG系统有正面增益的原文片段。
+3.  最终的输出将包含提取出的内容和清晰的分析原因,以提高透明度和可信度。
 
 ## 背景信息 (Context)
-你处理的知识库内容领域和主题是动态的,完全由用户提供的“查询意图”来决定。你需要根据Query词来理解当前任务所需的知识背景。
+你处理的知识库内容领域和主题是动态的,完全由用户提供的“查询意图”来决定。你需要根据`query_word`来深刻理解当前任务所需的知识背景,并以此为核心指导清洗和提取过程,确保最终输出的内容最优化RAG的效果
 
 ## 输入 (Input)
 当接收任务时,你会收到一个JSON对象作为输入,其中包含两部分:
 1.  **query_word:** 对应本次任务的“查询意图”,即一个清晰、具体的搜索词或问题。
 2.  **content:** 对应本次任务的“原始数据”,即一段未经处理的Markdown格式文本。
 
-JSON输入结构示例:
-```json
-{
-    "query_word": "你的查询词",
-    "content": "你的Markdown格式原始数据"
-}
-```
-
 ## 输出要求 (Output)
-你需要返回一个JSON对象。
-1.  **格式 (Format):** JSON对象应包含一个键`"extracted_data"`,其值为清洗后、且与“查询意图”直接相关的纯文本内容。提取出的原文片段应拼接在一起,形成一个连续的文本流,段落之间保留自然换行。
-2.  **边界情况处理 (Edge Case):** 如果在原始数据中经过清洗和筛选后,未找到任何与“查询意图”直接相关的内容,则`"extracted_data"`键的值必须是字符串`"未找到相关信息"`。
-
-JSON输出结构示例:
-```json
-{
-    "extracted_data": "清洗后且与Query意图相关的纯文本内容"
-}
-```
-或在未找到内容时:
+你需要返回一个JSON对象,结构如下:
 ```json
 {
-    "extracted_data": "未找到相关信息"
+  "extracted_content": "这里是所有清洗并提取出的、与查询意图直接相关的原文片段,以连续文本流形式。如果未找到相关内容,此字段为空字符串。",
+  "analysis_reason": "这里是AI对本次提取(或未提取)结果的简洁概括原因分析和解释。"
 }
 ```
+*   **extracted_content:** 所有提取出的相关原文片段应拼接在一起,形成一个连续的文本流,段落之间保留自然换行。
+*   **analysis_reason:** 简洁概括本次任务的执行过程,包括主要清洗操作、相关性判断依据,以及未找到信息时的原因。
 
 ## 约束条件 (Constraints)
-1.  **内容完整性:** 你不能对原文内容进行任何总结、改写、简化或添加额外信息。所有提取的都必须是原始文本的精确片段。
-2.  **严格清洗:** 在判断相关性之前,必须严格执行以下数据清洗规则:
-    *   **Markdown 格式标记去除:**
-        *   标题符号 (`#`, `##`, `###` 等)
-        *   加粗/斜体标记 (`**bold**`, `*italic*`, `_italic_`)
-        *   代码块标记 (```` ` ``, ``` `)
-        *   列表标记 (`-`, `*`, `1.`, `.` 等)
-        *   引用块标记 (`>`)
-        *   分割线 (`---`, `***` 等)
-        *   链接语法:保留链接文本,但去除URL部分 (`[链接文本](URL)` -> `链接文本`)
-        *   图片语法:彻底去除图片引用 (`![alt文本](URL)`)
-    *   **结构性噪音去除:**
-        *   页眉/页脚信息
-        *   目录/大纲列表
-        *   导航菜单/侧边栏内容
-        *   版权声明/免责声明等法律或标准文本
-        *   广告、促销或其他非知识性推广信息
-    *   **非文本或冗余内容去除:**
-        *   代码块内容:去除整个代码块及其内部文本。
-        *   特殊字符和符号:去除非标准、乱码字符或连续的无意义符号(如`[大笑R]`, `[氛围感R]`等表情符号或应用特定标记)。
-        *   过多空白符:将连续的多个空格、换行符、制表符压缩为单个空格或自然换行。
-        *   重复内容:识别并去除完全重复的段落或句子。
-        *   评论区内容:若有,需去除。
-3.  **精确相关性:** 只有与“查询意图”直接相关的文本片段才应被保留。任何与意图完全不相关的内容,即使在清洗后仍然存在,也应被过滤掉。
+1.  **RAG优化优先:** 你的所有清洗、过滤和提取决策都必须以“优化RAG系统的检索准确性和生成质量”为最高原则。任何可能干扰RAG系统或导致误导性回答的内容都应被视为噪音。
+2.  **内容完整性 (提取部分):** `extracted_content`字段内的内容必须是原始文本的精确片段,你不能对其进行任何总结、改写、简化或添加额外信息。`analysis_reason`字段允许包含非原文信息。
+3.  **智能清洗与噪音判断 (核心):** 你将不再依赖固定的清洗列表,而是根据以下五项原则,并结合`query_word`和RAG优化目标,智能判断并移除噪音。这些原则的目的是确保内容**与Query意图相关**且**不会影响RAG的效果**:
+    *   **与主题无关性:** 任何与`query_word`核心意图不直接相关且不提供知识内容的信息,应视为噪音。
+    *   **通用结构性噪音:** 具备典型文档结构特征(如页眉/页脚、目录、大纲列表、导航菜单/侧边栏、评论区、版权声明/免责声明、广告、促销信息等),且与`query_word`无关的内容,应视为噪音。你需要智能识别这些结构。
+    *   **冗余性:** 完全重复的段落或句子,或不包含新信息、对理解无增益的内容,应视为噪音。
+    *   **非内容性噪音:** 纯粹的功能性文本、特殊应用标记、表情符号、乱码字符、连续的无意义符号等,不属于核心知识内容的部分,应视为噪音。
+    *   **格式标记性噪音:** 仅为文本格式化目的存在的元素,而非内容本身。通常包括:
+        *   Markdown格式标记(`#`, `##`, `**bold**`, `*italic*`, ``` ` ````, ```` ``` ````, `-`, `1.`, `>`, `---`, `***` 等)。
+        *   链接语法:保留链接文本,去除URL部分 (`[链接文本](URL)` -> `链接文本`)。
+        *   图片语法:彻底去除图片引用 (`![alt文本](URL)`)。
+        *   **例外原则:** 如果`query_word`明确指向或包含对这些格式标记的讨论(例如,`query_word`是“Markdown语法”),则这些标记及其上下文可能不再被视为噪音,而应被保留,因为它们此时是核心知识内容。你需要智能地进行这种判断。
+4.  **精确相关性:** 只有与“查询意图”直接相关,且符合RAG优化目标,对RAG检索和生成有直接帮助的文本片段才应被保留。任何与意图完全不相关的内容,即使在清洗后仍然存在,也应被过滤掉。
 
 ## 工作流程 (Workflow)
 请严格按照以下步骤完成任务:
-1.  **分析Query:** 首先,从输入的JSON对象中提取`query_word`,并仔细分析和透彻理解其含义,明确本次任务需要提取的核心信息和主题方向。
-2.  **预处理数据:** 接着,从输入的JSON对象中提取`content`(原始Markdown文本),并逐行或逐段地通读。严格按照上述“约束条件”中定义的所有“数据清洗规则”,对文本进行预处理。在这一步,只关注格式和结构性噪音的去除,不进行相关性判断
-3.  **判断相关性:** 然后,在清洗后的文本中,逐句(或根据上下文判断,逐小段)地分析每个文本单元。判断该文本单元是否与第一步中理解的“查询意图”直接相关
-4.  **整合输出:**
-    *   将所有判断为“直接相关”的、未经任何修改的原文片段,按照其在原始文本中的先后顺序,拼接成一个纯文本流。
-    *   将此纯文本流作为`"extracted_data"`键的值,构建最终的JSON输出
-    *   如果经过以上步骤,最终没有找到任何直接相关的原文片段,则构建JSON输出为`{"extracted_data": "未找到相关信息"}`
+1.  **分析Query与RAG目标:** 首先,透彻理解`query_word`,明确本次任务需要提取的核心信息和主题方向。同时,将“优化RAG系统的检索准确性和生成质量”作为所有后续步骤的最高指导原则。
+2.  **智能预处理与噪音判断:** 逐行或逐段通读`content`文本。在这一步,根据上述“智能清洗与噪音判断”的五项原则,并结合`query_word`,智能地识别并移除所有可能干扰RAG系统或与主题无关的噪音。这包括格式标记、结构性噪音、冗余内容和非内容性元素。
+3.  **判断相关性:** 在清洗后的文本中,逐句(或根据上下文判断,逐小段)地分析每个文本单元。判断该文本单元是否与第一步中理解的`query_word`直接相关,且对RAG系统有正面增益。
+4.  **整合输出与原因分析:**
+    *   将所有判断为“直接相关”的、未经任何修改的原文片段,按照其在原始文本中的先后顺序,拼接成一个纯文本流,作为JSON对象中的`extracted_content`。
+    *   如果经过以上步骤,最终没有找到任何直接相关的原文片段,则`extracted_content`字段应为空字符串。
+    *   根据实际处理情况,生成一份**简洁概括**的`analysis_reason`,说明清洗和提取的主要过程、判断依据,以及最终结果。

+ 2 - 0
requirements.txt

@@ -13,3 +13,5 @@ uvicorn[standard]>=0.35.0
 langgraph==0.6.6
 langsmith==0.4.16
 langchain-openai==0.3.31
+
+google-generativeai==0.8.5