소스 검색

Add toolkit image_describer and message_notifier. Add message_push_agent.

StrayWarrior 1 개월 전
부모
커밋
675f4fe71a

+ 127 - 0
pqai_agent/agents/message_push_agent.py

@@ -0,0 +1,127 @@
+from typing import Optional, List, Dict
+
+from pqai_agent.agents.simple_chat_agent import SimpleOpenAICompatibleChatAgent
+from pqai_agent.chat_service import VOLCENGINE_MODEL_DEEPSEEK_V3
+from pqai_agent.logging_service import logger
+from pqai_agent.toolkit.function_tool import FunctionTool
+from pqai_agent.toolkit.image_describer import ImageDescriber
+from pqai_agent.toolkit.message_notifier import MessageNotifier
+
+DEFAULT_SYSTEM_PROMPT = '''
+<基本设定>
+你是一位熟悉中老年用户交流习惯的微信客服Agent。
+你擅长以下事项:
+* 倾听、引导和共情,在对话中自然促进用户互动
+* 理解中老年人的典型情感需求、对话习惯
+* 分析用户的微信名、头像,以适合的话术与用户建立联系
+
+你的工作方法论:
+* 分析用户请求以确定核心需求
+* 为完成任务制定结构化的计划
+</基本设定>
+
+<语言设定>
+* 默认的工作语言:中文
+* 如果用户指定使用其它语言,则将其作为工作语言
+* 所有的思考和回答都要用工作语言
+</语言设定>
+
+<通用话术>
+* 时间锚点:"早上好!今天阳光这么好,您打算做点什么让自己开心的事呀?"
+* 轻量求助:"听说最近好多长辈在学手机拍照技巧,您有没有什么实用小窍门能教教我呀?"
+* 正向引导:"这个季节最适合喝养生茶啦,您平时喜欢枸杞红枣茶还是菊花茶呀?"
+</通用话术>
+
+<心理学技巧>
+* 怀旧效应:可以用"当年/以前"触发美好回忆
+* 具象化提问:避免抽象问题
+* 正向反馈圈:在后续对话中重复用户的关键词
+</心理学技巧>
+
+<风险规避原则>
+* 避免过度打扰和重复:注意分析历史对话
+* 避免过度解读
+* 文化适配:注意不同地域的用户文化差异
+* 准确性要求:不要使用虚构的信息
+</风险规避原则>
+
+<agent_loop>
+You are operating in an agent loop, iteratively completing tasks through these steps:
+1. Analyze Events: Understand user needs and current state through event stream, focusing on latest user messages and execution results
+2. Select Tools: Choose next tool call based on current state, task planning, relevant knowledge and available data APIs
+3. Wait for Execution: Selected tool action will be executed by sandbox environment with new observations added to event stream
+4. Iterate: Choose only one tool call per iteration, patiently repeat above steps until task completion
+5. Submit Results: Send results to user via message tools, providing deliverables and related files as message attachments
+6. Enter Standby: Enter idle state when all tasks are completed or user explicitly requests to stop, and wait for new tasks
+</agent_loop>
+'''
+
+QUERY_PROMPT_TEMPLATE = """现在,请通过多步思考,选择合适的方法向一位用户发起问候。
+# 已知用户的信息
+用户信息:
+- 姓名:{name}
+- 头像:{avatar}
+- 偏好的称呼:{preferred_nickname}
+- 年龄:{age}
+- 地区:{region}
+- 健康状况:{health_conditions}
+- 用药信息:{medications}
+- 兴趣爱好:{interests}
+# 已知过去的对话
+{dialogue_history}
+
+# 当前上下文信息
+时间:{current_datetime}
+
+注意对话信息的格式为: [角色][时间]对话内容
+注意一定要分析对话信息中的时间,避免和当前时间段不符的内容!注意一定要结合历史的对话情况进行分析和问候方式的选择!
+可以使用analyse_image分析用户头像。
+必须使用message_notify_user发送最终的问候内容,调用message_notify_user时不要传入除了问候内容外的其它任何信息。
+Please think step by step.
+ """
+
+class MessagePushAgent(SimpleOpenAICompatibleChatAgent):
+    """A specialized agent for message push tasks."""
+
+    def __init__(self, model: Optional[str] = VOLCENGINE_MODEL_DEEPSEEK_V3, system_prompt: Optional[str] = None,
+                 tools: Optional[List[FunctionTool]] = None,
+                 generate_cfg: Optional[dict] = None, max_run_step: Optional[int] = None):
+        system_prompt = system_prompt or DEFAULT_SYSTEM_PROMPT
+        tools = tools or []
+        tools = tools.copy()
+        tools.extend([
+            *ImageDescriber().get_tools(),
+            *MessageNotifier().get_tools()
+        ])
+        super().__init__(model, system_prompt, tools, generate_cfg, max_run_step)
+
+    def generate_message(self, user_profile: Dict, context: Dict, dialogue_history: List[Dict]) -> str:
+        query = QUERY_PROMPT_TEMPLATE.format(**user_profile, **context, dialogue_history=dialogue_history)
+        self.run(query)
+        for tool_call in reversed(self.tool_call_records):
+            if tool_call['name'] == MessageNotifier.message_notify_user.__name__:
+                return tool_call['arguments']['message']
+        return ''
+
+if __name__ == '__main__':
+    import pqai_agent.logging_service
+    pqai_agent.logging_service.setup_root_logger()
+    agent = MessagePushAgent()
+    test_user_profile = {
+        'name': '薛岱月',
+        'avatar': 'http://wx.qlogo.cn/mmhead/Q3auHgzwzM5glpnBtDUianJErYf9AQsptLM3N78xP3sOR8SSibsG35HQ/0',
+        'preferred_nickname': '月哥',
+        'age': 65,
+        'region': '北京',
+        'health_conditions': '高血压',
+        'medications': ['降压药'],
+        'interests': ['钓鱼', '旅游']
+    }
+    test_context = {
+        "current_datetime": "2025-05-13 08:00:00",
+    }
+    response = agent.generate_message(test_user_profile, test_context, [])
+    print(response)
+
+
+

+ 86 - 0
pqai_agent/agents/simple_chat_agent.py

@@ -0,0 +1,86 @@
+import json
+from typing import List, Optional
+
+from pqai_agent.agent import DEFAULT_MAX_RUN_STEPS
+from pqai_agent.chat_service import OpenAICompatible, VOLCENGINE_MODEL_DEEPSEEK_V3
+from pqai_agent.logging_service import logger
+from pqai_agent.toolkit.function_tool import FunctionTool
+from pqai_agent.toolkit.image_describer import ImageDescriber
+from pqai_agent.toolkit.message_notifier import MessageNotifier
+
+
+class SimpleOpenAICompatibleChatAgent:
+    """ 最简单的多步Agent实现 """
+    def __init__(self, model: str, system_prompt: str, tools: Optional[List[FunctionTool]] = None,
+                 generate_cfg: Optional[dict] = None, max_run_step: Optional[int] = None):
+        self.model = model
+        self.llm_client = OpenAICompatible.create_client(model)
+        self.system_prompt = system_prompt
+        self.tools = tools or []
+        self.tool_map = {tool.name: tool for tool in self.tools}
+        self.generate_cfg = generate_cfg or {}
+        self.max_run_step = max_run_step or DEFAULT_MAX_RUN_STEPS
+        self.tool_call_records = []
+
+    def run(self, user_input: str) -> str:
+        messages = [{"role": "system", "content": self.system_prompt}]
+        tools = [tool.get_openai_tool_schema() for tool in self.tools]
+        messages.append({"role": "user", "content": user_input})
+
+        n_steps = 0
+        logger.debug(f"start agent loop. messages: {messages}")
+        while n_steps < self.max_run_step:
+            response = self.llm_client.chat.completions.create(model=self.model, messages=messages, tools=tools, **self.generate_cfg)
+            message = response.choices[0].message
+            messages.append(message)
+            logger.debug(f"current step content: {message.content}")
+
+            if message.tool_calls:
+                for tool_call in message.tool_calls:
+                    function_name = tool_call.function.name
+                    arguments = json.loads(tool_call.function.arguments)
+                    logger.debug(f"call function[{function_name}], parameter: {arguments}")
+
+                    if function_name in self.tool_map:
+                        result = self.tool_map[function_name](**arguments)
+                        messages.append({
+                            "role": "tool",
+                            "tool_call_id": tool_call.id,
+                            "content": json.dumps(result, ensure_ascii=False)
+                        })
+                        self.tool_call_records.append({
+                            "name": function_name,
+                            "arguments": arguments,
+                            "result": result
+                        })
+                    else:
+                        logger.error(f"Function {function_name} not found in tool map.")
+                        raise Exception(f"Function {function_name} not found in tool map.")
+            else:
+                return message.content
+            n_steps += 1
+
+        raise Exception("Max run steps exceeded")
+
+if __name__ == '__main__':
+    import pqai_agent.logging_service
+    pqai_agent.logging_service.setup_root_logger()
+    tools = [
+        *ImageDescriber().get_tools(),
+        *MessageNotifier().get_tools()
+    ]
+    system_instruction = "You are a helpful assistant."
+    agent = SimpleOpenAICompatibleChatAgent(
+        model=VOLCENGINE_MODEL_DEEPSEEK_V3,
+        system_prompt=system_instruction,
+        tools=tools
+    )
+
+    user_input = query = """
+分析以下图片的内容:"http://wx.qlogo.cn/mmhead/Q3auHgzwzM5glpnBtDUianJErYf9AQsptLM3N78xP3sOR8SSibsG35HQ/0"
+根据内容联想作一首诗
+Please think step by step.
+ """
+
+    result = agent.run(user_input)
+    print(result)

+ 1 - 1
pqai_agent/chat_service.py

@@ -41,7 +41,7 @@ class ChatServiceType(Enum):
 
 class OpenAICompatible:
     @staticmethod
-    def create_client(model_name, **kwargs):
+    def create_client(model_name, **kwargs) -> OpenAI:
         volcengine_models = [
             VOLCENGINE_MODEL_DOUBAO_PRO_32K,
             VOLCENGINE_MODEL_DOUBAO_PRO_1_5,

+ 1 - 0
pqai_agent/toolkit/function_tool.py

@@ -225,6 +225,7 @@ class FunctionTool:
         self.openai_tool_schema = openai_tool_schema or get_openai_tool_schema(
             func
         )
+        self.name = self.get_function_name()
 
     def __call__(self, *args: Any, **kwargs: Any) -> Any:
         # Pass the extracted arguments to the indicated function

+ 39 - 0
pqai_agent/toolkit/image_describer.py

@@ -0,0 +1,39 @@
+from pqai_agent import chat_service
+from pqai_agent.chat_service import VOLCENGINE_MODEL_DOUBAO_1_5_VISION_PRO
+from pqai_agent.logging_service import logger
+from pqai_agent.toolkit.base import BaseToolkit
+from pqai_agent.toolkit.function_tool import FunctionTool
+
+
+class ImageDescriber(BaseToolkit):
+    def __init__(self):
+        self.model = VOLCENGINE_MODEL_DOUBAO_1_5_VISION_PRO
+        self.llm_client = chat_service.OpenAICompatible.create_client(self.model)
+        super().__init__()
+
+    def analyse_image(self, image_url: str):
+        """Takes an image URL as input and returns a detailed description of the image.
+
+        Args:
+            image_url (str): The URL of the image to be described.
+        Returns:
+            str: A detailed description of the image.
+        """
+        system_prompt = "你是一位图像分析专家。请提供输入图像的详细描述,包括图像中的文本内容(如果存在)"
+
+        messages = [
+            {'role': 'system', 'content': system_prompt},
+            {'role': 'user', 'content': [
+                {
+                    'type': 'image_url',
+                    'image_url': image_url
+                }
+            ]}
+        ]
+        response = self.llm_client.chat.completions.create(messages=messages, model=self.model)
+        response_content = response.choices[0].message.content
+        logger.debug(f"ImageDescriber response: {response_content}")
+        return response_content
+
+    def get_tools(self):
+        return [FunctionTool(self.analyse_image)]

+ 22 - 0
pqai_agent/toolkit/message_notifier.py

@@ -0,0 +1,22 @@
+from pqai_agent.logging_service import logger
+from pqai_agent.toolkit.base import BaseToolkit
+from pqai_agent.toolkit.function_tool import FunctionTool
+
+
+class MessageNotifier(BaseToolkit):
+    def __init__(self):
+        super().__init__()
+
+    def message_notify_user(self, message: str) -> str:
+        """Sends a message to the user.
+        Args:
+            message (str): The message to send.
+        Returns:
+            str: A confirmation message.
+        """
+
+        logger.info(f"Message to user: {message}")
+        return 'Message sent successfully.'
+
+    def get_tools(self):
+        return [FunctionTool(self.message_notify_user)]