Browse Source

clean_agent

丁云鹏 1 week ago
parent
commit
473c7d6e41
5 changed files with 199 additions and 64 deletions
  1. 69 4
      README.md
  2. 80 60
      agents/clean_agent/agent.py
  3. 31 0
      api.py
  4. 0 0
      prompt/clean_agent.md
  5. 19 0
      start_api.sh

+ 69 - 4
README.md

@@ -1,6 +1,6 @@
 # Knowledge Agent API
 
-基于 FastAPI + LangGraph 的智能内容识别和结构化处理服务。
+基于 FastAPI + LangGraph 的智能内容识别和结构化处理服务。支持通过API输入指令执行Agent。
 
 ## 🚀 快速开始
 
@@ -14,17 +14,20 @@ pip install -r requirements.txt
 
 #### 方式一:使用启动脚本(推荐)
 ```bash
-./start_service.sh
+./start_service.sh  # 启动主服务
+./start_api.sh      # 启动API服务
 ```
 
 #### 方式二:直接运行
 ```bash
-python3 agent.py
+python3 agent.py  # 运行Agent
+python3 api.py    # 运行API服务
 ```
 
 #### 方式三:使用 uvicorn
 ```bash
-uvicorn agent:app --host 0.0.0.0 --port 8080 --reload
+uvicorn agent:app --host 0.0.0.0 --port 8080 --reload  # 主服务
+uvicorn api:app --host 0.0.0.0 --port 8000 --reload     # API服务
 ```
 
 ### 3. 访问服务
@@ -35,6 +38,34 @@ uvicorn agent:app --host 0.0.0.0 --port 8080 --reload
 
 ## 📚 API 接口
 
+### POST /execute
+通过API执行Agent处理用户指令
+
+**请求体**:
+```json
+{
+  "instruction": "你的指令内容"
+}
+```
+
+**响应**:
+```json
+{
+  "status": "success",
+  "result": "Agent执行结果..."
+}
+```
+
+### GET /
+API服务健康检查
+
+**响应**:
+```json
+{
+  "message": "Knowledge Agent API 服务已启动,请使用 /execute 端点执行Agent"
+}
+```
+
 ### POST /parse
 同步解析内容处理
 
@@ -156,6 +187,40 @@ knowledge-agent/
 5. **RESTful API**: 现代化的 HTTP API 接口
 6. **工作流管理**: 基于 LangGraph 的强大流程控制
 7. **状态管理**: 完整的处理状态跟踪和错误处理
+8. **Agent API**: 支持通过API直接执行Agent处理指令
+
+## 📝 API使用示例
+
+### Python示例
+
+```python
+import requests
+
+# API服务地址
+api_url = "http://localhost:8000/execute"
+
+# 发送请求
+response = requests.post(
+    api_url,
+    json={"instruction": "分析以下内容并提取关键信息:..."},
+)
+
+# 处理响应
+if response.status_code == 200:
+    result = response.json()
+    print(f"状态: {result['status']}")
+    print(f"结果: {result['result']}")
+else:
+    print(f"请求失败: {response.status_code}")
+```
+
+### cURL示例
+
+```bash
+curl -X POST http://localhost:8000/execute \
+  -H "Content-Type: application/json" \
+  -d '{"instruction": "分析以下内容并提取关键信息:..."}'
+```
 
 ## 🚨 注意事项
 

+ 80 - 60
agents/clean_agent/agent.py

@@ -5,8 +5,7 @@ from langgraph.graph.message import add_messages
 import os
 from langchain.chat_models import init_chat_model
 from IPython.display import Image, display
-from tools import multiply, add, divide, human_assistance
-from langchain_tavily import TavilySearch
+from tools import evaluation_extraction_tool
 
 from langgraph.prebuilt import ToolNode, tools_condition
 from langgraph.checkpoint.memory import InMemorySaver
@@ -16,6 +15,41 @@ llm_with_tools=None
 os.environ["OPENAI_API_KEY"] = "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"
 os.environ["TAVILY_API_KEY"] = "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"
 
+prompt="""
+你好!我是一个智能数据助手,专为协助您快速获取和分析评估信息而设计。
+
+---
+### 我的角色 (Role):
+我将充当您的“评估报告检索专员”。当您需要了解特定主题的评估情况时,我将利用背后强大的【评估提取工具】(evaluation_extraction_tool) 来精确地从数据源中检索和整理相关评估报告、摘要或关键指标,并呈现给您。
+
+---
+### 您的目标 (Goal):
+您的目标是:
+1.  根据特定的主题(关键词)快速获取相关的评估报告、数据摘要或关键指标,以便您能深入了解某个方面(如产品表现、服务质量、市场反馈、项目评估等)的详细评估情况。
+2.  为您的每次查询提供一个唯一的标识符,以便您能轻松追踪和管理您的请求,确保数据的可追溯性。
+
+---
+### 工作流 (Workflow):
+为了实现您的目标,并确保我能准确有效地为您服务,我们需要遵循以下简单的交互流程:
+
+1.  **您提供输入:** 您需要向我提供两个关键信息:
+    *   **关键词 (`query_word`)**:这是我用来筛选和定位评估内容的依据。请提供您关注的具体主题或词语。
+        *   **例如:** `新功能A的用户反馈`, `客户满意度调查结果`, `Q3市场评估报告`
+    *   **请求ID (`request_id`)**:这是一个唯一的任务标识,用于区分您的每一次请求,方便后续查询和管理。
+        *   **例如:** `20231027-REP001`, `PROJ_X-EVAL-V2`, `SESSION-USER-XYZ`
+
+2.  **我解析并调用工具:** 一旦我接收到您的输入,我将立即解析您提供的关键词和请求ID。
+    *   如果信息完整且格式正确,我将自动调用【评估提取工具】(evaluation_extraction_tool)。
+    *   如果信息有误或缺失,我将提示您重新输入。
+
+3.  **我返回结果:** 【评估提取工具】执行完毕后,我将把提取到的评估摘要、链接或相关数据返回给您。
+
+---
+### 请您按照以下格式提供信息:
+关键词:{query_word}
+请求ID:{request_id}
+"""
+
 class State(TypedDict):
     messages: Annotated[list, add_messages]
     name: str
@@ -30,69 +64,55 @@ def chatbot(state: State):
     assert len(message.tool_calls) <= 1
     return {"messages": [message]}
 
-def stream_graph_updates(user_input: str, thread_id: str):
-    config = {"configurable": {"thread_id": thread_id}}
-    for event in graph.stream({"messages": [{"role": "user", "content": user_input}]},config,
-    stream_mode="values"):
-        for value in event.values():
-            event["messages"][-1].pretty_print()
-
 def main():
-
-    global llm_with_tools, graph
+    start("Can you look up when LangGraph was released? When you have the answer, use the human_assistance tool for review.")
 
 
-    llm = init_chat_model("openai:gpt-4.1")
-    tool = TavilySearch(max_results=2)
-    tools=[tool, human_assistance]
+def execute_agent_with_api(user_input: str):
+    global graph, llm_with_tools
+    
+    # 如果graph或llm_with_tools未初始化,先初始化
+    if graph is None or llm_with_tools is None:
+        llm = init_chat_model("openai:gpt-4.1")
+        tools = [evaluation_extraction_tool]
+        llm_with_tools = llm.bind_tools(tools=tools)
+        
+        # 初始化图
+        graph_builder = StateGraph(State)
+        graph_builder.add_node("chatbot", chatbot)
+        
+        tool_node = ToolNode(tools=tools)
+        graph_builder.add_node("tools", tool_node)
+        
+        graph_builder.add_conditional_edges(
+            "chatbot",
+            tools_condition,
+        )
+        graph_builder.add_edge("tools", "chatbot")
+        graph_builder.add_edge(START, "chatbot")
+        
+        memory = InMemorySaver()
+        graph = graph_builder.compile(checkpointer=memory)
     
+    # 生成唯一的线程ID
+    import uuid
+    thread_id = str(uuid.uuid4())
+    
+    # 执行Agent并收集结果
+    results = []
+    config = {"configurable": {"thread_id": thread_id}}
+    
+
 
-    llm_with_tools = llm.bind_tools(tools = tools)
-    # The first argument is the unique node name
-    # The second argument is the function or object that will be called whenever
-    # the node is used.
-
-    graph_builder = StateGraph(State)
-    graph_builder.add_node("chatbot", chatbot)
-
-    tool_node = ToolNode(tools=tools)
-    graph_builder.add_node("tools", tool_node)
-
-    graph_builder.add_conditional_edges(
-        "chatbot",
-        tools_condition,
-    )
-    # Any time a tool is called, we return to the chatbot to decide the next step
-    graph_builder.add_edge("tools", "chatbot")
-    graph_builder.add_edge(START, "chatbot")
-
-    memory = InMemorySaver()
-    graph = graph_builder.compile(checkpointer=memory)
-
-    # 尝试显示图形(需要额外依赖)
-    try:
-        graph_image = graph.get_graph().draw_mermaid_png()
-        with open("graph_visualization.png", "wb") as f:
-            f.write(graph_image)
-        print("图形已保存为 'graph_visualization.png'")
-    except Exception as e:
-        print(f"无法生成图形: {e}")
-    thread_id = "1"
-    stream_graph_updates(("Can you look up when LangGraph was released? "
-    "When you have the answer, use the human_assistance tool for review."), thread_id)
-    # while True:
-    #     try:
-    #         user_input = input("User: ")
-    #         if user_input.lower() in ["quit", "exit", "q"]:
-    #             print("Goodbye!")
-    #             break
-    #         stream_graph_updates(user_input)
-    #     except:
-    #         # fallback if input() is not available
-    #         user_input = "What do you know about LangGraph?"
-    #         print("User: " + user_input)
-    #         stream_graph_updates(user_input)
-    #         break
+    for event in graph.stream({"messages": [{"role": "user", "content": user_input}]}, config, stream_mode="values"):
+        for value in event.values():
+            # 保存消息内容
+            if "messages" in event and len(event["messages"]) > 0:
+                message = event["messages"][-1]
+                results.append(message.content)
+    
+    # 返回结果
+    return "\n".join(results) if results else "Agent执行完成,但没有返回结果"
 
 
 if __name__ == '__main__':

+ 31 - 0
api.py

@@ -0,0 +1,31 @@
+from fastapi import FastAPI, HTTPException
+from pydantic import BaseModel
+from agents.clean_agent.agent import execute_agent_with_api
+import uvicorn
+
+app = FastAPI(title="Knowledge Agent API", description="API for executing knowledge agent")
+
+
+@app.post("/execute")
+async def execute_agent(input: str):
+    """
+    执行Agent处理用户指令
+    
+    Args:
+        user_input: 包含用户指令的对象
+        
+    Returns:
+        dict: 包含执行结果的字典
+    """
+    try:
+        result = execute_agent_with_api(input)
+        return {"status": "success", "result": result}
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=f"执行Agent时出错: {str(e)}")
+
+@app.get("/")
+async def root():
+    return {"message": "Knowledge Agent API 服务已启动,请使用 /execute 端点执行Agent"}
+
+if __name__ == "__main__":
+    uvicorn.run(app, host="0.0.0.0", port=8000)

+ 0 - 0
prompt/clean_agent.md


+ 19 - 0
start_api.sh

@@ -0,0 +1,19 @@
+#!/bin/bash
+
+# 启动Knowledge Agent API服务
+
+# 检查Python环境
+if ! command -v python3 &> /dev/null; then
+    echo "错误: 未找到python3命令"
+    exit 1
+fi
+
+# 检查依赖
+if ! python3 -c "import fastapi" &> /dev/null; then
+    echo "安装必要的依赖..."
+    pip3 install fastapi uvicorn
+fi
+
+# 启动API服务
+echo "启动Knowledge Agent API服务..."
+python3 api.py