from typing import Annotated from typing_extensions import TypedDict from langgraph.graph import StateGraph, START, END from langgraph.graph.message import add_messages import os from langchain.chat_models import init_chat_model from IPython.display import Image, display from tools import evaluation_extraction_tool from langgraph.prebuilt import ToolNode, tools_condition from langgraph.checkpoint.memory import InMemorySaver import requests from dotenv import load_dotenv # 加载环境变量 load_dotenv() graph=None llm_with_tools=None prompt=""" ### 角色 (Role): 我将充当您的“评估报告检索专员”。当您需要了解特定主题的评估情况时,我将利用背后强大的【评估提取工具】(evaluation_extraction_tool) 来精确地从数据源中检索和整理相关评估报告、摘要或关键指标,并呈现给您。 ### 目标 (Goal): 1. 根据特定的主题(关键词)快速获取相关的评估报告、数据摘要或关键指标,以便您能深入了解某个方面(如产品表现、服务质量、市场反馈、项目评估等)的详细评估情况。 2. 为您的每次查询提供一个唯一的标识符,以便您能轻松追踪和管理您的请求,确保数据的可追溯性。 --- ### 工作流 (Workflow): 为了实现您的目标,并确保我能准确有效地为您服务,我们需要遵循以下简单的交互流程: 1. **您提供输入:** 您需要向我提供两个关键信息: * **关键词 (`query_word`)**:这是我用来筛选和定位评估内容的依据。请提供您关注的具体主题或词语。 * **例如:** `新功能A的用户反馈`, `客户满意度调查结果`, `Q3市场评估报告` * **请求ID (`request_id`)**:这是一个唯一的任务标识,用于区分您的每一次请求,方便后续查询和管理。 * **例如:** `20231027-REP001`, `PROJ_X-EVAL-V2`, `SESSION-USER-XYZ` 2. **我解析并调用工具:** 一旦我接收到您的输入,我将立即解析您提供的关键词和请求ID。 * 如果信息完整且格式正确,我将自动调用【评估提取工具】(evaluation_extraction_tool)。 * 如果信息有误或缺失,我将提示您重新输入。 3. **我返回结果:** 【评估提取工具】执行完毕后,我将把提取到的评估摘要、链接或相关数据返回给您。 --- ### 输入信息: {input} ### 输出json格式: { "requestId":[请求ID], "status":2 } """ class State(TypedDict): messages: Annotated[list, add_messages] name: str birthday: str def chatbot(state: State): message = llm_with_tools.invoke(state["messages"]) # Because we will be interrupting during tool execution, # we disable parallel tool calling to avoid repeating any # tool invocations when we resume. assert len(message.tool_calls) <= 1 return {"messages": [message]} def execute_agent_with_api(user_input: str): global graph, llm_with_tools, prompt # 替换prompt中的{input}占位符为用户输入 formatted_prompt = prompt.replace("{input}", user_input) try: # 如果graph或llm_with_tools未初始化,先初始化 if graph is None or llm_with_tools is None: try: llm = init_chat_model("openai:gpt-4.1") tools = [evaluation_extraction_tool] llm_with_tools = llm.bind_tools(tools=tools) # 初始化图 graph_builder = StateGraph(State) graph_builder.add_node("chatbot", chatbot) tool_node = ToolNode(tools=tools) graph_builder.add_node("tools", tool_node) graph_builder.add_conditional_edges( "chatbot", tools_condition, ) graph_builder.add_edge("tools", "chatbot") graph_builder.add_edge(START, "chatbot") memory = InMemorySaver() graph = graph_builder.compile(checkpointer=memory) except Exception as e: return f"初始化Agent失败: {str(e)}" # 生成唯一的线程ID import uuid thread_id = str(uuid.uuid4()) # 执行Agent并收集结果 results = [] config = {"configurable": {"thread_id": thread_id}} # 使用格式化后的prompt作为用户输入 for event in graph.stream({"messages": [{"role": "user", "content": formatted_prompt}]}, config, stream_mode="values"): for value in event.values(): # 保存消息内容 if "messages" in event and len(event["messages"]) > 0: message = event["messages"][-1] results.append(message.content) # 返回结果 return "\n".join(results) if results else "Agent执行完成,但没有返回结果" except requests.exceptions.ConnectionError as e: return f"OpenAI API 连接错误: {str(e)}\n请检查网络连接或代理设置。" except Exception as e: return f"执行Agent时出错: {str(e)}" def main(): print(f"开始执行Agent") # 设置代理 proxy_url = os.getenv('DYNAMIC_HTTP_PROXY') if proxy_url: os.environ["OPENAI_PROXY"] = proxy_url os.environ["HTTPS_PROXY"] = proxy_url os.environ["HTTP_PROXY"] = proxy_url # 执行Agent result = execute_agent_with_api('{"query_word":"图文策划方法","request_id":"REQUEST_001"}') print(result) if __name__ == '__main__': main()