123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139 |
- from typing import Annotated
- from typing_extensions import TypedDict
- from langgraph.graph import StateGraph, START, END
- from langgraph.graph.message import add_messages
- import os
- from langchain.chat_models import init_chat_model
- from IPython.display import Image, display
- from tools import evaluation_extraction_tool
- from langgraph.prebuilt import ToolNode, tools_condition
- from langgraph.checkpoint.memory import InMemorySaver
- import requests
- from dotenv import load_dotenv
- # 加载环境变量
- load_dotenv()
- graph=None
- llm_with_tools=None
- prompt="""
- ### 角色 (Role):
- 我将充当您的“评估报告检索专员”。当您需要了解特定主题的评估情况时,我将利用背后强大的【评估提取工具】(evaluation_extraction_tool) 来精确地从数据源中检索和整理相关评估报告、摘要或关键指标,并呈现给您。
- ### 目标 (Goal):
- 1. 根据特定的主题(关键词)快速获取相关的评估报告、数据摘要或关键指标,以便您能深入了解某个方面(如产品表现、服务质量、市场反馈、项目评估等)的详细评估情况。
- 2. 为您的每次查询提供一个唯一的标识符,以便您能轻松追踪和管理您的请求,确保数据的可追溯性。
- ---
- ### 工作流 (Workflow):
- 为了实现您的目标,并确保我能准确有效地为您服务,我们需要遵循以下简单的交互流程:
- 1. **您提供输入:** 您需要向我提供两个关键信息:
- * **关键词 (`query_word`)**:这是我用来筛选和定位评估内容的依据。请提供您关注的具体主题或词语。
- * **例如:** `新功能A的用户反馈`, `客户满意度调查结果`, `Q3市场评估报告`
- * **请求ID (`request_id`)**:这是一个唯一的任务标识,用于区分您的每一次请求,方便后续查询和管理。
- * **例如:** `20231027-REP001`, `PROJ_X-EVAL-V2`, `SESSION-USER-XYZ`
- 2. **我解析并调用工具:** 一旦我接收到您的输入,我将立即解析您提供的关键词和请求ID。
- * 如果信息完整且格式正确,我将自动调用【评估提取工具】(evaluation_extraction_tool)。
- * 如果信息有误或缺失,我将提示您重新输入。
- 3. **我返回结果:** 【评估提取工具】执行完毕后,我将把提取到的评估摘要、链接或相关数据返回给您。
- ---
- ### 输入信息:
- {input}
- ### 输出json格式:
- {
- "requestId":[请求ID],
- "status":2
- }
- """
- class State(TypedDict):
- messages: Annotated[list, add_messages]
- name: str
- birthday: str
- def chatbot(state: State):
- message = llm_with_tools.invoke(state["messages"])
- # Because we will be interrupting during tool execution,
- # we disable parallel tool calling to avoid repeating any
- # tool invocations when we resume.
- assert len(message.tool_calls) <= 1
- return {"messages": [message]}
- def execute_agent_with_api(user_input: str):
- global graph, llm_with_tools, prompt
-
- # 替换prompt中的{input}占位符为用户输入
- formatted_prompt = prompt.replace("{input}", user_input)
- try:
- # 如果graph或llm_with_tools未初始化,先初始化
- if graph is None or llm_with_tools is None:
- try:
- llm = init_chat_model("openai:gpt-4.1")
- tools = [evaluation_extraction_tool]
- llm_with_tools = llm.bind_tools(tools=tools)
-
- # 初始化图
- graph_builder = StateGraph(State)
- graph_builder.add_node("chatbot", chatbot)
-
- tool_node = ToolNode(tools=tools)
- graph_builder.add_node("tools", tool_node)
-
- graph_builder.add_conditional_edges(
- "chatbot",
- tools_condition,
- )
- graph_builder.add_edge("tools", "chatbot")
- graph_builder.add_edge(START, "chatbot")
-
- memory = InMemorySaver()
- graph = graph_builder.compile(checkpointer=memory)
- except Exception as e:
- return f"初始化Agent失败: {str(e)}"
-
- # 生成唯一的线程ID
- import uuid
- thread_id = str(uuid.uuid4())
-
- # 执行Agent并收集结果
- results = []
- config = {"configurable": {"thread_id": thread_id}}
-
- # 使用格式化后的prompt作为用户输入
- for event in graph.stream({"messages": [{"role": "user", "content": formatted_prompt}]}, config, stream_mode="values"):
- for value in event.values():
- # 保存消息内容
- if "messages" in event and len(event["messages"]) > 0:
- message = event["messages"][-1]
- results.append(message.content)
-
- # 返回结果
- return "\n".join(results) if results else "Agent执行完成,但没有返回结果"
- except requests.exceptions.ConnectionError as e:
- return f"OpenAI API 连接错误: {str(e)}\n请检查网络连接或代理设置。"
- except Exception as e:
- return f"执行Agent时出错: {str(e)}"
- def main():
- print(f"开始执行Agent")
- # 设置代理
- proxy_url = os.getenv('DYNAMIC_HTTP_PROXY')
- if proxy_url:
- os.environ["OPENAI_PROXY"] = proxy_url
- os.environ["HTTPS_PROXY"] = proxy_url
- os.environ["HTTP_PROXY"] = proxy_url
- # 执行Agent
- result = execute_agent_with_api('{"query_word":"图文策划方法","request_id":"REQUEST_001"}')
- print(result)
- if __name__ == '__main__':
- main()
|