from typing import Annotated from typing_extensions import TypedDict from langgraph.graph import StateGraph, START, END from langgraph.graph.message import add_messages import os from langchain_openai import ChatOpenAI from .tools import evaluation_extraction_tool from langgraph.prebuilt import ToolNode, tools_condition from langgraph.checkpoint.memory import InMemorySaver import requests from dotenv import load_dotenv from utils.logging_config import get_logger # 配置日志 logger = get_logger('CleanAgent') # 加载环境变量 load_dotenv() graph=None llm_with_tools=None prompt=""" ### 角色 (Role): 您是一个专业的评估报告检索助手,我的任务是根据用户的查询关键词,从评估报告中提取相关信息。 ### 目标 (Goal): 1. 根据特定的主题(关键词)快速获取相关的评估报告、数据摘要或关键指标,以便您能深入了解某个方面(如产品表现、服务质量、市场反馈、项目评估等)的详细评估情况。 2. 为您的每次查询提供一个唯一的标识符,以便您能轻松追踪和管理您的请求,确保数据的可追溯性。 --- ### 工作流 (Workflow): 1. 从输入信息中提取关键词(query_word)和请求ID(request_id) 2. 调用工具evaluation_extraction_tool,进行评估解析 3. 返回结果 --- ### 输入信息: {input} ### 输出json格式: { "requestId":[请求ID], "status":2 } """ class State(TypedDict): messages: Annotated[list, add_messages] name: str birthday: str def chatbot(state: State): message = llm_with_tools.invoke(state["messages"]) # Because we will be interrupting during tool execution, # we disable parallel tool calling to avoid repeating any # tool invocations when we resume. assert len(message.tool_calls) <= 1 return {"messages": [message]} def execute_agent_with_api(user_input: str): global graph, llm_with_tools, prompt # 替换prompt中的{input}占位符为用户输入 formatted_prompt = prompt.replace("{input}", user_input) try: # 如果graph或llm_with_tools未初始化,先初始化 if graph is None or llm_with_tools is None: try: # 使用新版本的 ChatOpenAI llm = ChatOpenAI(model="gpt-4") tools = [evaluation_extraction_tool] llm_with_tools = llm.bind_tools(tools=tools) # 初始化图 graph_builder = StateGraph(State) graph_builder.add_node("chatbot", chatbot) tool_node = ToolNode(tools=tools) graph_builder.add_node("tools", tool_node) graph_builder.add_conditional_edges( "chatbot", tools_condition, ) graph_builder.add_edge("tools", "chatbot") graph_builder.add_edge(START, "chatbot") # memory = InMemorySaver() # graph = graph_builder.compile(checkpointer=memory) graph = graph_builder.compile() except Exception as e: logger.error(f"初始化Agent失败: {str(e)}") return f"初始化Agent失败: {str(e)}" # 生成唯一的线程ID import uuid thread_id = str(uuid.uuid4()) # 执行Agent并收集结果 results = [] config = {"configurable": {"thread_id": thread_id}} # 使用格式化后的prompt作为用户输入 for event in graph.stream({"messages": [{"role": "user", "content": formatted_prompt}]}, config, stream_mode="values"): for value in event.values(): # 保存消息内容 if "messages" in event and len(event["messages"]) > 0: message = event["messages"][-1] results.append(message.content) # 返回结果 return "\n".join(results) if results else "Agent执行完成,但没有返回结果" except requests.exceptions.ConnectionError as e: return f"OpenAI API 连接错误: {str(e)}\n请检查网络连接或代理设置。" except Exception as e: return f"执行Agent时出错: {str(e)}" def main(): print(f"开始执行Agent") # 设置代理 proxy_url = os.getenv('DYNAMIC_HTTP_PROXY') if proxy_url: os.environ["OPENAI_PROXY"] = proxy_url os.environ["HTTPS_PROXY"] = proxy_url os.environ["HTTP_PROXY"] = proxy_url # 执行Agent result = execute_agent_with_api('{"query_word":"图文策划方法","request_id":"REQUEST_001"}') print(result) if __name__ == '__main__': main()