#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

import os
import sys
import time
from typing import Dict, List, Tuple, Any, Optional
import logging
from datetime import datetime, timedelta

import apscheduler.triggers.cron
from apscheduler.schedulers.background import BackgroundScheduler

import global_flags
from dialogue_manager import DialogueManager, DialogueState
from user_manager import UserManager, LocalUserManager
from openai import OpenAI
from message_queue_backend import MessageQueueBackend, MemoryQueueBackend
from user_profile_extractor import UserProfileExtractor
import threading
from message import MessageType
from logging_service import ColoredFormatter


class AgentService:
    """Message-driven chat agent service.

    Consumes user messages from a receive queue, keeps one
    ``DialogueManager`` per user, and for each message either forwards it
    to the human-intervention queue, waits for more messages to aggregate,
    or asks the LLM for a reply and publishes it on the send queue.  A
    background scheduler periodically checks whether the agent should
    start a conversation on its own.
    """

    def __init__(
        self,
        receive_backend: MessageQueueBackend,
        send_backend: MessageQueueBackend,
        human_backend: MessageQueueBackend,
        user_manager: UserManager
    ):
        """Wire up the queues, helper services, LLM client and scheduler.

        Args:
            receive_backend: Queue that incoming messages are consumed from.
            send_backend: Queue that agent replies are produced to.
            human_backend: Queue that human-intervention requests are
                produced to.
            user_manager: Store used to list users and load/save profiles.
        """
        self.receive_queue = receive_backend
        self.send_queue = send_backend
        self.human_queue = human_backend

        # Core service modules
        self.user_manager = user_manager
        self.user_profile_extractor = UserProfileExtractor()
        # One DialogueManager per user id, created lazily.
        self.agent_registry: Dict[str, DialogueManager] = {}

        # NOTE(review): the fallback key below is a secret committed to
        # source control.  It is kept only so existing deployments keep
        # working; rotate it and supply the key via ARK_API_KEY instead.
        self.llm_client = OpenAI(
            api_key=os.environ.get(
                'ARK_API_KEY', '5e275c38-44fd-415f-abcf-4b59f6377f72'),
            base_url="https://ark.cn-beijing.volces.com/api/v3"
        )
        self.model_name = "ep-20250213194558-rrmr2"  # DeepSeek on Volces

        # Scheduler for timed jobs
        self.scheduler = BackgroundScheduler()
        self.scheduler.start()
        self._setup_initiative_conversations()

    def _setup_initiative_conversations(self):
        """Register the periodic check for agent-initiated conversations."""
        # The cron trigger fires at seconds 5 and 35 of every minute.
        self.scheduler.add_job(
            self._check_initiative_conversations,
            apscheduler.triggers.cron.CronTrigger(second='5,35')
        )

    def _get_agent_instance(self, user_id: str) -> DialogueManager:
        """Return the user's DialogueManager, creating it on first use."""
        if user_id not in self.agent_registry:
            self.agent_registry[user_id] = DialogueManager(
                user_id, self.user_manager)
        return self.agent_registry[user_id]

    def process_messages(self):
        """Consume and handle messages from the receive queue forever.

        This method never returns; run it in its own thread.  A failure
        while handling one message is logged and the loop continues, so a
        single bad message cannot stop the consumer.
        """
        while True:
            message = self.receive_queue.consume()
            if message:
                try:
                    self._process_single_message(message)
                except Exception:
                    # Worker-loop boundary: log with traceback and keep
                    # serving subsequent messages.
                    logging.exception(
                        "failed to process message: {}".format(message))
            time.sleep(1)  # avoid spinning the CPU

    def _update_user_profile(self, user_id, user_profile, message: str):
        """Extract profile info from a message and persist the merged profile.

        Args:
            user_id: Id of the user the profile belongs to.
            user_profile: The user's current profile.
            message: Message text to extract profile information from.

        Returns:
            The merged profile that was saved, or ``None`` when nothing
            was extracted (in which case nothing is saved).
        """
        profile_to_update = self.user_profile_extractor.extract_profile_info(
            user_profile, message)
        if not profile_to_update:
            logging.debug(
                "user_id: {}, no profile info extracted".format(user_id))
            return
        logging.warning("update user profile: {}".format(profile_to_update))
        merged_profile = self.user_profile_extractor.merge_profile_info(
            user_profile, profile_to_update)
        self.user_manager.save_user_profile(user_id, merged_profile)
        return merged_profile

    def _schedule_aggregation_trigger(self, user_id: str, delay_sec: int):
        """Enqueue an AGGREGATION_TRIGGER message after ``delay_sec`` seconds.

        The trigger is built now, stamped with the time it is due (epoch
        milliseconds), and put on the receive queue by a one-shot
        scheduler job.
        """
        logging.debug(
            "user: {}, schedule trigger message after {} seconds".format(
                user_id, delay_sec))
        message = {
            'user_id': user_id,
            'type': MessageType.AGGREGATION_TRIGGER,
            'text': None,
            'timestamp': int(time.time() * 1000) + delay_sec * 1000
        }
        self.scheduler.add_job(
            lambda: self.receive_queue.produce(message),
            'date',
            run_date=datetime.now() + timedelta(seconds=delay_sec))

    def _process_single_message(self, message: Dict):
        """Advance the user's dialogue state and route one message.

        Args:
            message: Incoming message; ``'user_id'`` and ``'type'`` keys
                are read here, and the whole dict is handed to the
                user's DialogueManager.
        """
        user_id = message['user_id']

        # Load the user's profile and agent instance
        user_profile = self.user_manager.get_user_profile(user_id)
        agent = self._get_agent_instance(user_id)

        # Update the dialogue state; the agent also returns the text that
        # should be handled for this step.
        logging.debug("process message: {}".format(message))
        dialogue_state, message_text = agent.update_state(message)
        logging.debug(
            "user: {}, next state: {}".format(user_id, dialogue_state))

        # Route the message according to the state
        if agent.is_in_human_intervention():
            self._route_to_human_intervention(
                user_id, message_text, dialogue_state)
        elif dialogue_state == DialogueState.MESSAGE_AGGREGATING:
            if message['type'] != MessageType.AGGREGATION_TRIGGER:
                # Schedule a trigger, but never recursively from a trigger
                logging.debug(
                    "user: {}, waiting next message for aggregation".format(
                        user_id))
                self._schedule_aggregation_trigger(
                    user_id, agent.message_aggregation_sec)
            # NOTE(review): block layout was reconstructed from collapsed
            # source; this return is taken for every message seen while
            # aggregating -- confirm against the original formatting.
            return
        else:
            # Update the user profile first, then produce the reply
            self._update_user_profile(user_id, user_profile, message_text)
            self._process_llm_response(user_id, agent, message_text)

    def _route_to_human_intervention(self, user_id: str, user_message: str,
                                     state: DialogueState):
        """Publish a human-intervention request for the user.

        NOTE(review): ``user_message`` is accepted but not included in the
        published payload -- confirm whether consumers need the text.
        """
        self.human_queue.produce({
            'user_id': user_id,
            'state': state,
            'timestamp': datetime.now().isoformat()
        })

    def _process_llm_response(self, user_id: str, agent: DialogueManager,
                              user_message: Optional[str]):
        """Ask the LLM for a reply and publish it on the send queue.

        Args:
            user_id: Id of the user being answered.
            agent: The user's DialogueManager; builds the prompt messages
                and post-processes the raw LLM output.
            user_message: Text to respond to; ``None`` when the agent is
                starting the conversation itself.
        """
        messages = agent.make_llm_messages(user_message)
        logging.debug(messages)
        llm_response = self._call_llm_api(messages)
        # Nothing is sent when the agent yields a falsy response.
        if response := agent.generate_response(llm_response):
            logging.warning(
                "user: {}, response: {}".format(user_id, response))
            self.send_queue.produce({
                'user_id': user_id,
                'text': response,
            })

    def _check_initiative_conversations(self):
        """Scheduled job: start a conversation for every user due for one."""
        for user_id in self.user_manager.list_all_users():
            agent = self._get_agent_instance(user_id)
            should_initiate = agent.should_initiate_conversation()
            if should_initiate:
                logging.warning(
                    "user: {}, initiate conversation".format(user_id))
                self._process_llm_response(user_id, agent, None)
            else:
                logging.debug(
                    "user: {}, do not initiate conversation".format(user_id))

    def _call_llm_api(self, messages: List[Dict]) -> str:
        """Return the LLM completion text for ``messages``.

        When ``global_flags.DISABLE_LLM_API_CALL`` is set, no request is
        made and a fixed placeholder reply is returned instead.
        """
        if global_flags.DISABLE_LLM_API_CALL:
            return 'LLM模拟回复'
        chat_completion = self.llm_client.chat.completions.create(
            messages=messages,
            model=self.model_name,
        )
        response = chat_completion.choices[0].message.content
        return response


if __name__ == "__main__":
    # Root logger accepts everything; the console handler only shows
    # WARNING and above.
    logging.getLogger().setLevel(logging.DEBUG)
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.WARNING)
    formatter = ColoredFormatter(
        '%(asctime)s - %(funcName)s[%(lineno)d] - %(levelname)s - %(message)s'
    )
    console_handler.setFormatter(formatter)
    root_logger = logging.getLogger()
    root_logger.handlers.clear()
    root_logger.addHandler(console_handler)
    scheduler_logger = logging.getLogger('apscheduler')
    scheduler_logger.setLevel(logging.WARNING)

    # Set up the backend for each queue
    receive_queue = MemoryQueueBackend()
    send_queue = MemoryQueueBackend()
    human_queue = MemoryQueueBackend()

    # Set up the user management service
    user_manager = LocalUserManager()

    global_flags.DISABLE_LLM_API_CALL = False

    # Create the agent service
    service = AgentService(
        receive_backend=receive_queue,
        send_backend=send_queue,
        human_backend=human_queue,
        user_manager=user_manager
    )

    process_thread = threading.Thread(target=service.process_messages)
    process_thread.start()

    while True:
        print("Input next message: ")
        line = sys.stdin.readline()
        if not line:
            # EOF on stdin: stop prompting instead of looping forever.
            # The worker thread is non-daemon, so the service keeps running.
            break
        message = line.strip()
        message_dict = {
            "user_id": "user_id_1",
            "type": MessageType.TEXT,
            "text": message,
            "timestamp": int(time.time() * 1000)
        }
        if message:
            receive_queue.produce(message_dict)
        time.sleep(0.1)