#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
"""Agent service: consumes user messages from a queue, routes them through a
per-user DialogueManager, calls the configured chat backend and publishes the
replies to an outgoing queue.

Run as a script to start an interactive console session backed by in-memory
queues.
"""

import os
import sys
import time
from typing import Dict, List, Optional, Tuple, Any
import logging
from datetime import datetime, timedelta
import apscheduler.triggers.cron
from apscheduler.schedulers.background import BackgroundScheduler

import chat_service
import global_flags
from chat_service import CozeChat, ChatServiceType
from dialogue_manager import DialogueManager, DialogueState
from user_manager import UserManager, LocalUserManager
from openai import OpenAI
from message_queue_backend import MessageQueueBackend, MemoryQueueBackend
from user_profile_extractor import UserProfileExtractor
import threading
from message import MessageType
from logging_service import ColoredFormatter


class AgentService:
    """Glue between the message queues, per-user dialogue state and the LLM.

    One DialogueManager is created lazily per user id and cached in
    ``agent_registry``. A background scheduler is started on construction; it
    periodically checks whether a conversation should be initiated and is
    also used to enqueue delayed aggregation-trigger messages.
    """

    def __init__(
        self,
        receive_backend: MessageQueueBackend,
        send_backend: MessageQueueBackend,
        human_backend: MessageQueueBackend,
        user_manager: UserManager,
        chat_service_type: ChatServiceType = ChatServiceType.OPENAI_COMPATIBLE
    ):
        """Wire up queues, service modules, chat clients and the scheduler.

        Args:
            receive_backend: queue that incoming messages are consumed from.
            send_backend: queue that generated replies are produced to.
            human_backend: queue used when a dialogue is in human intervention.
            user_manager: storage for user profiles and the user list.
            chat_service_type: which chat backend ``_call_chat_api`` uses.
        """
        self.receive_queue = receive_backend
        self.send_queue = send_backend
        self.human_queue = human_backend

        # Core service modules
        self.user_manager = user_manager
        self.user_profile_extractor = UserProfileExtractor()
        self.agent_registry: Dict[str, DialogueManager] = {}

        # SECURITY NOTE(review): an API key is committed in source. It is kept
        # only as a fallback so existing runs keep working; rotate it and
        # provide the key via the ARK_API_KEY environment variable instead.
        self.llm_client = OpenAI(
            api_key=(os.environ.get('ARK_API_KEY')
                     or '5e275c38-44fd-415f-abcf-4b59f6377f72'),
            base_url="https://ark.cn-beijing.volces.com/api/v3"
        )
        # DeepSeek on Volces
        self.model_name = "ep-20250213194558-rrmr2"
        self.coze_client = CozeChat(
            token=chat_service.COZE_API_TOKEN,
            base_url=chat_service.COZE_CN_BASE_URL
        )
        self.chat_service_type = chat_service_type

        # Scheduler for timed jobs
        self.scheduler = BackgroundScheduler()
        self.scheduler.start()
        self._setup_initiative_conversations()

    def _setup_initiative_conversations(self):
        """Register the periodic initiative-conversation check.

        The cron trigger fires at seconds 5 and 35 of every minute.
        """
        self.scheduler.add_job(
            self._check_initiative_conversations,
            apscheduler.triggers.cron.CronTrigger(second='5,35')
        )

    def _get_agent_instance(self, user_id: str) -> DialogueManager:
        """Return the DialogueManager for ``user_id``, creating it on first use."""
        if user_id not in self.agent_registry:
            self.agent_registry[user_id] = DialogueManager(
                user_id, self.user_manager)
        return self.agent_registry[user_id]

    def process_messages(self):
        """Consume and process messages from the receive queue forever.

        Sleeps only when the queue yields nothing, so a backlog is drained
        without an artificial one-message-per-second throttle. A failure while
        handling one message is logged and does not stop the loop.
        """
        while True:
            message = self.receive_queue.consume()
            if not message:
                time.sleep(1)  # queue empty: avoid busy-spinning the CPU
                continue
            try:
                self.process_single_message(message)
            except Exception:
                # Top-level worker boundary: keep the consumer thread alive.
                logging.exception(
                    "failed to process message: {}".format(message))

    def _update_user_profile(self, user_id, user_profile, message: str):
        """Extract profile info from ``message`` and persist the merged profile.

        Returns:
            The merged profile, or ``None`` when nothing was extracted.
        """
        profile_to_update = self.user_profile_extractor.extract_profile_info(
            user_profile, message)
        if not profile_to_update:
            logging.debug(
                "user_id: {}, no profile info extracted".format(user_id))
            return
        logging.warning("update user profile: {}".format(profile_to_update))
        merged_profile = self.user_profile_extractor.merge_profile_info(
            user_profile, profile_to_update)
        self.user_manager.save_user_profile(user_id, merged_profile)
        return merged_profile

    def _schedule_aggregation_trigger(self, user_id: str, delay_sec: int):
        """Enqueue an AGGREGATION_TRIGGER message for ``user_id`` after a delay.

        The message is produced onto the receive queue by a one-shot scheduler
        job; its timestamp (milliseconds) is set to the intended fire time.
        """
        logging.debug(
            "user: {}, schedule trigger message after {} seconds".format(
                user_id, delay_sec))
        message = {
            'user_id': user_id,
            'type': MessageType.AGGREGATION_TRIGGER,
            'text': None,
            'timestamp': int(time.time() * 1000) + delay_sec * 1000
        }
        self.scheduler.add_job(
            lambda: self.receive_queue.produce(message),
            'date',
            run_date=datetime.now() + timedelta(seconds=delay_sec))

    def process_single_message(self, message: Dict):
        """Advance the user's dialogue state with ``message`` and route it.

        Routing, in order of precedence:
          * human intervention active -> forward to the human queue;
          * state is MESSAGE_AGGREGATING -> schedule a delayed trigger (unless
            this message is itself a trigger) and stop;
          * otherwise -> update the user profile, then request a chat reply.
        """
        user_id = message['user_id']

        # Fetch the user profile and the per-user agent
        user_profile = self.user_manager.get_user_profile(user_id)
        agent = self._get_agent_instance(user_id)

        # Update the dialogue state; the agent also returns the text to act on
        logging.debug("process message: {}".format(message))
        dialogue_state, message_text = agent.update_state(message)
        logging.debug(
            "user: {}, next state: {}".format(user_id, dialogue_state))

        # Route according to state
        if agent.is_in_human_intervention():
            self._route_to_human_intervention(
                user_id, message_text, dialogue_state)
        elif dialogue_state == DialogueState.MESSAGE_AGGREGATING:
            if message['type'] != MessageType.AGGREGATION_TRIGGER:
                # Schedule a trigger, but never from a trigger itself
                # (that would recurse indefinitely).
                logging.debug(
                    "user: {}, waiting next message for aggregation".format(
                        user_id))
                self._schedule_aggregation_trigger(
                    user_id, agent.message_aggregation_sec)
            return
        else:
            # Update the user profile first, then produce the reply
            self._update_user_profile(user_id, user_profile, message_text)
            self._get_chat_response(user_id, agent, message_text)

    def _route_to_human_intervention(
            self, user_id: str, user_message: str, state: DialogueState):
        """Forward a message to the human-intervention queue.

        The payload carries the user id, the user's text, the dialogue state
        and an ISO-8601 local timestamp.
        """
        self.human_queue.produce({
            'user_id': user_id,
            # Previously dropped; presumably the operator needs the text.
            'text': user_message,
            'state': state,
            'timestamp': datetime.now().isoformat()
        })

    def _check_initiative_conversations(self):
        """Scheduled job: start a conversation for every user whose agent asks for it."""
        for user_id in self.user_manager.list_all_users():
            agent = self._get_agent_instance(user_id)
            should_initiate = agent.should_initiate_conversation()
            if should_initiate:
                logging.warning(
                    "user: {}, initiate conversation".format(user_id))
                self._get_chat_response(user_id, agent, None)
            else:
                logging.debug(
                    "user: {}, do not initiate conversation".format(user_id))

    def _get_chat_response(
            self, user_id: str, agent: DialogueManager,
            user_message: Optional[str]):
        """Call the chat backend and publish the agent's reply, if any.

        Args:
            user_id: recipient of the reply.
            agent: the user's DialogueManager; builds the request and
                post-processes the raw backend response.
            user_message: the user's text, or ``None`` when the agent itself
                initiates the conversation.
        """
        chat_config = agent.build_chat_configuration(
            user_message, self.chat_service_type)
        logging.debug(chat_config)
        # FIXME(zhoutian): the abstraction here is not good enough;
        # DialogueManager and AgentService are coupled.
        chat_response = self._call_chat_api(chat_config)

        if response := agent.generate_response(chat_response):
            logging.warning(
                "user: {}, response: {}".format(user_id, response))
            self.send_queue.produce({
                'user_id': user_id,
                'type': MessageType.TEXT,
                'text': response,
            })

    def _call_chat_api(self, chat_config: Dict) -> str:
        """Send ``chat_config`` to the configured chat backend.

        Returns:
            The backend's reply, or a fixed placeholder when
            ``global_flags.DISABLE_LLM_API_CALL`` is set.

        Raises:
            ValueError: if ``self.chat_service_type`` is not supported.
        """
        if global_flags.DISABLE_LLM_API_CALL:
            return 'LLM模拟回复'
        if self.chat_service_type == ChatServiceType.OPENAI_COMPATIBLE:
            chat_completion = self.llm_client.chat.completions.create(
                messages=chat_config['messages'],
                model=self.model_name,
            )
            response = chat_completion.choices[0].message.content
        elif self.chat_service_type == ChatServiceType.COZE_CHAT:
            bot_user_id = 'dev_user'
            response = self.coze_client.create(
                chat_config['bot_id'], bot_user_id,
                chat_config['messages'], chat_config['custom_variables']
            )
        else:
            raise ValueError(
                'Unsupported chat service type: {}'.format(
                    self.chat_service_type))
        return response


def main():
    """Run an interactive console session against in-memory queues."""
    # Root logger accepts everything; the console handler shows INFO and up.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    formatter = ColoredFormatter(
        '%(asctime)s - %(funcName)s[%(lineno)d] - %(levelname)s - %(message)s'
    )
    console_handler.setFormatter(formatter)
    root_logger.handlers.clear()
    root_logger.addHandler(console_handler)
    scheduler_logger = logging.getLogger('apscheduler')
    scheduler_logger.setLevel(logging.WARNING)

    # Backends for the individual queues
    receive_queue = MemoryQueueBackend()
    send_queue = MemoryQueueBackend()
    human_queue = MemoryQueueBackend()

    # User management service
    user_manager = LocalUserManager()

    global_flags.DISABLE_LLM_API_CALL = False

    # Create the agent service
    service = AgentService(
        receive_backend=receive_queue,
        send_backend=send_queue,
        human_backend=human_queue,
        user_manager=user_manager,
        chat_service_type=ChatServiceType.COZE_CHAT
    )

    # Daemon thread: the worker loops forever and must not block process exit.
    process_thread = threading.Thread(
        target=service.process_messages, daemon=True)
    process_thread.start()

    try:
        while True:
            print("Input next message: ")
            line = sys.stdin.readline()
            if not line:
                # readline() returns '' only at EOF; without this check the
                # prompt would be re-printed in an endless loop.
                logging.info("stdin closed, no more interactive input")
                break
            text = line.strip()
            if text:
                receive_queue.produce({
                    "user_id": "user_id_1",
                    "type": MessageType.TEXT,
                    "text": text,
                    "timestamp": int(time.time() * 1000)
                })
            time.sleep(0.1)
        # Keep serving queued messages and scheduled jobs until interrupted.
        process_thread.join()
    except KeyboardInterrupt:
        logging.info("interrupted, shutting down")
    finally:
        service.scheduler.shutdown(wait=False)


if __name__ == "__main__":
    main()