Browse Source

Merge branch 'feature/2025-05-22-luojunhui-profile-manager' into feature/luojunhui-0521-dev

# Conflicts:
#	pqai_agent_server/api_server.py
#	pqai_agent_server/models/mysql_session_manager.py
luojunhui 1 month ago
parent
commit
4ad9073ddf

+ 7 - 9
pqai_agent/agents/message_push_agent.py

@@ -3,8 +3,8 @@ from typing import Optional, List, Dict
 
 from pqai_agent.agents.simple_chat_agent import SimpleOpenAICompatibleChatAgent
 from pqai_agent.chat_service import VOLCENGINE_MODEL_DEEPSEEK_V3
-from pqai_agent.dialogue_manager import DialogueManager
 from pqai_agent.logging_service import logger
+from pqai_agent.message import MessageType
 from pqai_agent.toolkit.function_tool import FunctionTool
 from pqai_agent.toolkit.image_describer import ImageDescriber
 from pqai_agent.toolkit.message_notifier import MessageNotifier
@@ -109,7 +109,7 @@ QUERY_PROMPT_TEMPLATE = """现在,请通过多步思考,以客服的角色
 # 当前上下文信息
 时间:{current_datetime}
 
-注意对话信息的格式为: [角色][时间]对话内容
+注意对话的格式为: [角色][时间][消息类型]消息内容
 注意分析客服和用户当前的社交阶段,先确立本次问候的目的。
 注意一定要分析对话信息中的时间,避免和当前时间段不符的内容!注意一定要结合历史的对话情况进行分析和问候方式的选择!
 如有必要,可以使用analyse_image分析用户头像。
@@ -153,7 +153,8 @@ class MessagePushAgent(SimpleOpenAICompatibleChatAgent):
             if msg['role'] not in role_map:
                 continue
             format_dt = datetime.datetime.fromtimestamp(msg['timestamp'] / 1000).strftime('%Y-%m-%d %H:%M:%S')
-            messages.append('[{}][{}]{}'.format(role_map[msg['role']], format_dt, msg['content']))
+            msg_type = msg.get('type', MessageType.TEXT).description
+            messages.append('[{}][{}][{}]{}'.format(role_map[msg['role']], format_dt, msg_type, msg['content']))
         return '\n'.join(messages)
 
 class DummyMessagePushAgent(MessagePushAgent):
@@ -197,7 +198,7 @@ if __name__ == '__main__':
     }
     from pqai_agent.utils import prompt_utils
     test_context = {
-        "current_datetime": "2025-05-11 08:00:00",
+        "current_datetime": "2025-05-12 08:00:00",
         "formatted_staff_profile": prompt_utils.format_agent_profile(test_staff_profile),
         **test_user_profile,
     }
@@ -205,10 +206,7 @@ if __name__ == '__main__':
         return datetime.datetime(year, month, day, hour, minute).timestamp() * 1000
     messages = [
         {"role": "assistant", "content": "月哥,早上好!看到您的头像是一片宁静的户外风景,感觉您一定很喜欢大自然吧?今天天气不错,您有什么计划吗?", "timestamp": create_ts(2025, 5, 10, 8, 0)},
-        {"role": "user", "content": "我又不认识你,不要给我发了", "timestamp": create_ts(2025, 5, 10, 8, 30)},
+        # {"role": "user", "content": "我又不认识你,不要给我发了", "timestamp": create_ts(2025, 5, 10, 8, 30)},
     ]
     response = agent.generate_message(test_context, messages)
-    print(response)
-
-
-
+    print(response)

+ 189 - 0
pqai_agent/agents/message_reply_agent.py

@@ -0,0 +1,189 @@
+import datetime
+from typing import Optional, List, Dict
+
+from pqai_agent.agents.simple_chat_agent import SimpleOpenAICompatibleChatAgent
+from pqai_agent.chat_service import VOLCENGINE_MODEL_DEEPSEEK_V3
+from pqai_agent.logging_service import logger
+from pqai_agent.message import MessageType
+from pqai_agent.toolkit.function_tool import FunctionTool
+from pqai_agent.toolkit.image_describer import ImageDescriber
+from pqai_agent.toolkit.message_notifier import MessageNotifier
+
+DEFAULT_SYSTEM_PROMPT = '''
+<基本设定>
+你是一位熟悉中老年用户交流习惯的微信客服。
+你擅长以下事项:
+* 理解中老年人的典型情感需求、对话习惯
+* 倾听、引导和共情,在对话中自然促进用户互动
+
+你的工作方法论:
+* 分析用户请求以确定核心需求
+* 为完成任务制定结构化的计划
+</基本设定>
+
+<语言设定>
+* 默认的工作语言:中文
+* 如果用户指定使用其它语言,则将其作为工作语言
+* 所有的思考和回答都要用工作语言
+</语言设定>
+
+<社交阶段划分>
+* 破冰试探期
+* 角色探索期
+* 情感联结期
+</社交阶段划分>
+
+<心理学技巧>
+* 怀旧效应:可以用"当年/以前"触发美好回忆
+* 具象化提问:避免抽象问题
+* 正向反馈圈:在后续对话中重复用户的关键词
+</心理学技巧>
+
+<风险规避原则>
+* 避免过度打扰和重复:注意分析历史对话
+* 避免过度解读:不要过度解读用户的信息
+* 文化适配:注意不同地域的用户文化差异
+* 准确性要求:不要使用虚构的信息
+</风险规避原则>
+
+<agent_loop>
+You are operating in an agent loop, iteratively completing tasks through these steps:
+1. Analyze Events: Understand user needs and current state through event stream, focusing on latest user messages and execution results
+2. Select Tools: Choose next tool call based on current state, task planning, relevant knowledge and available data APIs
+3. Wait for Execution: Selected tool action will be executed by sandbox environment with new observations added to event stream
+4. Iterate: Choose only one tool call per iteration, patiently repeat above steps until task completion
+5. Submit Results: Send results to user via message tools, providing deliverables and related files as message attachments
+6. Enter Standby: Enter idle state when all tasks are completed or user explicitly requests to stop, and wait for new tasks
+</agent_loop>
+'''
+
+QUERY_PROMPT_TEMPLATE = """现在,请以客服的角色分析以下会话并生成给用户的回复。
+# 客服的基本信息
+{formatted_staff_profile}
+# 用户的信息
+- 微信昵称:{nickname}
+- 姓名:{name}
+- 头像:{avatar}
+- 偏好的称呼:{preferred_nickname}
+- 年龄:{age}
+- 地区:{region}
+- 健康状况:{health_conditions}
+- 用药信息:{medications}
+- 兴趣爱好:{interests}
+# 已知过去的对话
+{dialogue_history}
+
+# 当前上下文信息
+时间:{current_datetime}
+
+注意对话信息的格式为: [角色][时间][消息类型]对话内容
+注意分析客服和用户当前的社交阶段,先确立对话的目的。
+注意一定要分析对话信息中的时间,避免和当前时间段不符的内容!注意一定要结合历史的对话情况进行分析和问候方式的选择!
+使用message_notify_user发送最终的回复内容,调用时不要传入除了回复内容外的其它任何信息。
+请注意这是微信聊天,如果用户使用了表情包,请使用analyse_image描述表情包,并分析其含义和情绪,如果要回复请尽量用简短的emoji或文字进行回复。
+如果用户连续2次以上感到疑惑,请先发送<人工介入>,后接你认为需要人工介入的原因。如果判断对话可自然结束、无需再回复用户,请发送<结束>。如果用户表现出强烈的负向情绪、要求不再对话,请发送<负向情绪结束>。
+以上特殊消息的发送也请使用message_notify_user。
+Now, start to process your task. Please think step by step.
+ """
+
+class MessageReplyAgent(SimpleOpenAICompatibleChatAgent):
+    """A specialized agent for message reply tasks."""
+
+    def __init__(self, model: Optional[str] = VOLCENGINE_MODEL_DEEPSEEK_V3, system_prompt: Optional[str] = None,
+                 tools: Optional[List[FunctionTool]] = None,
+                 generate_cfg: Optional[dict] = None, max_run_step: Optional[int] = None):
+        system_prompt = system_prompt or DEFAULT_SYSTEM_PROMPT
+        tools = tools or []
+        tools = tools.copy()
+        tools.extend([
+            *ImageDescriber().get_tools(),
+            *MessageNotifier().get_tools()
+        ])
+        super().__init__(model, system_prompt, tools, generate_cfg, max_run_step)
+
+    def generate_message(self, context: Dict, dialogue_history: List[Dict]) -> str:
+        formatted_dialogue = MessageReplyAgent.compose_dialogue(dialogue_history)
+        query = QUERY_PROMPT_TEMPLATE.format(**context, dialogue_history=formatted_dialogue)
+        self.run(query)
+        for tool_call in reversed(self.tool_call_records):
+            if tool_call['name'] == MessageNotifier.message_notify_user.__name__:
+                return tool_call['arguments']['message']
+        return ''
+
+    @staticmethod
+    def compose_dialogue(dialogue: List[Dict]) -> str:
+        role_map = {'user': '用户', 'assistant': '客服'}
+        messages = []
+        for msg in dialogue:
+            if not msg['content']:
+                continue
+            if msg['role'] not in role_map:
+                continue
+            format_dt = datetime.datetime.fromtimestamp(msg['timestamp'] / 1000).strftime('%Y-%m-%d %H:%M:%S')
+            msg_type = msg.get('type', MessageType.TEXT).description
+            messages.append('[{}][{}][{}]{}'.format(role_map[msg['role']], format_dt, msg_type, msg['content']))
+        return '\n'.join(messages)
+
+class DummyMessageReplyAgent(MessageReplyAgent):
+    """A dummy agent for testing purposes."""
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
+    def generate_message(self, context: Dict, dialogue_history: List[Dict]) -> str:
+        logger.debug(f"DummyMessageReplyAgent.generate_message called, context: {context}")
+        return "测试消息: {agent_name} -> {nickname}".format(**context)
+
+
+if __name__ == '__main__':
+    import pqai_agent.logging_service
+    pqai_agent.logging_service.setup_root_logger()
+    from pqai_agent.chat_service import VOLCENGINE_MODEL_DEEPSEEK_V3
+    from pqai_agent.toolkit.pq_video_searcher import PQVideoSearcher
+    agent = MessageReplyAgent(model=VOLCENGINE_MODEL_DEEPSEEK_V3,
+                              tools=[*PQVideoSearcher().get_tools()])
+    query = """
+现在,请以客服的角色分析以下会话并生成给用户的回复。
+# 客服的基本信息
+- 名字:周洁
+- 性别:女
+- 年龄:35
+- 所在地:长沙
+- 之前所在地:老家长沙,北京工作三年
+- 学历:本科。师范学校毕业
+- 职业:长沙某中学的老师
+- 工作经历:大学毕业后三年在北京一家“中老年”教育公司任班主任;北漂三年后,回到家乡任职普通中学的语文老师
+- 家庭成员:父亲;母亲;丈夫;女儿
+- 家庭成员职业:父亲:高中教师(已退休);母亲:经营一家商店;丈夫:知名律所专业律师
+# 用户的信息
+- 微信昵称:薛岱月
+- 姓名:薛岱月
+- 头像:http://wx.qlogo.cn/mmhead/Q3auHgzwzM5glpnBtDUianJErYf9AQsptLM3N78xP3sOR8SSibsG35HQ/0
+- 偏好的称呼:月哥
+- 年龄:65
+- 地区:北京
+- 健康状况:
+- 用药信息:[]
+- 兴趣爱好:[]
+# 已知过去的对话
+[用户][2025-05-20 21:58:24][文本]我喜欢跑步
+[客服][2025-05-20 21:58:52][文本]大哥,跑步是特别棒的爱好呢!既锻炼了身体,又能让人心情愉悦~最近有去哪里跑步呀?
+[用户][2025-05-20 21:59:05][文本]我要睡觉了
+[用户][2025-05-20 21:59:13][表情包]http://dl.weshineapp.com/gif/20250503/a7a5afcefa8bc7d293c2d0bfde0007be.gif?id=a7a5afcefa8bc7d293c2d0bfde0007be
+[客服][2025-05-20 21:59:15][文本]晚安啦
+[用户][2025-05-20 21:59:52][文本]晚安
+# 当前上下文信息
+时间:2025-05-20 22:00:00
+
+注意对话信息的格式为: [角色][时间][消息类型]对话内容
+注意分析客服和用户当前的社交阶段,先确立对话的目的。
+注意一定要分析对话信息中的时间,避免和当前时间段不符的内容!注意一定要结合历史的对话情况进行分析和问候方式的选择!
+使用output_multimodal_message发送最终的回复消息,如果有多条消息需要发送,可以多次调用output_multimodal_message,请务必保证所有回复内容都通过output_multimodal_message发出。
+请注意这是微信聊天,如果用户使用了表情包,请使用analyse_image描述表情包,并分析其含义和情绪,如果要回复请尽量用简短的emoji或文字进行回复。
+特殊情况处理:
+如果用户连续2次以上感到疑惑,请先发送<人工介入>,后接你认为需要人工介入的原因。如果判断对话可自然结束、无需再回复用户,请发送<结束>。如果用户表现出强烈的负向情绪、要求不再对话,请发送<负向情绪结束>。
+以上特殊情况的消息发送请使用message_notify_user。
+Now, start to process your task. Please think step by step.
+"""
+    response = agent.run(query)
+    print(response)

+ 3 - 12
pqai_agent/prompt_templates.py

@@ -6,10 +6,7 @@ GENERAL_GREETING_PROMPT = """
 你是一位熟悉中老年用户交流习惯的客服。你当前正准备向用户发起问候。用户是一位中老年人,请以温暖、尊重的口吻进行简短的问候,在合适的时机引导获取基本信息。
 请根据以下信息(用户信息可能为空)生成适当的问候:
 你的信息:
-- 名字:{agent_name}
-- 性别:{agent_gender}
-- 年龄:{agent_age}
-- 地区:{agent_region}
+{formatted_staff_profile}
 用户信息:
 - 姓名:{name}
 - 偏好的称呼:{preferred_nickname}
@@ -59,10 +56,7 @@ CHITCHAT_PROMPT = """
 你是一位熟悉中老年用户交流习惯的客服。用户是一位中老年人,请以温暖、尊重的口吻进行回复,在合适的时机引导获取基本信息。
 请根据以下信息(用户信息可能为空)生成适当的回复:
 你的信息:
-- 名字:{agent_name}
-- 性别:{agent_gender}
-- 年龄:{agent_age}
-- 地区:{agent_region}
+{formatted_staff_profile}
 用户信息:
 - 姓名:{name}
 - 偏好的称呼:{preferred_nickname}
@@ -153,10 +147,7 @@ CHITCHAT_PROMPT_COZE = """
 
 # 输入信息
 你的信息:
-- 名字:{{agent_name}}
-- 性别:{{agent_gender}}
-- 年龄:{{agent_age}}
-- 地区:{{agent_region}}
+{{formatted_staff_profile}}
 用户信息:
 - 姓名:{{name}}
 - 偏好的称呼:{{preferred_nickname}}

+ 8 - 1
pqai_agent/toolkit/image_describer.py

@@ -1,4 +1,5 @@
 import diskcache
+import threading
 
 from pqai_agent import chat_service
 from pqai_agent.chat_service import VOLCENGINE_MODEL_DOUBAO_1_5_VISION_PRO
@@ -6,6 +7,9 @@ from pqai_agent.logging_service import logger
 from pqai_agent.toolkit.base import BaseToolkit
 from pqai_agent.toolkit.function_tool import FunctionTool
 
+# 不同实例间复用cache,但不是很好的实践
+_image_describer_caches = {}
+_cache_mutex = threading.Lock()
 
 class ImageDescriber(BaseToolkit):
     def __init__(self, cache_dir: str = None):
@@ -13,7 +17,10 @@ class ImageDescriber(BaseToolkit):
         self.llm_client = chat_service.OpenAICompatible.create_client(self.model)
         if not cache_dir:
             cache_dir = 'image_descriptions_cache'
-        self.cache = diskcache.Cache(cache_dir, size_limit=100*1024*1024)
+        if cache_dir not in _image_describer_caches:
+            with _cache_mutex:
+                _image_describer_caches[cache_dir] = diskcache.Cache(cache_dir, size_limit=100*1024*1024)
+        self.cache = _image_describer_caches[cache_dir]
         super().__init__()
 
     def analyse_image(self, image_url: str):

+ 26 - 2
pqai_agent/toolkit/message_notifier.py

@@ -1,3 +1,5 @@
+from typing import List, Dict
+
 from pqai_agent.logging_service import logger
 from pqai_agent.toolkit.base import BaseToolkit
 from pqai_agent.toolkit.function_tool import FunctionTool
@@ -9,6 +11,7 @@ class MessageNotifier(BaseToolkit):
 
     def message_notify_user(self, message: str) -> str:
         """Sends a message to the user.
+
         Args:
             message (str): The message to send.
         Returns:
@@ -16,7 +19,28 @@ class MessageNotifier(BaseToolkit):
         """
 
         logger.info(f"Message to user: {message}")
-        return 'Message sent successfully.'
+        return 'success'
+
+    def output_multimodal_message(self, message: Dict[str, str]) -> str:
+        """Outputs a multimodal message to the user.
+        Message schema:
+        {
+            "type": "text|image|gif|video|mini_program",
+            "content": "message content",
+            "title": "only needed if type is video or mini_program",
+            "cover_image": "only needed if type is mini_program",
+        }
+        if message type is image, gif, video or mini_program, the content should be a URL.
+
+        Args:
+            message (Dict[str, str]): The message to output.
+        Returns:
+            str: A confirmation message.
+        """
+        logger.info(f"Multimodal message to user: {message}")
+        return 'success'
+
 
     def get_tools(self):
-        return [FunctionTool(self.message_notify_user)]
+        return [FunctionTool(self.message_notify_user),
+                FunctionTool(self.output_multimodal_message)]

+ 50 - 0
pqai_agent/toolkit/pq_video_searcher.py

@@ -0,0 +1,50 @@
+from typing import Dict, List
+import requests
+
+from pqai_agent.toolkit.base import BaseToolkit
+from pqai_agent.toolkit.function_tool import FunctionTool
+
+class PQVideoSearcher(BaseToolkit):
+    API_URL = "https://vlogapi.piaoquantv.com/longvideoapi/search/userandvideo/list"
+    def search_pq_video(self, keywords: List[str]) -> List[Dict]:
+        """
+        Search for videos on the PQ Video (票圈视频) platform using keywords.
+        使用关键词在票圈视频平台上搜索视频。使用时请按要求提供关键词列表。
+        Args:
+            keywords (List[str]): 关键词列表,至多5个词,且必须按相关性从高到低排列,每个词至多4个字。
+        Returns:
+            List[Dict]: List of video metadata dictionaries.
+        """
+        if not keywords:
+            return []
+        keyword = keywords[0]
+        data = {
+            'keyWord': keyword,
+            'pageSize': "10",
+            'pageNo': "1"
+        }
+
+        try:
+            response = requests.post(self.API_URL, data=data)
+            response.raise_for_status()  # Raise an error for bad responses
+            result = response.json()
+            data = result.get("data", {})
+            videos = data.get("videos", {}).get("videos", [])
+            ret = []
+            for item in videos:
+                if item.get("auditStatus") == 5 and item.get("transcodeStatus") == 3 and item.get("title"):
+                    ret.append({
+                        "id": item["id"],
+                        "coverImgPath": item["coverImg"]["coverImgPath"],
+                        "title": item["title"],
+                        "totalTime": item["totalTime"],
+                        "videoPath": item["videoPath"],
+                        "playCountFormatStr": item["playCountFormatStr"]
+                    })
+            ret = ret[:4]
+            return ret
+        except Exception as e:
+            return []
+
+    def get_tools(self) -> List[FunctionTool]:
+        return [FunctionTool(self.search_pq_video)]

+ 34 - 0
pqai_agent/utils/prompt_utils.py

@@ -21,3 +21,37 @@ def format_agent_profile(profile: Dict) -> str:
         cur_string = f"- {field[1]}:{profile[field[0]]}"
         strings_to_join.append(cur_string)
     return "\n".join(strings_to_join)
+
+def format_user_profile(profile: Dict) -> str:
+    """
+    :param profile:
+    :return: formatted string.
+    example:
+    - 微信昵称:{nickname}
+    - 姓名:{name}
+    - 头像:{avatar}
+    - 偏好的称呼:{preferred_nickname}
+    - 年龄:{age}
+    - 地区:{region}
+    - 健康状况:{health_conditions}
+    - 用药信息:{medications}
+    - 兴趣爱好:{interests}
+    """
+    fields = [
+        ('nickname', '微信昵称'),
+        ('name', '姓名'),
+        ('avatar', '头像'),
+        ('preferred_nickname', '偏好的称呼'),
+        ('age', '年龄'),
+        ('region', '地区'),
+        ('health_conditions', '健康状况'),
+        ('medications', '用药信息'),
+        ('interests', '兴趣爱好')
+    ]
+    strings_to_join = []
+    for field in fields:
+        if not profile.get(field[0], None):
+            continue
+        cur_string = f"- {field[1]}:{profile[field[0]]}"
+        strings_to_join.append(cur_string)
+    return "\n".join(strings_to_join)

+ 102 - 25
pqai_agent_server/api_server.py

@@ -1,6 +1,7 @@
 #! /usr/bin/env python
 # -*- coding: utf-8 -*-
 # vim:fenc=utf-8
+import json
 import time
 import logging
 import werkzeug.exceptions
@@ -10,10 +11,13 @@ from argparse import ArgumentParser
 from pqai_agent import configs
 
 from pqai_agent import logging_service, chat_service, prompt_templates
+from pqai_agent.agents.message_reply_agent import MessageReplyAgent
+from pqai_agent.configs import apollo_config
 from pqai_agent.history_dialogue_service import HistoryDialogueService
 from pqai_agent.user_manager import MySQLUserManager, MySQLUserRelationManager
+from pqai_agent.utils.prompt_utils import format_agent_profile, format_user_profile
 from pqai_agent_server.const import AgentApiConst
-from pqai_agent_server.models import MySQLSessionManager
+from pqai_agent_server.models import MySQLSessionManager, MySQLStaffManager
 from pqai_agent_server.utils import wrap_response, quit_human_intervention_status
 from pqai_agent_server.utils import (
     run_extractor_prompt,
@@ -34,11 +38,55 @@ def list_staffs():
 @app.route('/api/getStaffProfile', methods=['GET'])
 def get_staff_profile():
     staff_id = request.args['staff_id']
-    profile = app.user_manager.get_staff_profile(staff_id)
-    if not profile:
+    if not staff_id:
+        return wrap_response(400, msg='staff_id is required')
+    agent_profile = app.staff_manager.get_staff_profile(staff_id)
+    if not agent_profile:
         return wrap_response(404, msg='staff not found')
     else:
-        return wrap_response(200, data=profile)
+        field_map_list = apollo_config.get_json_value("field_map_list", [])
+        field_map = {
+            item["field_name"]: item["display_name"] for item in field_map_list
+        }
+        profile_info = [
+            {
+                "field_name": key,
+                "display_name": field_map[key],
+                "field_value": value,
+            }
+            for key, value in agent_profile.items()
+            if agent_profile.get(key)
+        ]
+        return wrap_response(200, data=profile_info)
+
+
+@app.route('/api/saveStaffProfile', methods=['POST'])
+def save_staff_profile():
+    staff_id = request.json.get('staff_id')
+    staff_profile = request.json.get('staff_profile')
+    if not staff_id:
+        return wrap_response(400, msg='staff id is required')
+
+    if not staff_profile:
+        return wrap_response(400, msg='profile is required')
+    else:
+        try:
+            profile_info_list = json.loads(staff_profile)
+            profile_dict = {item['field_name']: item['field_value'] for item in profile_info_list}
+            affected_rows = app.staff_manager.save_staff_profile(staff_id, profile_dict)
+
+            if not affected_rows:
+                return wrap_response(500, msg='save staff profile failed')
+            else:
+                return wrap_response(200, msg='save staff profile success')
+
+        except json.decoder.JSONDecodeError:
+            return wrap_response(400, msg='profile is not a valid json')
+
+
+@app.route('/api/getProfileFields', methods=['GET'])
+def get_profile_fields():
+    return wrap_response(200, data=apollo_config.get_json_value("field_map_list", []))
 
 
 @app.route('/api/getUserProfile', methods=['GET'])
@@ -171,6 +219,32 @@ def run_prompt():
         logger.error(e)
         return wrap_response(500, msg='Error: {}'.format(e))
 
+@app.route('/api/formatForPrompt', methods=['POST'])
+def format_data_for_prompt():
+    try:
+        req_data = request.json
+        content = req_data['content']
+        format_type = req_data['format_type']
+        if format_type == 'staff_profile':
+            if not isinstance(content, dict):
+                return wrap_response(400, msg='staff_profile should be a dict')
+            response = format_agent_profile(content)
+        elif format_type == 'user_profile':
+            if not isinstance(content, dict):
+                return wrap_response(400, msg='user_profile should be a dict')
+            response = format_user_profile(content)
+        elif format_type == 'dialogue':
+            if not isinstance(content, list):
+                return wrap_response(400, msg='dialogue should be a list')
+            from pqai_agent_server.utils.prompt_util import format_dialogue_history
+            response = format_dialogue_history(content)
+        else:
+            return wrap_response(400, msg='Invalid format_type')
+        return wrap_response(200, data=response)
+    except Exception as e:
+        logger.error(e)
+        return wrap_response(500, msg='Error: {}'.format(e))
+
 
 @app.route("/api/healthCheck", methods=["GET"])
 def health_check():
@@ -208,17 +282,15 @@ def get_staff_session_list():
     if not staff_id:
         return wrap_response(404, msg="staff_id is required")
 
+    page_size = request.args.get("page_size", const.DEFAULT_PAGE_SIZE)
+    page_id = request.args.get("page_id", const.DEFAULT_PAGE_ID)
+
     # check params
-    page_size = request.args.get("page_size")
-    if page_size:
-        page_size = int(page_size)
-    else:
-        page_size = const.DEFAULT_PAGE_SIZE
-    page_id = request.args.get("page_id")
-    if page_id:
+    try:
         page_id = int(page_id)
-    else:
-        page_id = const.DEFAULT_PAGE_ID
+        page_size = int(page_size)
+    except Exception as e:
+        return wrap_response(404, msg="Invalid parameter: {}".format(e))
 
     staff_session_list = app.session_manager.get_staff_session_list(staff_id, page_id, page_size)
     if not staff_session_list:
@@ -229,17 +301,15 @@ def get_staff_session_list():
 
 @app.route("/api/getStaffList", methods=["GET"])
 def get_staff_list():
+    page_size = request.args.get("page_size", const.DEFAULT_PAGE_SIZE)
+    page_id = request.args.get("page_id", const.DEFAULT_PAGE_ID)
+
     # check params
-    page_size = request.args.get("page_size")
-    page_id = request.args.get("page_id")
-    if page_id:
+    try:
         page_id = int(page_id)
-    else:
-        page_id = const.DEFAULT_PAGE_ID
-    if page_size:
         page_size = int(page_size)
-    else:
-        page_size = const.DEFAULT_PAGE_SIZE
+    except Exception as e:
+        return wrap_response(404, msg="Invalid parameter: {}".format(e))
 
     staff_list = app.user_manager.get_staff_list(page_id, page_size)
     if not staff_list:
@@ -258,8 +328,8 @@ def get_conversation_list():
     if not staff_id or not user_id:
         return wrap_response(404, msg="staff_id and user_id are required")
 
-    page_id = request.args.get("page_id")
-    response = app.session_manager.get_conversation_list(staff_id, user_id, page_id, const.DEFAULT_CONVERSATION_SIZE)
+    page = request.args.get("page_id")
+    response = app.session_manager.get_conversation_list(staff_id, user_id, page, const.DEFAULT_CONVERSATION_SIZE)
     return wrap_response(200, data=response)
 
 
@@ -279,8 +349,6 @@ def quit_human_interventions_status():
     user_id = req_data["user_id"]
     if not user_id or not staff_id:
         return wrap_response(404, msg="user_id and staff_id are required")
-    # dev
-    staff_id = 1688854492669990
     response = quit_human_intervention_status(user_id, staff_id)
 
     return wrap_response(200, data=response)
@@ -324,6 +392,15 @@ if __name__ == '__main__':
     )
     app.session_manager = session_manager
 
+    # init staff manager
+    staff_manager = MySQLStaffManager(
+        db_config=user_db_config['mysql'],
+        staff_table=staff_db_config['table'],
+        user_table=user_db_config['table']
+    )
+    app.staff_manager = staff_manager
+
+    # init wecom manager
     wecom_db_config = config['storage']['user_relation']
     user_relation_manager = MySQLUserRelationManager(
         user_db_config['mysql'], wecom_db_config['mysql'],

+ 2 - 1
pqai_agent_server/models/__init__.py

@@ -1 +1,2 @@
-from .mysql_session_manager import MySQLSessionManager
+from .mysql_session_manager import MySQLSessionManager
+from .mysql_staff_manager import MySQLStaffManager

+ 4 - 4
pqai_agent_server/models/mysql_session_manager.py

@@ -183,14 +183,14 @@ class MySQLSessionManager(SessionManager):
             user_id = session["user_id"]
             room_id = ":".join(["private", staff_id, user_id])
             select_query = f"""
-                select content, msg_type, sendtime as max_timestamp 
-                from {self.chat_history_table} where roomid = %s
-                order by sendtime desc limit %s;
+                select content, max(sendtime) as max_timestamp, msg_type
+                from {self.chat_history_table} 
+                where roomid = %s;
             """
             last_message = self.db.select(
                 sql=select_query,
                 cursor_type=pymysql.cursors.DictCursor,
-                args=(room_id, 1),
+                args=(room_id,),
             )
             if not last_message:
                 temp_obj["message"] = None

+ 85 - 0
pqai_agent_server/models/mysql_staff_manager.py

@@ -0,0 +1,85 @@
+import abc
+import json
+import pymysql.cursors
+
+from typing import Dict, List
+from pqai_agent.database import MySQLManager
+
+
+class StaffManager(abc.ABC):
+
+    @abc.abstractmethod
+    def list_all_staffs(self, page_id: int, page_size: int) -> List[Dict]:
+        pass
+
+    @abc.abstractmethod
+    def get_staff_profile(self, staff_id) -> Dict:
+        pass
+
+    @abc.abstractmethod
+    def save_staff_profile(self, staff_id: str, staff_profile: Dict):
+        pass
+
+
+class MySQLStaffManager(StaffManager):
+
+    def __init__(self, db_config, staff_table, user_table):
+        self.db = MySQLManager(db_config)
+        self.staff_table = staff_table
+        self.user_table = user_table
+
+    def save_staff_profile(self, staff_id: str, staff_profile: Dict):
+        update_query = f"""
+            update {self.staff_table} set agent_profile = %s where third_party_user_id = %s;
+        """
+        affected_rows = self.db.execute(
+            update_query, (json.dumps(staff_profile), staff_id)
+        )
+        return affected_rows
+
+    def get_staff_profile(self, staff_id) -> Dict:
+        empty_profile = {}
+        sql = f"""select agent_profile from {self.staff_table} where third_party_user_id = %s;"""
+        response = self.db.select(
+            sql=sql,
+            cursor_type=pymysql.cursors.DictCursor,
+            args=(staff_id,),
+        )
+        if not response:
+            return empty_profile
+
+        agent_profile = response[0]["agent_profile"]
+        profile = json.loads(agent_profile)
+        if not profile:
+            return empty_profile
+        return profile
+
+    def list_all_staffs(self, page_id: int, page_size: int) -> Dict:
+        """
+        :param page_size:
+        :param page_id:
+        :return:
+        """
+        sql = f"""
+            select t1.third_party_user_id as staff_id, t1.name as staff_name, t2.iconurl as avatar
+            from {self.staff_table} t1 left join {self.user_table} t2
+            on t1.third_party_user_id = t2.third_party_user_id
+            limit %s offset %s;
+        """
+        staff_list = self.db.select(
+            sql=sql,
+            cursor_type=pymysql.cursors.DictCursor,
+            args=(page_size + 1, page_size * (page_id - 1)),
+        )
+        if len(staff_list) > page_size:
+            has_next_page = True
+            next_page_id = page_id + 1
+            staff_list = staff_list[:page_size]
+        else:
+            has_next_page = False
+            next_page_id = None
+        return {
+            "has_next_page": has_next_page,
+            "next_page": next_page_id,
+            "data": staff_list,
+        }

+ 22 - 2
pqai_agent_server/utils/prompt_util.py

@@ -1,6 +1,8 @@
 import json
 
 from datetime import datetime
+from typing import List, Dict
+
 from openai import OpenAI
 
 from pqai_agent import logging_service, chat_service
@@ -8,6 +10,7 @@ from pqai_agent.response_type_detector import ResponseTypeDetector
 from pqai_agent.user_profile_extractor import UserProfileExtractor
 from pqai_agent.dialogue_manager import DialogueManager
 from pqai_agent.message import MessageType
+from pqai_agent.utils.prompt_utils import format_agent_profile
 
 logger = logging_service.logger
 
@@ -83,7 +86,7 @@ def run_extractor_prompt(req_data):
     dialogue_history = req_data["dialogue_history"]
     model_name = req_data["model_name"]
     prompt_context = {
-        **staff_profile,
+        "formatted_staff_profile": format_agent_profile(staff_profile),
         **user_profile,
         "dialogue_history": UserProfileExtractor.compose_dialogue(dialogue_history),
     }
@@ -114,7 +117,10 @@ def run_chat_prompt(req_data):
     dialogue_history = req_data.get("dialogue_history", [])
     model_name = req_data["model_name"]
     current_timestamp = req_data["current_timestamp"] / 1000
-    prompt_context = {**staff_profile, **user_profile}
+    prompt_context = {
+        'formatted_staff_profile': format_agent_profile(staff_profile),
+        **user_profile
+    }
     current_hour = datetime.fromtimestamp(current_timestamp).hour
     prompt_context["last_interaction_interval"] = 0
     prompt_context["current_time_period"] = DialogueManager.get_time_context(
@@ -170,3 +176,17 @@ def run_response_type_prompt(req_data):
         {"role": "user", "content": prompt},
     ]
     return run_openai_chat(messages, model_name, temperature=0.2, max_tokens=128)
+
+
+def format_dialogue_history(dialogue: List[Dict]) -> str:
+    role_map = {'user': '用户', 'assistant': '客服'}
+    messages = []
+    for msg in dialogue:
+        if not msg['content']:
+            continue
+        if msg['role'] not in role_map:
+            continue
+        format_dt = datetime.fromtimestamp(msg['timestamp'] / 1000).strftime('%Y-%m-%d %H:%M:%S')
+        msg_type = MessageType(msg.get('type', MessageType.TEXT.value)).description
+        messages.append('[{}][{}][{}]{}'.format(role_map[msg['role']], format_dt, msg_type, msg['content']))
+    return '\n'.join(messages)

+ 4 - 1
requirements.txt

@@ -57,4 +57,7 @@ jsonschema~=4.23.0
 pqai_agent~=0.1.0
 numpy~=2.2.5
 pillow~=11.2.1
-json5~=0.12.0
+json5~=0.12.0
+beautifulsoup4~=4.13.4
+diskcache~=5.6.3
+SQLAlchemy~=2.0.40