#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

import re
import signal
import sys
import time
from typing import Dict, List, Optional
import logging
from datetime import datetime, timedelta
import threading
import traceback

import apscheduler.triggers.cron
import rocketmq
from apscheduler.schedulers.background import BackgroundScheduler
from sqlalchemy.orm import sessionmaker

from pqai_agent import configs
from pqai_agent.configs import apollo_config
from pqai_agent.exceptions import NoRetryException
from pqai_agent.logging_service import logger
from pqai_agent import chat_service
from pqai_agent.chat_service import CozeChat, ChatServiceType
from pqai_agent.dialogue_manager import DialogueManager, DialogueState, DialogueStateCache
from pqai_agent.history_dialogue_service import HistoryDialogueDatabase
from pqai_agent.push_service import PushScanThread, PushTaskWorkerPool
from pqai_agent.rate_limiter import MessageSenderRateLimiter
from pqai_agent.response_type_detector import ResponseTypeDetector
from pqai_agent.user_manager import UserManager, UserRelationManager
from pqai_agent.message_queue_backend import MessageQueueBackend, AliyunRocketMQQueueBackend
from pqai_agent.user_profile_extractor import UserProfileExtractor
from pqai_agent.mq_message import MessageType, Message, MessageChannel
from pqai_agent.utils.db_utils import create_sql_engine

# Matches "YYYY-MM-DD HH:MM:SS" timestamps, optionally wrapped in square
# brackets; such timestamps are stripped from LLM output by sanitize_response.
# Compiled once instead of on every call.
_TIMESTAMP_PATTERN = re.compile(r'\[?\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\]?')


class AgentService:
    """Message-driven service that turns incoming user messages into agent replies.

    Consumes messages from the receive queue, drives a per-<staff, user>
    ``DialogueManager``, calls a chat (LLM) service to produce replies and
    publishes them on the send queue. Conversations flagged for human
    intervention are forwarded to the human queue. It also schedules delayed
    aggregation triggers and periodic proactive (push) conversations.
    """

    def __init__(
            self,
            receive_backend: MessageQueueBackend,
            send_backend: MessageQueueBackend,
            human_backend: MessageQueueBackend,
            user_manager: UserManager,
            user_relation_manager: UserRelationManager,
            chat_service_type: ChatServiceType = ChatServiceType.OPENAI_COMPATIBLE
    ):
        """Wire up queues, storage, chat clients and push-task infrastructure.

        Args:
            receive_backend: queue that incoming messages are consumed from.
            send_backend: queue that outgoing replies are produced to.
            human_backend: queue that human-intervention notices are produced to.
            user_manager: loads and saves user profiles.
            user_relation_manager: staff/user relations, user tags, daily-push settings.
            chat_service_type: chat backend used by ``_get_chat_response``.

        Side effects: starts a RocketMQ producer and consumer for push tasks and
        launches a background thread that resumes unfinished push tasks.
        """
        self.config = configs.get()
        self.receive_queue = receive_backend
        self.send_queue = send_backend
        self.human_queue = human_backend

        # Core service modules
        self.agent_state_cache = DialogueStateCache()
        self.user_manager = user_manager
        self.user_relation_manager = user_relation_manager
        self.user_profile_extractor = UserProfileExtractor()
        self.response_type_detector = ResponseTypeDetector()
        self.agent_registry: Dict[str, DialogueManager] = {}
        self.history_dialogue_db = HistoryDialogueDatabase(self.config['storage']['user']['mysql'])
        self.agent_db_engine = create_sql_engine(self.config['storage']['agent_state']['mysql'])
        self.AgentDBSession = sessionmaker(bind=self.agent_db_engine)

        chat_config = self.config['chat_api']['openai_compatible']
        self.text_model_name = chat_config['text_model']
        self.multimodal_model_name = chat_config['multimodal_model']
        self.text_model_client = chat_service.OpenAICompatible.create_client(self.text_model_name)
        self.multimodal_model_client = chat_service.OpenAICompatible.create_client(self.multimodal_model_name)

        coze_config = self.config['chat_api']['coze']
        coze_oauth_app = CozeChat.get_oauth_app(
            coze_config['oauth_client_id'],
            coze_config['private_key_path'],
            str(coze_config['public_key_id']),
            account_id=coze_config.get('account_id', None)
        )
        self.coze_client = CozeChat(
            base_url=chat_service.COZE_CN_BASE_URL,
            auth_app=coze_oauth_app
        )
        self.chat_service_type = chat_service_type

        # Scheduler for timed tasks
        self.scheduler = None
        self.scheduler_mode = self.config.get('system', {}).get('scheduler_mode', 'local')
        self.scheduler_queue = None
        self.msg_scheduler_thread = None
        self.running = False
        self.process_threads = []
        self._sigint_cnt = 0

        # Created before the resume-push thread starts: that thread receives
        # `self`, so every attribute it might use (e.g. via send_response)
        # must already exist.
        self.send_rate_limiter = MessageSenderRateLimiter()

        # Push related
        self.push_task_producer = None
        self.push_task_consumer = None
        self._init_push_task_queue()
        # Stays True until the resumed push tasks have finished (see
        # _resume_unfinished_push_task); while True, new push rounds are skipped.
        self.next_push_disabled = True
        self._resume_unfinished_push_task()

    def setup_initiative_conversations(self, schedule_params: Optional[Dict] = None):
        """Register the periodic proactive-conversation check on the scheduler.

        Args:
            schedule_params: keyword arguments for ``CronTrigger``; when falsy,
                defaults to ``{'hour': '8,16,20'}``.

        Requires ``setup_scheduler`` to have been called first.
        """
        if not schedule_params:
            schedule_params = {'hour': '8,16,20'}
        self.scheduler.add_job(
            self._check_initiative_conversations,
            apscheduler.triggers.cron.CronTrigger(**schedule_params)
        )

    def setup_scheduler(self):
        """Create and start the background scheduler.

        In ``'mq'`` scheduler mode, delayed events are sent through a RocketMQ
        DELAY topic and consumed by a dedicated ``process_scheduler_events`` thread.
        """
        self.scheduler = BackgroundScheduler()
        if self.scheduler_mode == 'mq':
            # Use the project logger like the rest of the service (not the root logger).
            logger.info("setup event message scheduler with MQ")
            mq_conf = self.config['mq']
            topic = mq_conf['scheduler_topic']
            self.scheduler_queue = AliyunRocketMQQueueBackend(
                mq_conf['endpoints'],
                mq_conf['instance_id'],
                topic,
                has_consumer=True, has_producer=True,
                group_id=mq_conf['scheduler_group'],
                topic_type='DELAY',
                await_duration=5
            )
            self.msg_scheduler_thread = threading.Thread(target=self.process_scheduler_events)
            self.msg_scheduler_thread.start()
        self.scheduler.start()

    def process_scheduler_events(self):
        """Consume scheduler events until the service stops running.

        A message is acked only after it was processed successfully; on error it
        is logged and left un-acked.
        """
        while self.running:
            msg = self.scheduler_queue.consume()
            if msg:
                try:
                    self.process_scheduler_event(msg)
                    self.scheduler_queue.ack(msg)
                except Exception as e:
                    logger.error("Error processing scheduler event: {}".format(e))
            time.sleep(1)
        logger.info("Scheduler event processing thread exit")

    def process_scheduler_event(self, msg: Message):
        """Dispatch one scheduler event; only AGGREGATION_TRIGGER is handled."""
        if msg.type == MessageType.AGGREGATION_TRIGGER:
            # A delayed trigger: push it into the receive queue so it drives the agent
            self.receive_queue.produce(msg)
        else:
            logger.warning(f"Unknown message type: {msg.type}")

    def get_agent_instance(self, staff_id: str, user_id: str) -> DialogueManager:
        """Return the cached agent for <staff_id, user_id>, creating it on first use.

        The agent's profile is refreshed on every call.
        """
        agent_key = 'agent_{}_{}'.format(staff_id, user_id)
        if agent_key not in self.agent_registry:
            self.agent_registry[agent_key] = DialogueManager(
                staff_id, user_id, self.user_manager, self.agent_state_cache, self.AgentDBSession)
        agent = self.agent_registry[agent_key]
        agent.refresh_profile()
        return agent

    def create_queue_consumer(self) -> MessageQueueBackend:
        """Create a dedicated receive-queue consumer for one worker thread.

        Per-thread consumers are only needed in Aliyun MQ mode; otherwise the
        shared ``self.receive_queue`` itself is returned.
        """
        if not self.config.get('debug_flags', {}).get('use_aliyun_mq', False):
            logger.warning("Do not create queue consumer in local mode")
            return self.receive_queue
        mq_config = self.config['mq']
        consumer = AliyunRocketMQQueueBackend(
            endpoints=mq_config['endpoints'],
            instance_id=mq_config['instance_id'],
            topic=mq_config['receive_topic'],
            has_consumer=True, has_producer=False,
            group_id=mq_config['receive_group'],
            topic_type='FIFO',
            await_duration=10
        )
        return consumer

    def process_messages(self):
        """Continuously process messages from the receive queue.

        FIFO message groups keep messages of the same <user, staff> pair in
        order, so several worker threads can run this concurrently.
        ``NoRetryException`` acks the message (no retry); any other error leaves
        it un-acked.
        """
        receive_queue = self.create_queue_consumer()
        # Wait briefly after creating the consumer; the remote side may not be
        # ready yet and consuming immediately can fail.
        time.sleep(1)
        while self.running:
            message = receive_queue.consume()
            if message:
                try:
                    self.process_single_message(message)
                    receive_queue.ack(message)
                except NoRetryException as e:
                    logger.error("Error processing message and skip retry: {}".format(e))
                    receive_queue.ack(message)
                except Exception as e:
                    error_stack = traceback.format_exc()
                    logger.error("Error processing message: {}, {}".format(e, error_stack))
            time.sleep(0.1)
        # Only shut down a consumer owned by this thread. In local mode the
        # consumer IS the shared receive queue: other workers may still be using
        # it, and shutdown() closes it once for everyone.
        if receive_queue is not self.receive_queue:
            receive_queue.shutdown()
        logger.info("Message processing thread exit")

    def start(self, blocking=False):
        """Start worker threads, the scheduler and the SIGINT handler.

        Args:
            blocking: if True, block until all message-processing threads exit.
        """
        self.running = True
        max_reply_workers = self.config.get('system', {}).get('max_reply_workers', 1)
        self.process_threads = []
        for _ in range(max_reply_workers):
            thread = threading.Thread(target=self.process_messages)
            thread.start()
            self.process_threads.append(thread)
        self.setup_scheduler()
        # Only the corporate-WeChat scenario needs proactively initiated conversations
        if not self.config.get('debug_flags', {}).get('disable_active_conversation', False):
            schedule_param = self.config.get('agent_behavior', {}).get('active_conversation_schedule_param', None)
            self.setup_initiative_conversations(schedule_param)
        signal.signal(signal.SIGINT, self._handle_sigint)
        if blocking:
            for thread in self.process_threads:
                thread.join()
            logger.debug("process threads finished")

    def shutdown(self, sync=True):
        """Stop the service.

        Args:
            sync: if True, join the worker and scheduler-event threads (and shut
                down the scheduler queue) before closing the receive/send queues.

        Raises:
            Exception: if the service is not running.
        """
        if not self.running:
            raise Exception("Service is not running")
        self.running = False
        self.scheduler.shutdown()
        logger.debug("scheduler shutdown")
        if sync:
            for thread in self.process_threads:
                thread.join()
            logger.debug("message processing threads finished")
            if self.msg_scheduler_thread:
                self.msg_scheduler_thread.join()
                self.scheduler_queue.shutdown()
                logger.debug("scheduler message processing thread finished")
        self.receive_queue.shutdown()
        self.send_queue.shutdown()
        logger.debug("receive and send queues shutdown")

    def _handle_sigint(self, signum, frame):
        """SIGINT handler: the first signal shuts down gracefully, the next one exits."""
        self._sigint_cnt += 1
        if self._sigint_cnt == 1:
            logger.warning("Try to shutdown gracefully...")
            self.shutdown(sync=True)
        else:
            logger.warning("Forcing exit")
            # NOTE(review): sys.exit raises SystemExit in the main thread only;
            # the interpreter still waits for non-daemon worker threads at exit,
            # so this may not force termination. Consider os._exit if a hard exit
            # is intended.
            sys.exit(0)

    def _update_user_profile(self, user_id, user_profile, recent_dialogue: List[Dict]) -> Optional[Dict]:
        """Extract profile updates from recent dialogue, merge and save them.

        Returns:
            The merged profile, or None when nothing was extracted.
        """
        profile_to_update = self.user_profile_extractor.extract_profile_info(user_profile, recent_dialogue)
        if not profile_to_update:
            logger.debug("user_id: {}, no profile info extracted".format(user_id))
            return None
        logger.warning("update user profile: {}".format(profile_to_update))
        if profile_to_update.get('interaction_frequency', None) == 'stopped':
            # Coordinate with the corporate-WeChat daily push to disturb the user less
            if self.user_relation_manager.stop_user_daily_push(user_id):
                logger.warning(f"user[{user_id}]: daily push set to be stopped")
        merged_profile = self.user_profile_extractor.merge_profile_info(user_profile, profile_to_update)
        self.user_manager.save_user_profile(user_id, merged_profile)
        return merged_profile

    def _schedule_aggregation_trigger(self, staff_id: str, user_id: str, delay_sec: int):
        """Arrange for an AGGREGATION_TRIGGER message to reach the receive queue after ``delay_sec``.

        Uses the MQ DELAY topic in ``'mq'`` scheduler mode, otherwise a one-shot
        local scheduler job.
        """
        logger.debug("user: {}, schedule trigger message after {} seconds".format(user_id, delay_sec))
        message_ts = int((time.time() + delay_sec) * 1000)
        msg = Message.build(MessageType.AGGREGATION_TRIGGER, MessageChannel.SYSTEM, user_id, staff_id, None, message_ts)
        # System messages use a fixed msgId that carries no real meaning
        msg.msgId = -MessageType.AGGREGATION_TRIGGER.value
        if self.scheduler_mode == 'mq':
            self.scheduler_queue.produce(msg, msg_group='agent_system')
        else:
            self.scheduler.add_job(lambda: self.receive_queue.produce(msg),
                                   'date',
                                   run_date=datetime.now() + timedelta(seconds=delay_sec))

    def process_single_message(self, message: Message):
        """Handle one incoming message for its <staff, user> agent.

        Updates the agent's dialogue state, then depending on the new state
        routes to human intervention, schedules an aggregation trigger, or
        generates and sends a reply. The agent state is persisted on success and
        rolled back on any error.

        Raises:
            Exception: if the agent is invalid; any error raised while routing or
                replying is re-raised after the state rollback.
        """
        user_id = message.sender
        staff_id = message.receiver

        # Fetch the user profile and the agent instance
        user_profile = self.user_manager.get_user_profile(user_id)
        agent = self.get_agent_instance(staff_id, user_id)
        if not agent.is_valid():
            logger.error(f"staff[{staff_id}] user[{user_id}]: agent is invalid")
            raise Exception('agent is invalid')

        # Update the dialogue state
        logger.debug("process message: {}".format(message))
        need_response, message_text = agent.update_state(message)
        logger.debug("user: {}, next state: {}".format(user_id, agent.current_state))

        # Route the message according to the new state
        try:
            if agent.is_in_human_intervention():
                self._route_to_human_intervention(user_id, message)
            elif agent.current_state == DialogueState.MESSAGE_AGGREGATING:
                if message.type != MessageType.AGGREGATION_TRIGGER:
                    # Schedule a trigger, but a trigger must never schedule another one
                    logger.debug("user: {}, waiting next message for aggregation".format(user_id))
                    self._schedule_aggregation_trigger(staff_id, user_id, agent.message_aggregation_sec)
            elif need_response:
                # Update the user profile first, then produce the reply
                self._update_user_profile(user_id, user_profile, agent.dialogue_history[-10:])
                resp = self._get_chat_response(user_id, agent, message_text)
                if resp:
                    recent_dialogue = agent.dialogue_history[-10:]
                    agent_voice_whitelist = set(apollo_config.get_json_value("agent_voice_whitelist", []))
                    if len(recent_dialogue) < 2 or staff_id not in agent_voice_whitelist:
                        message_type = MessageType.TEXT
                    else:
                        message_type = self.response_type_detector.detect_type(
                            recent_dialogue[:-1], recent_dialogue[-1], enable_random=True)
                    self.send_response(staff_id, user_id, resp, message_type)
            else:
                logger.debug(f"staff[{staff_id}], user[{user_id}]: do not need response")
            # The message was handled successfully: commit and persist the agent state
            agent.persist_state()
        except Exception:
            agent.rollback_state()
            raise

    def send_response(self, staff_id, user_id, response, message_type: MessageType, skip_check=False):
        """Publish a reply on the send queue, subject to whitelist checks and rate limiting.

        Args:
            skip_check: bypass the staff/user-tag whitelist check.
        """
        logger.warning(f"staff[{staff_id}] user[{user_id}]: response[{message_type}] {response}")
        current_ts = int(time.time() * 1000)
        user_tags = self.user_relation_manager.get_user_tags(user_id)
        white_list_tags = set(apollo_config.get_json_value("agent_response_whitelist_tags", []))
        hit_white_list_tags = len(set(user_tags).intersection(white_list_tags)) > 0
        # FIXME(zhoutian)
        # Temporary logic during testing: only send for specific staff accounts or users
        staff_white_lists = set(apollo_config.get_json_value("agent_response_whitelist_staffs", []))
        if not (staff_id in staff_white_lists or hit_white_list_tags or skip_check):
            logger.warning(f"staff[{staff_id}] user[{user_id}]: skip reply")
            return
        self.send_rate_limiter.wait_for_sending(staff_id, response)
        self.send_queue.produce(
            Message.build(message_type, MessageChannel.CORP_WECHAT,
                          staff_id, user_id, response, current_ts)
        )

    def _route_to_human_intervention(self, user_id: str, origin_message: Message):
        """Produce a text notice on the human queue saying the conversation needs a human."""
        self.human_queue.produce(Message.build(
            MessageType.TEXT,
            origin_message.channel,
            origin_message.sender,
            origin_message.receiver,
            "用户对话需人工介入,用户名:{}".format(user_id),
            int(time.time() * 1000)
        ))

    def _init_push_task_queue(self):
        """Create and start the RocketMQ producer/consumer used for push tasks."""
        credentials = rocketmq.Credentials()
        mq_conf = self.config['mq']
        rmq_client_conf = rocketmq.ClientConfiguration(mq_conf['endpoints'], credentials, mq_conf['instance_id'])
        rmq_topic = mq_conf['push_tasks_topic']
        rmq_group = mq_conf['push_tasks_group']
        self.push_task_rmq_topic = rmq_topic
        self.push_task_producer = rocketmq.Producer(rmq_client_conf, (rmq_topic,))
        self.push_task_producer.startup()
        self.push_task_consumer = rocketmq.SimpleConsumer(rmq_client_conf, rmq_group, await_duration=5)
        self.push_task_consumer.startup()
        self.push_task_consumer.subscribe(rmq_topic)

    def _resume_unfinished_push_task(self):
        """Drain leftover push tasks in a background thread, then re-enable new push rounds."""
        def run_unfinished_push_task():
            logger.info("start to resume unfinished push task")
            push_task_worker_pool = PushTaskWorkerPool(
                self, self.push_task_rmq_topic, self.push_task_consumer, self.push_task_producer)
            push_task_worker_pool.start()
            push_task_worker_pool.wait_to_finish()
            self.next_push_disabled = False
            logger.info("unfinished push tasks should be finished")

        thread = threading.Thread(target=run_unfinished_push_task)
        thread.start()

    def _check_initiative_conversations(self):
        """Scheduled job: scan every staff for push tasks and process them.

        Skipped while resumed push tasks are still running or when the time is
        not suitable for proactive conversations.
        """
        logger.info("start to check initiative conversations")
        if self.next_push_disabled:
            logger.info("previous push tasks in processing, next push is disabled")
            return
        if not DialogueManager.is_time_suitable_for_active_conversation():
            logger.info("time is not suitable for active conversation")
            return

        push_scan_threads = []
        for staff in self.user_relation_manager.list_staffs():
            staff_id = staff['third_party_user_id']
            scan_thread = threading.Thread(target=PushScanThread(
                staff_id, self, self.push_task_rmq_topic, self.push_task_producer).run)
            scan_thread.start()
            push_scan_threads.append(scan_thread)

        push_task_worker_pool = PushTaskWorkerPool(
            self, self.push_task_rmq_topic, self.push_task_consumer, self.push_task_producer)
        push_task_worker_pool.start()
        for thread in push_scan_threads:
            thread.join()
        # Scanning and task generation are asynchronous, so messages may not all
        # be processed between two scans, which could generate duplicate tasks;
        # therefore wait for this batch to finish. The catch: if a new
        # PushTaskWorkerPool is created each time and the previous one exits with
        # unprocessed messages, those messages pile up.
        push_task_worker_pool.wait_to_finish()

    def _check_initiative_conversations_v1(self):
        """Periodically check whether to proactively start conversations (legacy version)."""
        logger.info("start to check initiative conversations")
        if not DialogueManager.is_time_suitable_for_active_conversation():
            logger.info("time is not suitable for active conversation")
            return
        white_list_tags = set(apollo_config.get_json_value('agent_initiate_whitelist_tags', []))
        first_initiate_tags = set(apollo_config.get_json_value('agent_first_initiate_whitelist_tags', []))
        # Merge the whitelists to reduce configuration cost
        white_list_tags.update(first_initiate_tags)
        voice_tags = set(apollo_config.get_json_value('agent_initiate_by_voice_tags', []))

        for staff_user in self.user_relation_manager.list_staff_users():
            staff_id = staff_user['staff_id']
            user_id = staff_user['user_id']
            agent = self.get_agent_instance(staff_id, user_id)
            should_initiate = agent.should_initiate_conversation()
            user_tags = self.user_relation_manager.get_user_tags(user_id)

            if configs.get_env() != 'dev' and not white_list_tags.intersection(user_tags):
                should_initiate = False

            if should_initiate:
                logger.warning(f"user[{user_id}], tags{user_tags}: initiate conversation")
                # FIXME: the chance that a user being proactively contacted sends a
                # message at the same moment is low, but a concurrency conflict is
                # still possible; this should be merged into the event-driven flow.
                agent.do_state_change(DialogueState.GREETING)
                try:
                    if agent.previous_state == DialogueState.INITIALIZED or first_initiate_tags.intersection(user_tags):
                        # Intended only for users with no interaction history, but a new
                        # user's onboarding "I have added you" message already initializes
                        # the agent, so existing users cannot be used to test this state.
                        # TODO: add history-based checks and strategy dedup; use long-term
                        # memory retrieval when the gap is long; without long-term memory,
                        # the time the user was added could be used instead.
                        resp = self._generate_active_greeting_message(agent, user_tags)
                    else:
                        resp = self._get_chat_response(user_id, agent, None)
                    if resp:
                        if set(user_tags).intersection(voice_tags):
                            message_type = MessageType.VOICE
                        else:
                            message_type = MessageType.TEXT
                        self.send_response(staff_id, user_id, resp, message_type, skip_check=True)
                    agent.persist_state()
                except Exception as e:
                    # FIXME: see the concurrency note above
                    agent.rollback_state()
                    logger.error("Error in active greeting: {}".format(e))
            else:
                logger.debug(f"user[{user_id}], do not initiate conversation")

    def _generate_active_greeting_message(self, agent: DialogueManager, user_tags: Optional[List[str]] = None):
        """Generate a proactive greeting via the OpenAI-compatible chat API.

        Returns:
            The agent's generated response, or None if none was generated.
        """
        chat_config = agent.build_active_greeting_config(user_tags)
        chat_response = self._call_chat_api(chat_config, ChatServiceType.OPENAI_COMPATIBLE)
        chat_response = self.sanitize_response(chat_response)
        if response := agent.generate_response(chat_response):
            return response
        logger.warning(f"staff[{agent.staff_id}] user[{agent.user_id}]: no response generated")
        return None

    def _get_chat_response(self, user_id: str, agent: DialogueManager,
                           user_message: Optional[str]):
        """Build the chat request, call the LLM and let the agent produce a reply.

        Returns:
            The agent's generated response, or None if none was generated.
        """
        chat_config = agent.build_chat_configuration(user_message, self.chat_service_type)
        # Log only the last 20 messages to keep log lines bounded
        config_for_logging = chat_config.copy()
        config_for_logging['messages'] = config_for_logging['messages'][-20:]
        logger.debug(config_for_logging)

        chat_response = self._call_chat_api(chat_config, self.chat_service_type)
        chat_response = self.sanitize_response(chat_response)
        if response := agent.generate_response(chat_response):
            return response
        logger.warning(f"staff[{agent.staff_id}] user[{user_id}]: no response generated")
        return None

    def _call_chat_api(self, chat_config: Dict, chat_service_type: ChatServiceType) -> str:
        """Call the selected chat backend and return the raw response text.

        When ``debug_flags.disable_llm_api_call`` is set, returns a fixed
        placeholder reply containing the current time instead.

        Raises:
            Exception: if ``chat_service_type`` is not supported.
        """
        if configs.get().get('debug_flags', {}).get('disable_llm_api_call', False):
            return 'LLM模拟回复 {}'.format(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        if chat_service_type == ChatServiceType.OPENAI_COMPATIBLE:
            # An explicitly requested model takes precedence
            if chat_config.get('model_name', None):
                llm_client = chat_service.OpenAICompatible.create_client(chat_config['model_name'])
                chat_completion = llm_client.chat.completions.create(
                    messages=chat_config['messages'],
                    model=chat_config['model_name'],
                )
            elif chat_config.get('use_multimodal_model', False):
                chat_completion = self.multimodal_model_client.chat.completions.create(
                    messages=chat_config['messages'],
                    model=self.multimodal_model_name,
                )
            else:
                chat_completion = self.text_model_client.chat.completions.create(
                    messages=chat_config['messages'],
                    model=self.text_model_name,
                )
            response = chat_completion.choices[0].message.content
        elif chat_service_type == ChatServiceType.COZE_CHAT:
            bot_user_id = 'qywx_{}'.format(chat_config['user_id'])
            response = self.coze_client.create(
                chat_config['bot_id'], bot_user_id, chat_config['messages'],
                chat_config['custom_variables']
            )
        else:
            # Report the type that was actually requested, not the service default
            raise Exception('Unsupported chat service type: {}'.format(chat_service_type))
        return response

    @staticmethod
    def sanitize_response(response: str):
        """Remove "YYYY-MM-DD HH:MM:SS" timestamps (optionally bracketed) and trim whitespace."""
        return _TIMESTAMP_PATTERN.sub('', response).strip()