Quellcode durchsuchen

Initial version

StrayWarrior vor 1 Monat
Ursprung
Commit
6751f190a5
10 geänderte Dateien mit 1248 neuen und 0 gelöschten Zeilen
  1. 212 0
      agent_service.py
  2. 388 0
      dialogue_manager.py
  3. 6 0
      global_flags.py
  4. 25 0
      logging_service.py
  5. 17 0
      message.py
  6. 27 0
      message_queue_backend.py
  7. 173 0
      prompt_templates.py
  8. 164 0
      unit_test.py
  9. 68 0
      user_manager.py
  10. 168 0
      user_profile_extractor.py

+ 212 - 0
agent_service.py

@@ -0,0 +1,212 @@
+#! /usr/bin/env python
+# -*- coding: utf-8 -*-
+# vim:fenc=utf-8
+
+import time
+from typing import Dict, List, Tuple, Any
+import logging
+from datetime import datetime, timedelta
+
+import apscheduler.triggers.cron
+from apscheduler.schedulers.background import BackgroundScheduler
+
+import global_flags
+from dialogue_manager import DialogueManager, DialogueState
+from user_manager import UserManager, LocalUserManager
+from openai import OpenAI
+from message_queue_backend import MessageQueueBackend, MemoryQueueBackend
+from user_profile_extractor import UserProfileExtractor
+import threading
+from message import MessageType
+from logging_service import ColoredFormatter
+
+
+class AgentService:
+    def __init__(
+        self,
+        receive_backend: MessageQueueBackend,
+        send_backend: MessageQueueBackend,
+        human_backend: MessageQueueBackend,
+        user_manager: UserManager
+    ):
+        self.receive_queue = receive_backend
+        self.send_queue = send_backend
+        self.human_queue = human_backend
+
+        # 核心服务模块
+        self.user_manager = user_manager
+        self.user_profile_extractor = UserProfileExtractor()
+        self.agent_registry: Dict[str, DialogueManager] = {}
+
+        self.llm_client = OpenAI(
+            api_key='5e275c38-44fd-415f-abcf-4b59f6377f72',
+            base_url="https://ark.cn-beijing.volces.com/api/v3"
+        )
+        self.model_name = "ep-20250213194558-rrmr2" # DeepSeek on Volces
+
+        # 定时任务调度器
+        self.scheduler = BackgroundScheduler()
+        self.scheduler.start()
+        self._setup_initiative_conversations()
+
+    def _setup_initiative_conversations(self):
+        self.scheduler.add_job(
+            self._check_initiative_conversations,
+            apscheduler.triggers.cron.CronTrigger(second='5,35')
+        )
+
+    def _get_agent_instance(self, user_id: str) -> DialogueManager:
+        """获取用户Agent实例"""
+        if user_id not in self.agent_registry:
+            self.agent_registry[user_id] = DialogueManager(
+                user_id, self.user_manager)
+        return self.agent_registry[user_id]
+
+    def process_messages(self):
+        """持续处理接收队列消息"""
+        while True:
+            message = self.receive_queue.consume()
+            if message:
+                self._process_single_message(message)
+            time.sleep(1)  # 避免CPU空转
+
+    def _update_user_profile(self, user_id, user_profile, message: str):
+        profile_to_update = self.user_profile_extractor.extract_profile_info(user_profile, message)
+        if not profile_to_update:
+            logging.debug("user_id: {}, no profile info extracted".format(user_id))
+            return
+        logging.warning("update user profile: {}".format(profile_to_update))
+        merged_profile = self.user_profile_extractor.merge_profile_info(user_profile, profile_to_update)
+        self.user_manager.save_user_profile(user_id, merged_profile)
+        return merged_profile
+
+    def _schedule_aggregation_trigger(self, user_id: str, delay_sec: int):
+        logging.debug("user: {}, schedule trigger message after {} seconds".format(user_id, delay_sec))
+        message = {
+            'user_id': user_id,
+            'type': MessageType.AGGREGATION_TRIGGER,
+            'text': None,
+            'timestamp': int(time.time() * 1000) + delay_sec * 1000
+        }
+        self.scheduler.add_job(lambda: self.receive_queue.produce(message),
+                               'date',
+                               run_date=datetime.now() + timedelta(seconds=delay_sec))
+
+    def _process_single_message(self, message: Dict):
+        user_id = message['user_id']
+        message_text = message.get('text', None)
+
+        # 获取用户信息和Agent实例
+        user_profile = self.user_manager.get_user_profile(user_id)
+        agent = self._get_agent_instance(user_id)
+
+        # 更新对话状态
+        logging.debug("process message: {}".format(message))
+        dialogue_state, message_text = agent.update_state(message)
+        logging.debug("user: {}, next state: {}".format(user_id, dialogue_state))
+
+        # 根据状态路由消息
+        if agent.is_in_human_intervention():
+            self._route_to_human_intervention(user_id, message_text, dialogue_state)
+        elif dialogue_state == DialogueState.MESSAGE_AGGREGATING:
+            if message['type'] != MessageType.AGGREGATION_TRIGGER:
+                # 产生一个触发器,但是不能由触发器递归产生
+                logging.debug("user: {}, waiting next message for aggregation".format(user_id))
+                self._schedule_aggregation_trigger(user_id, agent.message_aggregation_sec)
+            return
+        else:
+            # 先更新用户画像再处理回复
+            self._update_user_profile(user_id, user_profile, message_text)
+            self._process_llm_response(user_id, agent, message_text)
+
+    def _route_to_human_intervention(self, user_id: str, user_message: str, state: DialogueState):
+        """路由到人工干预"""
+        self.human_queue.produce({
+            'user_id': user_id,
+            'state': state,
+            'timestamp': datetime.now().isoformat()
+        })
+
+    def _process_llm_response(self, user_id: str, agent: DialogueManager,
+                              user_message: str):
+        """处理LLM响应"""
+        messages = agent.make_llm_messages(user_message)
+        logging.debug(messages)
+        llm_response = self._call_llm_api(messages)
+
+        if response := agent.generate_response(llm_response):
+            logging.warning("user: {}, response: {}".format(user_id, response))
+            self.send_queue.produce({
+                'user_id': user_id,
+                'text': response,
+            })
+
+    def _check_initiative_conversations(self):
+        """定时检查主动发起对话"""
+        for user_id in self.user_manager.list_all_users():
+            agent = self._get_agent_instance(user_id)
+            should_initiate = agent.should_initiate_conversation()
+
+            if should_initiate:
+                logging.warning("user: {}, initiate conversation".format(user_id))
+                self._process_llm_response(user_id, agent, None)
+            else:
+                logging.debug("user: {}, do not initiate conversation".format(user_id))
+
+    def _call_llm_api(self, messages: List[Dict]) -> str:
+        if global_flags.DISABLE_LLM_API_CALL:
+            return 'LLM模拟回复'
+        chat_completion = self.llm_client.chat.completions.create(
+            messages=messages,
+            model=self.model_name,
+        )
+        response = chat_completion.choices[0].message.content
+        return response
+
+if __name__ == "__main__":
+    logging.getLogger().setLevel(logging.DEBUG)
+    console_handler = logging.StreamHandler()
+    console_handler.setLevel(logging.WARNING)
+    formatter = ColoredFormatter(
+        '%(asctime)s - %(funcName)s[%(lineno)d] - %(levelname)s - %(message)s'
+    )
+    console_handler.setFormatter(formatter)
+    root_logger = logging.getLogger()
+    root_logger.handlers.clear()
+    root_logger.addHandler(console_handler)
+    scheduler_logger = logging.getLogger('apscheduler')
+    scheduler_logger.setLevel(logging.WARNING)
+
+    # 初始化不同队列的后端
+    receive_queue = MemoryQueueBackend()
+    send_queue = MemoryQueueBackend()
+    human_queue = MemoryQueueBackend()
+
+    # 初始化用户管理服务
+    user_manager = LocalUserManager()
+
+    global_flags.DISABLE_LLM_API_CALL = False
+
+    # 创建Agent服务
+    service = AgentService(
+        receive_backend=receive_queue,
+        send_backend=send_queue,
+        human_backend=human_queue,
+        user_manager=user_manager
+    )
+
+    process_thread = threading.Thread(target=service.process_messages)
+    process_thread.start()
+
+    while True:
+        print("Input next message: ")
+        message = sys.stdin.readline().strip()
+        message_dict = {
+            "user_id": "user_id_1",
+            "type": MessageType.TEXT,
+            "text": message,
+            "timestamp": int(time.time() * 1000)
+        }
+        if message:
+            receive_queue.produce(message_dict)
+        time.sleep(0.1)

+ 388 - 0
dialogue_manager.py

@@ -0,0 +1,388 @@
+#! /usr/bin/env python
+# -*- coding: utf-8 -*-
+# vim:fenc=utf-8
+
+from enum import Enum, auto
+from typing import Dict, List, Optional, Tuple, Any
+from datetime import datetime
+import time
+import logging
+
+from message import MessageType
+# from vector_memory_manager import VectorMemoryManager
+from structured_memory_manager import StructuredMemoryManager
+from user_manager import UserManager
+from prompt_templates import *
+
+# 配置日志
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(funcName)s[%(lineno)d] - %(levelname)s - %(message)s')
+logger = logging.getLogger(__name__)
+
+class DummyVectorMemoryManager:
+    def __init__(self, user_id):
+        pass
+
+    def add_to_memory(self, conversation):
+        pass
+
+    def retrieve_relevant_memories(self, query, k=3):
+        return []
+
+
+class DialogueState(Enum):
+    GREETING = auto()              # 问候状态
+    CHITCHAT = auto()              # 闲聊状态
+    CLARIFICATION = auto()         # 澄清状态
+    FAREWELL = auto()              # 告别状态
+    HUMAN_INTERVENTION = auto()    # 人工介入状态
+    MESSAGE_AGGREGATING = auto()   # 等待消息状态
+
+
+class TimeContext(Enum):
+    EARLY_MORNING = "清晨"     # 清晨 (5:00-7:59)
+    MORNING = "上午"           # 上午 (8:00-11:59)
+    NOON = "中午"              # 中午 (12:00-13:59)
+    AFTERNOON = "下午"         # 下午 (14:00-17:59)
+    EVENING = "晚上"           # 晚上 (18:00-21:59)
+    NIGHT = "深夜"             # 夜晚 (22:00-4:59)
+
+    def __init__(self, description):
+        self.description = description
+
+class DialogueManager:
+    def __init__(self, user_id: str, user_manager: UserManager):
+        self.user_id = user_id
+        self.user_manager = user_manager
+        self.current_state = DialogueState.GREETING
+        self.previous_state = None
+        self.dialogue_history = []
+        self.user_profile = self.user_manager.get_user_profile(user_id)
+        self.last_interaction_time = 0
+        self.consecutive_clarifications = 0
+        self.complex_request_counter = 0
+        self.human_intervention_triggered = False
+        self.vector_memory = DummyVectorMemoryManager(user_id)
+        self.message_aggregation_sec = 5
+        self.unprocessed_messages = []
+
+    def get_current_time_context(self) -> TimeContext:
+        """获取当前时间上下文"""
+        current_hour = datetime.now().hour
+        if 5 <= current_hour < 8:
+            return TimeContext.EARLY_MORNING
+        elif 8 <= current_hour < 12:
+            return TimeContext.MORNING
+        elif 12 <= current_hour < 14:
+            return TimeContext.NOON
+        elif 14 <= current_hour < 18:
+            return TimeContext.AFTERNOON
+        elif 18 <= current_hour < 22:
+            return TimeContext.EVENING
+        else:
+            return TimeContext.NIGHT
+
+    def update_state(self, message: Dict) -> Tuple[DialogueState, str]:
+        """根据用户消息更新对话状态,并返回下一条需处理的用户消息"""
+        message_text = message.get('text', None)
+        message_ts = message['timestamp']
+        # 如果当前已经是人工介入状态,保持该状态
+        if self.current_state == DialogueState.HUMAN_INTERVENTION:
+            # 记录对话历史,但不改变状态
+            self.dialogue_history.append({
+                "role": "user",
+                "content": message_text,
+                "timestamp": int(time.time() * 1000),
+                "state": self.current_state.name
+            })
+            return self.current_state, message_text
+
+        # 检查是否处于消息聚合状态
+        if self.current_state == DialogueState.MESSAGE_AGGREGATING:
+            # 收到的是特殊定时触发的空消息,且在聚合中,且已经超时,恢复之前状态,继续处理
+            if message['type'] == MessageType.AGGREGATION_TRIGGER \
+                    and message_ts - self.last_interaction_time > self.message_aggregation_sec * 1000:
+                logging.debug("user_id: {}, last interaction time: {}".format(
+                    self.user_id, datetime.fromtimestamp(self.last_interaction_time / 1000)))
+                self.current_state = self.previous_state
+            else:
+                # 非空消息,更新最后交互时间,保持消息聚合状态
+                if message_text:
+                    self.unprocessed_messages.append(message_text)
+                    self.last_interaction_time = message_ts
+                return self.current_state, message_text
+        elif message['type'] != MessageType.AGGREGATION_TRIGGER and self.message_aggregation_sec > 0:
+            # 收到有内容的用户消息,切换到消息聚合状态
+            self.previous_state = self.current_state
+            self.current_state = DialogueState.MESSAGE_AGGREGATING
+            self.unprocessed_messages.append(message_text)
+            # 更新最后交互时间
+            if message_text:
+                self.last_interaction_time = message_ts
+            return self.current_state, message_text
+
+        # 保存前一个状态
+        self.previous_state = self.current_state
+
+        # 检查是否长时间未交互(超过3小时)
+        if self._get_hours_since_last_interaction() > 3:
+            self.current_state = DialogueState.GREETING
+            self.dialogue_history = []  # 重置对话历史
+            self.consecutive_clarifications = 0  # 重置澄清计数
+            self.complex_request_counter = 0  # 重置复杂请求计数
+
+        # 获得未处理的聚合消息,并清空未处理队列
+        if message_text:
+            self.unprocessed_messages.append(message_text)
+        if self.unprocessed_messages:
+            message_text = '\n'.join(self.unprocessed_messages)
+            self.unprocessed_messages.clear()
+
+        # 根据消息内容和当前状态确定新状态
+        new_state = self._determine_state_from_message(message_text)
+
+        # 处理连续澄清的情况
+        if new_state == DialogueState.CLARIFICATION:
+            self.consecutive_clarifications += 1
+            if self.consecutive_clarifications >= 2:
+                new_state = DialogueState.HUMAN_INTERVENTION
+                # self._trigger_human_intervention("连续多次澄清请求")
+        else:
+            self.consecutive_clarifications = 0
+
+        # 更新状态
+        self.current_state = new_state
+
+        # 更新最后交互时间
+        if message_text:
+            self.last_interaction_time = message_ts
+
+        # 记录对话历史
+        if message_text:
+            self.dialogue_history.append({
+                "role": "user",
+                "content": message_text,
+                "timestamp": int(time.time() * 1000),
+                "state": self.current_state.name
+            })
+
+        return self.current_state, message_text
+
+    def _determine_state_from_message(self, message: str) -> DialogueState:
+        """根据消息内容确定对话状态"""
+        if not message:
+            return self.current_state
+        # 简单的规则-关键词匹配
+        message_lower = message.lower()
+
+        # 判断是否是复杂请求
+        complex_request_keywords = ["帮我", "怎么办", "我需要", "麻烦你", "请帮助", "急", "紧急"]
+        if any(keyword in message_lower for keyword in complex_request_keywords):
+            self.complex_request_counter += 1
+
+            # 如果检测到困难请求且计数达到阈值,触发人工介入
+            if self.complex_request_counter >= 1:
+                # self._trigger_human_intervention("检测到复杂请求")
+                return DialogueState.HUMAN_INTERVENTION
+        else:
+            # 如果不是复杂请求,重置计数器
+            self.complex_request_counter = 0
+
+        # 问候检测
+        greeting_keywords = ["你好", "早上好", "中午好", "晚上好", "嗨", "在吗"]
+        if any(keyword in message_lower for keyword in greeting_keywords):
+            return DialogueState.GREETING
+
+        # 告别检测
+        farewell_keywords = ["再见", "拜拜", "晚安", "明天见", "回头见"]
+        if any(keyword in message_lower for keyword in farewell_keywords):
+            return DialogueState.FAREWELL
+
+        # 澄清请求
+        clarification_keywords = ["没明白", "不明白", "没听懂", "不懂", "什么意思", "再说一遍"]
+        if any(keyword in message_lower for keyword in clarification_keywords):
+            return DialogueState.CLARIFICATION
+
+        # 默认为闲聊状态
+        return DialogueState.CHITCHAT
+
+    def _trigger_human_intervention(self, reason: str) -> None:
+        """触发人工介入"""
+        if not self.human_intervention_triggered:
+            self.human_intervention_triggered = True
+
+            # 记录人工介入事件
+            event = {
+                "timestamp": int(time.time() * 1000),
+                "reason": reason,
+                "dialogue_context": self.dialogue_history[-5:] if len(self.dialogue_history) >= 5 else self.dialogue_history
+            }
+
+            # 更新用户资料中的人工介入历史
+            if "human_intervention_history" not in self.user_profile:
+                self.user_profile["human_intervention_history"] = []
+
+            self.user_profile["human_intervention_history"].append(event)
+            self.user_manager.save_user_profile(self.user_profile)
+
+            # 发送告警
+            self._send_human_intervention_alert(reason)
+
+    def _send_human_intervention_alert(self, reason: str) -> None:
+        alert_message = f"""
+        人工介入告警
+        用户ID: {self.user_id}
+        用户昵称: {self.user_profile.get("nickname", "未知")}
+        时间: {int(time.time() * 1000)}
+        原因: {reason}
+        最近对话:
+        """
+
+        # 添加最近的对话记录
+        recent_dialogues = self.dialogue_history[-5:] if len(self.dialogue_history) >= 5 else self.dialogue_history
+        for dialogue in recent_dialogues:
+            alert_message += f"\n{dialogue['role']}: {dialogue['content']}"
+
+        # TODO(zhoutian): 实现发送告警的具体逻辑
+        logger.warning(alert_message)
+
+    def resume_from_human_intervention(self) -> None:
+        """从人工介入状态恢复"""
+        if self.current_state == DialogueState.HUMAN_INTERVENTION:
+            self.current_state = DialogueState.GREETING
+            self.human_intervention_triggered = False
+            self.consecutive_clarifications = 0
+            self.complex_request_counter = 0
+
+            # 记录恢复事件
+            self.dialogue_history.append({
+                "role": "system",
+                "content": "已从人工介入状态恢复到自动对话",
+                "timestamp": int(time.time() * 1000),
+                "state": self.current_state.name
+            })
+
+    def generate_response(self, llm_response: str) -> Optional[str]:
+        """根据当前状态处理LLM响应,如果处于人工介入状态则返回None"""
+        # 如果处于人工介入状态,不生成回复
+        if self.current_state == DialogueState.HUMAN_INTERVENTION:
+            return None
+
+        # 记录响应到对话历史
+        current_ts = int(time.time() * 1000)
+        self.dialogue_history.append({
+            "role": "assistant",
+            "content": llm_response,
+            "timestamp": current_ts,
+            "state": self.current_state.name
+        })
+        self.last_interaction_time = current_ts
+
+        return llm_response
+
+    def _get_hours_since_last_interaction(self):
+        time_diff = (time.time() * 1000) - self.last_interaction_time
+        hours_passed = time_diff / 1000 / 3600
+        return hours_passed
+
+    def should_initiate_conversation(self) -> bool:
+        """判断是否应该主动发起对话"""
+        # 如果处于人工介入状态,不应主动发起对话
+        if self.current_state == DialogueState.HUMAN_INTERVENTION:
+            return False
+
+        hours_passed = self._get_hours_since_last_interaction()
+        # 获取当前时间上下文
+        time_context = self.get_current_time_context()
+
+        # 根据用户交互频率偏好设置不同的阈值
+        interaction_frequency = self.user_profile.get("interaction_frequency", "medium")
+
+        # 设置不同偏好的交互时间阈值(小时)
+        thresholds = {
+            "low": 24,  # 低频率:一天一次
+            "medium": 12,  # 中频率:半天一次
+            "high": 6  # 高频率:大约6小时一次
+        }
+
+        threshold = thresholds.get(interaction_frequency, 12)
+
+        # 如果足够时间已经过去
+        if hours_passed >= threshold:
+            # 根据时间上下文决定主动交互的状态
+            if time_context in [TimeContext.EARLY_MORNING, TimeContext.MORNING,
+                                TimeContext.NOON, TimeContext.AFTERNOON,
+                                TimeContext.EVENING]:
+                return True
+
+        return False
+
+    def is_in_human_intervention(self) -> bool:
+        """检查是否处于人工介入状态"""
+        return self.current_state == DialogueState.HUMAN_INTERVENTION
+
+    def get_prompt_context(self, user_message) -> Dict:
+        # 获取当前时间上下文
+        time_context = self.get_current_time_context()
+
+        context = {
+            "user_profile": self.user_profile,
+            "current_state": self.current_state.name,
+            "previous_state": self.previous_state.name if self.previous_state else None,
+            "current_time_period": time_context.description,
+            "dialogue_history": self.dialogue_history[-10:],
+            "user_message": user_message,
+            "last_interaction_interval": self._get_hours_since_last_interaction(),
+            "if_first_interaction": False,
+            "if_active_greeting": True if user_message else False
+        }
+
+        # 获取长期记忆
+        relevant_memories = self.vector_memory.retrieve_relevant_memories(user_message)
+
+        context["long_term_memory"] = {
+            "relevant_conversations": relevant_memories
+        }
+
+        return context
+
+    def _select_prompt(self, state):
+        state_to_prompt_map = {
+            DialogueState.GREETING: GENERAL_GREETING_PROMPT,
+            DialogueState.CHITCHAT: GENERAL_GREETING_PROMPT,
+        }
+        return state_to_prompt_map[state]
+
+    def _create_system_message(self):
+        prompt_context = self.get_prompt_context(None)
+        prompt_template = self._select_prompt(self.current_state)
+        prompt = prompt_template.format(**prompt_context['user_profile'], **prompt_context)
+        return {'role': 'system', 'content': prompt}
+
+    def make_llm_messages(self, user_message: Optional[str] = None) -> List[Dict[str, str]]:
+        """
+        参数:
+            dialogue_manager: 对话管理器实例
+            user_message: 当前用户消息,如果是主动交互则为None
+        返回:
+            消息列表
+        """
+        messages = []
+
+        # 添加系统消息
+        system_message = self._create_system_message()
+        messages.append(system_message)
+
+        # 添加历史对话
+        dialogue_history = self.dialogue_history[-10:] \
+            if len(self.dialogue_history) > 10 \
+            else self.dialogue_history
+
+        for entry in dialogue_history:
+            role = entry['role']
+            messages.append({
+                "role": role,
+                "content": entry["content"]
+            })
+
+        return messages
+

+ 6 - 0
global_flags.py

@@ -0,0 +1,6 @@
+#! /usr/bin/env python
+# -*- coding: utf-8 -*-
+# vim:fenc=utf-8
+#
+
+DISABLE_LLM_API_CALL = False

+ 25 - 0
logging_service.py

@@ -0,0 +1,25 @@
+#! /usr/bin/env python
+# -*- coding: utf-8 -*-
+# vim:fenc=utf-8
+
+import logging
+
+COLORS = {
+    'DEBUG': '\033[0m',    # 蓝色
+    'INFO': '\033[32m',     # 绿色
+    'WARNING': '\x1b[33;20m',  # 黄色
+    'ERROR': '\033[91m',    # 红色
+    'CRITICAL': '\033[95m', # 紫色
+    'RESET': '\033[0m',     # 重置颜色
+}
+
+class ColoredFormatter(logging.Formatter):
+    def format(self, record):
+        # 获取原始日志消息
+        message = super().format(record)
+        # 根据日志等级添加颜色
+        if record.levelname in COLORS:
+            message = f"{COLORS[record.levelname]}{message}{COLORS['RESET']}"
+        return message
+
+

+ 17 - 0
message.py

@@ -0,0 +1,17 @@
+#! /usr/bin/env python
+# -*- coding: utf-8 -*-
+# vim:fenc=utf-8
+
+
+from enum import Enum, auto
+
+class MessageType(Enum):
+    TEXT = auto()
+    AUDIO = auto()
+    IMAGE = auto()
+    VIDEO = auto()
+    MINIGRAM = auto()
+    LINK = auto()
+
+    ACTIVE_TRIGGER = auto()
+    AGGREGATION_TRIGGER = auto()

+ 27 - 0
message_queue_backend.py

@@ -0,0 +1,27 @@
+#! /usr/bin/env python
+# -*- coding: utf-8 -*-
+# vim:fenc=utf-8
+
+import abc
+from typing import Dict, Any
+
+
+class MessageQueueBackend(abc.ABC):
+    @abc.abstractmethod
+    def consume(self) -> Any:
+        pass
+
+    @abc.abstractmethod
+    def produce(self, message: Dict) -> None:
+        pass
+
+class MemoryQueueBackend(MessageQueueBackend):
+    """内存消息队列实现"""
+    def __init__(self):
+        self._queue = []
+
+    def consume(self):
+        return self._queue.pop(0) if self._queue else None
+
+    def produce(self, message: Dict):
+        self._queue.append(message)

+ 173 - 0
prompt_templates.py

@@ -0,0 +1,173 @@
+#! /usr/bin/env python
+# -*- coding: utf-8 -*-
+# vim:fenc=utf-8
+#
+# Copyright © 2025 StrayWarrior <i@straywarrior.com>
+#
+# Distributed under terms of the MIT license.
+
+PASSIVE_GREETING_PROMPT = """
+您是一个专为老年人设计的AI助手,以拟人化的方式与用户交互。
+你当前正处于问候状态,用户是一位老年人,这是你们第一次交流。请以温暖、尊重的口吻简短回应,在合适的时机引导获取基本信息。
+你的信息:
+- 名字:芳华
+- 年龄:25左右
+- 地区:北京
+
+用户信息(可能不完整):
+- 昵称:{nickname}
+- 姓名:{name}
+- 年龄:{age}
+- 地区:{region}
+- 健康状况:{health_conditions}
+- 用药信息:{medications}
+- 兴趣爱好:{interests}
+
+请注意:
+1. 回复应简短友好,不超过3句话,符合真人说话的风格,避免网络用语,避免谈及自己是AI。
+2. 询问信息前要有铺垫,不一定每次都询问,每次交互最多询问一项信息。询问时采用尽量温柔、自然的语气,不要使用过多的谦辞、敬语。
+3. 如用户已提供某些信息,请勿重复询问
+4. 优先关注用户当下表达的需求
+5. 尽量避免使用昵称来称呼用户
+"""
+
+GENERAL_GREETING_PROMPT = """
+你是一个专为老年人设计的AI助手,以拟人化的方式与用户交互。你当前正处于问候状态,用户是一位老年人,请以温暖、尊重的口吻进行简短的问候,在合适的时机引导获取基本信息。
+请根据以下信息(用户信息可能为空)生成适当的问候:
+你的信息:
+- 名字:芳华
+- 年龄:25左右
+- 地区:北京
+用户信息:
+- 姓名:{name}
+- 偏好的称呼:{preferred_nickname}
+- 年龄:{age}
+- 地区:{region}
+- 健康状况:{health_conditions}
+- 用药信息:{medications}
+- 兴趣爱好:{interests}
+对话上下文信息:
+- 上次交互距当前小时: {last_interaction_interval:.2f}
+- 当前时间段: {current_time_period}
+- 是否首次交互: {if_first_interaction}
+- 是否为主动问候: {if_active_greeting}
+
+指导原则:
+1. 问候语应简短友好,不超过3句话,符合真人说话的风格,避免网络用语,避免谈及自己是AI
+2. 如果用户信息有空缺,可在问候后自然融入一个简单的问题,如询问姓名、年龄、身体状况或兴趣。询问时采用尽量温柔、自然的语气,不要使用过多的谦辞、敬语
+3. 如有历史交互,根据时间段和上次互动内容自然延续
+4. 主动问候时,可关注时间相关的生活内容(如早餐、午休、晚饭)
+5. 注意时间段,避免和当前时间段不符的内容
+6. 一次问候最多只能问1个问题,也可以不问问题
+7. 连续对话时,不要连续提问超过3次,控制节奏,给予用户转换话题和主动表达的时间
+
+针对不同情况的回复策略:
+- 首次交互+用户主动消息: 热情回应+自我介绍+婉转获取1-2项信息
+- 首次交互+主动问候: 简短自我介绍+时间相关问候+婉转获取1-2项信息
+- 有交互记录+用户问候: 回应问候+根据已知信息自然延续话题
+- 有交互记录+主动问候: 根据时间段问候+关注用户近况或已知信息
+
+回复示例:
+[首次交互+用户主动] "您好!很高兴认识您。您平时有什么喜欢的活动吗?"
+[首次交互+主动] "早上好!很高兴认识您,我是芳华。今天天气不错,您吃过早饭了吗?"
+[有记录+用户问候] "王奶奶早上好!昨天说的太极拳练习得怎么样了?"
+[有记录+主动] "李大爷下午好!该喝水休息了,今天您的血压还稳定吗?"
+
+只生成回复内容,内容前后不需要引号,不要包含任何说明或注释。确保回复自然、亲切且简洁。"""
+
+
+CHITCHAT_PROMPT = """
+你是一个专为老年人设计的AI助手,以拟人化的方式与用户交互。用户是一位老年人,请以温暖、尊重的口吻进行回复,在合适的时机引导获取基本信息。
+请根据以下信息(用户信息可能为空)生成适当的回复:
+你的信息:
+- 名字:芳华
+- 年龄:25左右
+- 地区:北京
+用户信息:
+- 姓名:{name}
+- 偏好的称呼:{preferred_nickname}
+- 年龄:{age}
+- 地区:{region}
+- 健康状况:{health_conditions}
+- 用药信息:{medications}
+- 兴趣爱好:{interests}
+对话上下文信息:
+- 上次交互距当前小时: {last_interaction_interval:.2f}
+- 当前时间段: {current_time_period}
+
+指导原则:
+1. 问候语应简短友好,不超过3句话,符合真人说话的风格,避免网络用语,避免谈及自己是AI
+2. 如果用户信息有空缺,可在问候后自然融入一个简单的问题,如询问姓名、年龄、身体状况或兴趣。询问时采用尽量温柔、自然的语气,不要使用过多的谦辞、敬语
+3. 如有历史交互,根据时间段和上次互动内容自然延续
+4. 主动问候时,可关注时间相关的生活内容(如早餐、午休、晚饭)
+5. 尽量避免使用昵称来称呼用户
+6. 注意时间段,避免和当前时间段不符的内容
+7. 一次问候最多只能问1个问题,也可以不问问题
+8. 连续对话时,不要连续提问超过3次,控制节奏,给予用户转换话题和主动表达的时间
+9. 如果用户主动提到某个话题,可以适当延续这个话题,但不要过多涉及敏感话题
+
+只生成回复内容,内容前后不需要引号,不要包含任何说明或注释。确保回复自然、亲切且简洁。
+"""
+
+CHITCHAT_PROMPT_V2 = """
+## 角色设定
+* 你是一位熟悉中老年用户交流习惯的智能客服,性别女,30左右,北京工作,能够精准理解用户需求,提供专业、实用且有温度的建议。
+* 你擅长倾听、引导和共情,在对话中自然促进用户互动,提升平台活跃度和粘性。
+* 你的目标是在专业与温暖之间取得平衡,既能提供精准、高效的解决方案,又能让用户感受到被关怀。
+
+## 任务目标
+    1.  提升用户活跃度,建立长期信任感和归属感,提供情绪价值。
+    2.  精准推荐内容,在自然交流中提升互动与参与度。
+    3.  推动内容消费,通过专业建议促成用户转化。
+
+# 用户背景
+    • 年龄段:50 岁以上,关注健康、家庭、生活品质。
+    • 内容偏好:喜欢简洁、实用、易理解的信息,重视专业性。
+    • 沟通方式:习惯微信等社交平台,偏好清晰、直接的交流方式。
+    • 个性化调整:根据用户对话风格调整沟通方式,确保既专业又轻松易懂。
+
+# 对话产出优先级(高 → 低)
+    1.  当前对话(权重 3):优先精准回应用户当前需求,保证专业、贴心。
+    2.  历史对话(权重 3):结合过往兴趣,展现长期关注,提供持续优化建议。
+    3.  用户意图(权重 2):识别潜在需求,精准推荐相关内容。
+    4.  用户画像(权重 2):依据用户习惯,优化表达方式,提高推荐相关性。
+
+# 执行指令
+    1.  紧扣当前对话,用清晰、专业的语言回应用户需求。
+    2.  参考历史对话,减少重复内容,加强持续关注感。
+    3.  识别深层意图,逐步追问,确保用户表达清晰后再提供建议。
+    4.  结合用户画像,优化表达方式,在专业与易懂之间取得平衡。
+
+# 回复技巧
+    • 语气专业+亲切,适当的语气词(如“嗯嗯”“其实呀”)。
+    • 表达简洁清晰,避免冗长:
+    • 默认回复≤40字(用户回复较长时可扩展至1.2~1.5倍)。
+    • 问题简短(≤12字),确保7:3 的文案+提问比例。
+    • 用户表达不明确时,先追问,获取完整信息后再回复。
+    • 推荐对话结构:回应(精准理解 + 专业建议)+ 引导提问(无语气词)。
+
+# 边界设置
+    • 无法见面:委婉转移话题,确保对话流畅自然。
+    • 避免臆测:基于用户真实对话进行沟通,避免主观推测。
+    • 回复内容:必须真实,避免不存在或无根据的内容
+    • 没有明确性别或者称呼可以优先提问,不要臆测称呼
+
+# 示例输入
+    • 用户画像:关注健康养生,喜欢实用、专业的建议。
+    • 历史对话:多次咨询饮食健康、睡眠改善等话题。
+    • 当前用户意图:询问“最近总觉得肩膀很僵硬,有没有简单的放松方法?”
+    • 当前对话背景:用户关注身体健康和舒适度,希望得到简单实用的缓解方法。
+
+# 示例回复
+  “肩膀僵硬可能与肌肉紧张、长时间固定姿势或血液循环不畅有关。建议做颈肩部拉伸,如缓慢前后左右转动头部,每次10秒,重复3-5次。热敷或轻度按摩也能缓解不适。
+您的僵硬情况是在早上起床时更严重,还是长时间坐着后加重?”
+
+# 输入
+  • 用户画像:$profile
+  • 当前用户意图:$intent
+  • 历史对话:$historyChat
+  • 当前对话:$chat
+
+# 输出
+  对话回复
+"""

+ 164 - 0
unit_test.py

@@ -0,0 +1,164 @@
+#! /usr/bin/env python
+# -*- coding: utf-8 -*-
+# vim:fenc=utf-8
+
+import pytest
+from datetime import datetime, timedelta
+from typing import Dict, Optional, Tuple, Any
+from unittest.mock import Mock, MagicMock
+from agent_service import AgentService, MemoryQueueBackend
+from message import MessageType
+from user_manager import LocalUserManager
+import time
+import logging
+
+class TestMessageQueues:
+    """测试用消息队列实现"""
+    def __init__(self, receive_queue, send_queue, human_queue):
+        self.receive_queue = receive_queue
+        self.send_queue = send_queue
+        self.human_queue = human_queue
+
+@pytest.fixture
+def test_env():
+    """测试环境初始化"""
+    logging.getLogger().setLevel(logging.DEBUG)
+
+    user_manager = LocalUserManager()
+
+    receive_queue = MemoryQueueBackend()
+    send_queue = MemoryQueueBackend()
+    human_queue = MemoryQueueBackend()
+
+    queues = TestMessageQueues(receive_queue, send_queue, human_queue)
+
+    # 创建Agent服务实例
+    service = AgentService(
+        receive_backend=receive_queue,
+        send_backend=send_queue,
+        human_backend=human_queue,
+        user_manager=user_manager
+    )
+    service.user_profile_extractor.extract_profile_info = Mock(return_value=None)
+
+    # 替换LLM调用为模拟响应
+    service._call_llm_api = Mock(return_value="模拟响应")
+
+    return service, queues
+
+def test_normal_conversation_flow(test_env):
+    """测试正常对话流程"""
+    service, queues = test_env
+    service._get_agent_instance("user_id_0").message_aggregation_sec = 0
+
+    # 准备测试消息
+    test_msg = {
+        "user_id": "user_id_0",
+        "type": MessageType.TEXT,
+        "text": "你好",
+        "timestamp": int(time.time() * 1000),
+    }
+    queues.receive_queue.produce(test_msg)
+
+    # 处理消息
+    message = service.receive_queue.consume()
+    if message:
+        service._process_single_message(message)
+
+    # 验证响应消息
+    sent_msg = queues.send_queue.consume()
+    assert sent_msg is not None
+    assert sent_msg["user_id"] == "user_id_0"
+    assert "模拟响应" in sent_msg["text"]
+
+def test_aggregated_conversation_flow(test_env):
+    """测试聚合对话流程"""
+    service, queues = test_env
+    service._get_agent_instance("user_id_0").message_aggregation_sec = 1
+
+    # 准备测试消息
+    ts_begin = int(time.time() * 1000)
+    test_msg = {
+        "user_id": "user_id_0",
+        "type": MessageType.TEXT,
+        "text": "你好",
+        "timestamp": ts_begin,
+    }
+    queues.receive_queue.produce(test_msg)
+    test_msg = {
+        "user_id": "user_id_0",
+        "type": MessageType.TEXT,
+        "text": "我是老李",
+        "timestamp": ts_begin + 0.5 * 1000,
+    }
+    queues.receive_queue.produce(test_msg)
+
+    # 处理消息
+    message = service.receive_queue.consume()
+    if message:
+        service._process_single_message(message)
+
+    # 验证第一次响应消息
+    sent_msg = queues.send_queue.consume()
+    assert sent_msg is None
+
+    message = service.receive_queue.consume()
+    if message:
+        service._process_single_message(message)
+    # 验证第二次响应消息
+    sent_msg = queues.send_queue.consume()
+    assert sent_msg is None
+
+    # 模拟定时器产生空消息触发响应
+    service._process_single_message({
+        "user_id": "user_id_0",
+        "type": MessageType.AGGREGATION_TRIGGER,
+        "timestamp": ts_begin + 2 * 1000
+    })
+    # 验证第三次响应消息
+    sent_msg = queues.send_queue.consume()
+    assert sent_msg is not None
+    assert sent_msg["user_id"] == "user_id_0"
+    assert "模拟响应" in sent_msg["text"]
+
+def test_human_intervention_trigger(test_env):
+    """测试触发人工干预"""
+    service, queues = test_env
+    service._get_agent_instance("user_id_0").message_aggregation_sec = 0
+
+    # 准备需要人工干预的消息
+    test_msg = {
+        "user_id": "user_id_0",
+        "type": MessageType.TEXT,
+        "text": "我需要帮助!",
+        "timestamp": int(time.time() * 1000),
+    }
+    queues.receive_queue.produce(test_msg)
+
+    # 处理消息
+    message = service.receive_queue.consume()
+    if message:
+        service._process_single_message(message)
+
+    # 验证人工队列消息
+    human_msg = queues.human_queue.consume()
+    assert human_msg is not None
+    assert human_msg["user_id"] == "user_id_0"
+    assert "state" in human_msg
+
+def test_initiative_conversation(test_env):
+    """测试主动发起对话"""
+    service, queues = test_env
+    service._get_agent_instance("user_id_0").message_aggregation_sec = 0
+    service._call_llm_api = Mock(return_value="主动发起")
+
+    # 设置Agent需要主动发起对话
+    agent = service._get_agent_instance("user_id_0")
+    agent.should_initiate_conversation = Mock(return_value=(True, MagicMock()))
+
+    service._check_initiative_conversations()
+
+    # 验证主动发起的消息
+    sent_msg = queues.send_queue.consume()
+    assert sent_msg is not None
+    assert "主动发起" in sent_msg["text"]

+ 68 - 0
user_manager.py

@@ -0,0 +1,68 @@
+#! /usr/bin/env python
+# -*- coding: utf-8 -*-
+# vim:fenc=utf-8
+
+from typing import Dict, Optional, Tuple, Any
+import json
+import time
+import os
+import abc
+
+class UserManager(abc.ABC):
+    @abc.abstractmethod
+    def get_user_profile(self, user_id) -> Dict:
+        pass
+
+    @abc.abstractmethod
+    def save_user_profile(self, user_id, profile: Dict) -> None:
+        pass
+
+    @abc.abstractmethod
+    def list_all_users(self):
+        pass
+
+class LocalUserManager(UserManager):
+    def get_user_profile(self, user_id) -> Dict:
+        """加载用户个人资料,如不存在则创建默认资料。主要用于本地调试"""
+        try:
+            with open(f"user_profiles/{user_id}.json", "r", encoding="utf-8") as f:
+                return json.load(f)
+        except FileNotFoundError:
+            # 创建默认用户资料
+            default_profile = {
+                "name": "",
+                "nickname": "",
+                "preferred_nickname": "",
+                "age": 0,
+                "region": '',
+                "interests": [],
+                "family_members": {},
+                "health_conditions": [],
+                "medications": [],
+                "reminder_preferences": {
+                    "medication": True,
+                    "health": True,
+                    "weather": True,
+                    "news": False
+                },
+                "interaction_style": "standard",  # standard, verbose, concise
+                "interaction_frequency": "medium",  # low, medium, high
+                "last_topics": [],
+                "created_at": int(time.time() * 1000),
+                "human_intervention_history": []
+            }
+            self.save_user_profile(user_id, default_profile)
+            return default_profile
+
+    def save_user_profile(self, user_id, profile: Dict) -> None:
+        with open(f"user_profiles/{user_id}.json", "w", encoding="utf-8") as f:
+            json.dump(profile, f, ensure_ascii=False, indent=2)
+
+    def list_all_users(self):
+        json_files = []
+        for root, dirs, files in os.walk('user_profiles/'):
+            for file in files:
+                if file.endswith('.json'):
+                    json_files.append(os.path.splitext(file)[0])
+        print(json_files)
+        return json_files

+ 168 - 0
user_profile_extractor.py

@@ -0,0 +1,168 @@
+#! /usr/bin/env python
+# -*- coding: utf-8 -*-
+# vim:fenc=utf-8
+
+import json
+from typing import Dict, Any, Optional
+from datetime import datetime
+from openai import OpenAI
+import logging
+
+import global_flags
+
+
+class UserProfileExtractor:
+    def __init__(self):
+        self.llm_client = OpenAI(
+            api_key='5e275c38-44fd-415f-abcf-4b59f6377f72',
+            base_url="https://ark.cn-beijing.volces.com/api/v3"
+        )
+        self.model_name = 'ep-20250307150409-4blz9'
+
+    def get_extraction_function(self) -> Dict:
+        """
+        定义用于用户画像信息提取的Function Calling函数
+        """
+        return {
+            "type": "function",
+            "function": {
+                "name": "update_user_profile",
+                "description": "从用户对话中提取并更新用户的个人信息",
+                "parameters": {
+                    "type": "object",
+                    "properties": {
+                        "name": {
+                            "type": "string",
+                            "description": "用户的姓名,如果能够准确识别"
+                        },
+                        "preferred_nickname": {
+                            "type": "string",
+                            "description": "用户希望对其的称呼,如果能够准确识别"
+                        },
+                        "age": {
+                            "type": "integer",
+                            "description": "用户的年龄,如果能够准确识别"
+                        },
+                        "region": {
+                            "type": "string",
+                            "description": "用户所在地区"
+                        },
+                        "interests": {
+                            "type": "array",
+                            "items": {"type": "string"},
+                            "description": "用户提到的自己的兴趣爱好"
+                        },
+                        "health_conditions": {
+                            "type": "array",
+                            "items": {"type": "string"},
+                            "description": "用户提及的健康状况"
+                        }
+                    },
+                    "required": []
+                }
+            }
+        }
+
+    def generate_extraction_prompt(self, user_profile: Dict, dialogue_history: str) -> str:
+        """
+        生成用于信息提取的系统提示词
+        """
+        context = user_profile.copy()
+        context['dialogue_history'] = dialogue_history
+        return """
+请在已有的用户画像的基础上,仔细分析以下对话内容,完善用户的画像信息。
+已知信息(可能为空):
+- 姓名:{name}
+- 希望的称呼:{preferred_nickname}
+- 年龄:{age}
+- 地区:{region}
+- 健康状况:{health_conditions}
+- 兴趣爱好:{interests}
+
+对话历史:
+{dialogue_history}
+
+提取要求:
+1. 尽可能准确地识别用户的年龄、兴趣爱好、健康状况
+2. 关注用户生活、家庭等隐性信息
+3. 信息提取需要有较高的置信度,兴趣爱好只保留用户明确喜欢且最关键的5项
+4. 如果无法确定具体信息,请不要猜测
+
+请使用update_user_profile函数返回需要更新的信息,注意不要返回无需更新的信息。
+""".format(**context)
+
+    def extract_profile_info(self, user_profile, dialogue_history: str) -> Optional[Dict]:
+        """
+        使用Function Calling提取用户画像信息
+        """
+        if global_flags.DISABLE_LLM_API_CALL:
+            return None
+
+        try:
+            logging.debug("try to extract profile from message: {}".format(dialogue_history))
+            response = self.llm_client.chat.completions.create(
+                model=self.model_name,
+                messages=[
+                    {"role": "system", "content": '你是一个专业的用户画像分析助手。'},
+                    {"role": "user", "content": self.generate_extraction_prompt(user_profile, dialogue_history)}
+                ],
+                tools=[self.get_extraction_function()],
+                temperature=0
+            )
+
+            # 解析Function Call的参数
+            tool_calls = response.choices[0].message.tool_calls
+            logging.debug(response)
+            if tool_calls:
+                function_call = tool_calls[0]
+                if function_call.function.name == 'update_user_profile':
+                    try:
+                        profile_info = json.loads(function_call.function.arguments)
+                        return {k: v for k, v in profile_info.items() if v}
+                    except json.JSONDecodeError:
+                        logging.error("无法解析提取的用户信息")
+                        return None
+
+        except Exception as e:
+            logging.error(f"用户画像提取出错: {e}")
+            return None
+
+    def merge_profile_info(self, existing_profile: Dict, new_info: Dict) -> Dict:
+        """
+        合并新提取的用户信息到现有资料
+        """
+        merged_profile = existing_profile.copy()
+        merged_profile.update(new_info)
+        return merged_profile
+
+if __name__ == '__main__':
+    extractor = UserProfileExtractor()
+    current_profile = {
+        'name': '',
+        'preferred_nickname': '',
+        'age': 0,
+        'region': '',
+        'health_conditions': [],
+        'medications': [],
+        'interests': []
+    }
+    message = "好的,孩子,我是老李头,今年68啦,住在北京海淀区。平时喜欢在微信上跟老伙伴们聊聊养生、下下象棋,偶尔也跟年轻人学学新鲜事儿。\n" \
+              "你叫我李叔就行,有啥事儿咱们慢慢聊啊\n" \
+              "哎,今儿个天气不错啊,我刚才还去楼下小公园溜达了一圈儿。碰到几个老伙计在打太极,我也跟着比划了两下,这老胳膊老腿的,原来老不舒服,活动活动舒坦多了!\n" \
+              "你吃饭了没?我们这儿中午吃的打卤面,老伴儿做的,香得很!这人老了就爱念叨些家长里短的,你可别嫌我啰嗦啊。\n" \
+              "对了,最近我孙子教我发语音,比打字方便多啦!就是有时候一激动,说话声音太大,把手机都给震得嗡嗡响\n"
+    resp = extractor.extract_profile_info(current_profile, message)
+    print(resp)
+    print(extractor.merge_profile_info(current_profile, resp))
+    current_profile = {
+        'name': '李老头',
+        'preferred_nickname': '李叔',
+        'age': 68,
+        'region': '北京市海淀区',
+        'health_conditions': [],
+        'medications': [],
+        'interests': ['养生', '下象棋']
+    }
+    resp = extractor.extract_profile_info(current_profile, message)
+    print(resp)
+    print(extractor.merge_profile_info(current_profile, resp))