agent_service.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481
  1. #! /usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # vim:fenc=utf-8
  4. import re
  5. import signal
  6. import sys
  7. import time
  8. import random
  9. from typing import Dict, List, Tuple, Any, Optional
  10. import logging
  11. from datetime import datetime, timedelta
  12. import traceback
  13. import apscheduler.triggers.cron
  14. from apscheduler.schedulers.background import BackgroundScheduler
  15. import chat_service
  16. import configs
  17. import logging_service
  18. from configs import apollo_config
  19. from logging_service import logger
  20. from chat_service import CozeChat, ChatServiceType
  21. from dialogue_manager import DialogueManager, DialogueState, DialogueStateCache
  22. from response_type_detector import ResponseTypeDetector
  23. from user_manager import UserManager, LocalUserManager, MySQLUserManager, MySQLUserRelationManager, UserRelationManager, \
  24. LocalUserRelationManager
  25. from openai import OpenAI
  26. from message_queue_backend import MessageQueueBackend, MemoryQueueBackend, AliyunRocketMQQueueBackend
  27. from user_profile_extractor import UserProfileExtractor
  28. import threading
  29. from message import MessageType, Message, MessageChannel
  30. class AgentService:
  31. def __init__(
  32. self,
  33. receive_backend: MessageQueueBackend,
  34. send_backend: MessageQueueBackend,
  35. human_backend: MessageQueueBackend,
  36. user_manager: UserManager,
  37. user_relation_manager: UserRelationManager,
  38. chat_service_type: ChatServiceType = ChatServiceType.OPENAI_COMPATIBLE
  39. ):
  40. self.receive_queue = receive_backend
  41. self.send_queue = send_backend
  42. self.human_queue = human_backend
  43. # 核心服务模块
  44. self.agent_state_cache = DialogueStateCache()
  45. self.user_manager = user_manager
  46. self.user_relation_manager = user_relation_manager
  47. self.user_profile_extractor = UserProfileExtractor()
  48. self.response_type_detector = ResponseTypeDetector()
  49. self.agent_registry: Dict[str, DialogueManager] = {}
  50. self.config = configs.get()
  51. chat_config = self.config['chat_api']['openai_compatible']
  52. self.text_model_name = chat_config['text_model']
  53. self.multimodal_model_name = chat_config['multimodal_model']
  54. self.text_model_client = chat_service.OpenAICompatible.create_client(self.text_model_name)
  55. self.multimodal_model_client = chat_service.OpenAICompatible.create_client(self.multimodal_model_name)
  56. coze_config = configs.get()['chat_api']['coze']
  57. coze_oauth_app = CozeChat.get_oauth_app(
  58. coze_config['oauth_client_id'], coze_config['private_key_path'], str(coze_config['public_key_id']),
  59. account_id=coze_config.get('account_id', None)
  60. )
  61. self.coze_client = CozeChat(
  62. base_url=chat_service.COZE_CN_BASE_URL,
  63. auth_app=coze_oauth_app
  64. )
  65. self.chat_service_type = chat_service_type
  66. # 定时任务调度器
  67. self.scheduler = None
  68. self.scheduler_mode = self.config.get('system', {}).get('scheduler_mode', 'local')
  69. self.scheduler_queue = None
  70. self.msg_scheduler_thread = None
  71. self.limit_initiative_conversation_rate = True
  72. self.running = False
  73. self.process_thread = None
  74. self._sigint_cnt = 0
  75. def setup_initiative_conversations(self, schedule_params: Optional[Dict] = None):
  76. if not schedule_params:
  77. schedule_params = {'hour': '8,16,20'}
  78. self.scheduler.add_job(
  79. self._check_initiative_conversations,
  80. apscheduler.triggers.cron.CronTrigger(**schedule_params)
  81. )
  82. def setup_scheduler(self):
  83. self.scheduler = BackgroundScheduler()
  84. if self.scheduler_mode == 'mq':
  85. logging.info("setup event message scheduler with MQ")
  86. mq_conf = self.config['mq']
  87. topic = mq_conf['scheduler_topic']
  88. self.scheduler_queue = AliyunRocketMQQueueBackend(
  89. mq_conf['endpoints'],
  90. mq_conf['instance_id'],
  91. topic,
  92. has_consumer=True, has_producer=True,
  93. group_id=mq_conf['scheduler_group'],
  94. topic_type='DELAY'
  95. )
  96. self.msg_scheduler_thread = threading.Thread(target=self.process_scheduler_events)
  97. self.msg_scheduler_thread.start()
  98. self.scheduler.start()
  99. def process_scheduler_events(self):
  100. while self.running:
  101. msg = self.scheduler_queue.consume()
  102. if msg:
  103. try:
  104. self.process_scheduler_event(msg)
  105. self.scheduler_queue.ack(msg)
  106. except Exception as e:
  107. logger.error("Error processing scheduler event: {}".format(e))
  108. time.sleep(1)
  109. logger.info("Scheduler event processing thread exit")
  110. def process_scheduler_event(self, msg: Message):
  111. if msg.type == MessageType.AGGREGATION_TRIGGER:
  112. # 延迟触发的消息,需放入接收队列以驱动Agent运转
  113. self.receive_queue.produce(msg)
  114. else:
  115. logger.warning(f"Unknown message type: {msg.type}")
  116. def _get_agent_instance(self, staff_id: str, user_id: str) -> DialogueManager:
  117. """获取Agent实例"""
  118. agent_key = 'agent_{}_{}'.format(staff_id, user_id)
  119. if agent_key not in self.agent_registry:
  120. self.agent_registry[agent_key] = DialogueManager(
  121. staff_id, user_id, self.user_manager, self.agent_state_cache)
  122. return self.agent_registry[agent_key]
  123. def process_messages(self):
  124. """持续处理接收队列消息"""
  125. while self.running:
  126. message = self.receive_queue.consume()
  127. if message:
  128. try:
  129. self.process_single_message(message)
  130. self.receive_queue.ack(message)
  131. except Exception as e:
  132. logger.error("Error processing message: {}".format(e))
  133. traceback.print_exc()
  134. time.sleep(1)
  135. logger.info("Message processing thread exit")
  136. def start(self, blocking=False):
  137. self.running = True
  138. self.process_thread = threading.Thread(target=service.process_messages)
  139. self.process_thread.start()
  140. self.setup_scheduler()
  141. # 只有企微场景需要主动发起
  142. if not self.config['debug_flags'].get('disable_active_conversation', False):
  143. schedule_param = self.config['agent_behavior'].get('active_conversation_schedule_param', None)
  144. self.setup_initiative_conversations(schedule_param)
  145. signal.signal(signal.SIGINT, self._handle_sigint)
  146. if blocking:
  147. self.process_thread.join()
  148. def shutdown(self, sync=True):
  149. if not self.running:
  150. raise Exception("Service is not running")
  151. self.running = False
  152. self.scheduler.shutdown()
  153. if sync:
  154. self.process_thread.join()
  155. self.receive_queue.shutdown()
  156. self.send_queue.shutdown()
  157. if self.msg_scheduler_thread:
  158. self.msg_scheduler_thread.join()
  159. self.scheduler_queue.shutdown()
  160. def _handle_sigint(self, signum, frame):
  161. self._sigint_cnt += 1
  162. if self._sigint_cnt == 1:
  163. logger.warning("Try to shutdown gracefully...")
  164. self.shutdown(sync=True)
  165. else:
  166. logger.warning("Forcing exit")
  167. sys.exit(0)
  168. def _update_user_profile(self, user_id, user_profile, recent_dialogue: List[Dict]):
  169. profile_to_update = self.user_profile_extractor.extract_profile_info(user_profile, recent_dialogue)
  170. if not profile_to_update:
  171. logger.debug("user_id: {}, no profile info extracted".format(user_id))
  172. return
  173. logger.warning("update user profile: {}".format(profile_to_update))
  174. if profile_to_update.get('interaction_frequency', None) == 'stopped':
  175. # 和企微日常push联动,减少对用户的干扰
  176. if self.user_relation_manager.stop_user_daily_push(user_id):
  177. logger.warning(f"user[{user_id}]: daily push set to be stopped")
  178. merged_profile = self.user_profile_extractor.merge_profile_info(user_profile, profile_to_update)
  179. self.user_manager.save_user_profile(user_id, merged_profile)
  180. return merged_profile
  181. def _schedule_aggregation_trigger(self, staff_id: str, user_id: str, delay_sec: int):
  182. logger.debug("user: {}, schedule trigger message after {} seconds".format(user_id, delay_sec))
  183. message_ts = int((time.time() + delay_sec) * 1000)
  184. msg = Message.build(MessageType.AGGREGATION_TRIGGER, MessageChannel.SYSTEM, user_id, staff_id, None, message_ts)
  185. # 系统消息使用特定的msgId,无实际意义
  186. msg.msgId = -MessageType.AGGREGATION_TRIGGER.value
  187. if self.scheduler_mode == 'mq':
  188. self.scheduler_queue.produce(msg)
  189. else:
  190. self.scheduler.add_job(lambda: self.receive_queue.produce(msg),
  191. 'date',
  192. run_date=datetime.now() + timedelta(seconds=delay_sec))
  193. def process_single_message(self, message: Message):
  194. user_id = message.sender
  195. staff_id = message.receiver
  196. # 获取用户信息和Agent实例
  197. user_profile = self.user_manager.get_user_profile(user_id)
  198. agent = self._get_agent_instance(staff_id, user_id)
  199. # 更新对话状态
  200. logger.debug("process message: {}".format(message))
  201. need_response, message_text = agent.update_state(message)
  202. logger.debug("user: {}, next state: {}".format(user_id, agent.current_state))
  203. # 根据状态路由消息
  204. try:
  205. if agent.is_in_human_intervention():
  206. self._route_to_human_intervention(user_id, message)
  207. elif agent.current_state == DialogueState.MESSAGE_AGGREGATING:
  208. if message.type != MessageType.AGGREGATION_TRIGGER:
  209. # 产生一个触发器,但是不能由触发器递归产生
  210. logger.debug("user: {}, waiting next message for aggregation".format(user_id))
  211. self._schedule_aggregation_trigger(staff_id, user_id, agent.message_aggregation_sec)
  212. elif need_response:
  213. # 先更新用户画像再处理回复
  214. self._update_user_profile(user_id, user_profile, agent.dialogue_history[-10:])
  215. resp = self._get_chat_response(user_id, agent, message_text)
  216. if resp:
  217. recent_dialogue = agent.dialogue_history[-10:]
  218. if len(recent_dialogue) < 2 or staff_id not in ('1688855931724582', '1688854492669990'):
  219. message_type = MessageType.TEXT
  220. else:
  221. message_type = self.response_type_detector.detect_type(
  222. recent_dialogue[:-1], recent_dialogue[-1], enable_random=True)
  223. self._send_response(staff_id, user_id, resp, message_type)
  224. else:
  225. logger.debug(f"staff[{staff_id}], user[{user_id}]: do not need response")
  226. # 当前消息处理成功,commit并持久化agent状态
  227. agent.persist_state()
  228. except Exception as e:
  229. agent.rollback_state()
  230. raise e
  231. def _send_response(self, staff_id, user_id, response, message_type: MessageType, skip_check=False):
  232. logger.warning(f"staff[{staff_id}] user[{user_id}]: response[{message_type}] {response}")
  233. current_ts = int(time.time() * 1000)
  234. user_tags = self.user_relation_manager.get_user_tags(user_id)
  235. white_list_tags = set(apollo_config.get_json_value("agent_response_whitelist_tags"))
  236. hit_white_list_tags = len(set(user_tags).intersection(white_list_tags)) > 0
  237. # FIXME(zhoutian)
  238. # 测试期间临时逻辑,只发送特定的账号或特定用户
  239. staff_white_lists = set(apollo_config.get_json_value("agent_response_whitelist_staffs"))
  240. if not (staff_id in staff_white_lists or hit_white_list_tags or skip_check):
  241. logger.warning(f"staff[{staff_id}] user[{user_id}]: skip reply")
  242. return None
  243. self.send_queue.produce(
  244. Message.build(message_type, MessageChannel.CORP_WECHAT,
  245. staff_id, user_id, response, current_ts)
  246. )
  247. def _route_to_human_intervention(self, user_id: str, origin_message: Message):
  248. """路由到人工干预"""
  249. self.human_queue.produce(Message.build(
  250. MessageType.TEXT,
  251. origin_message.channel,
  252. origin_message.sender,
  253. origin_message.receiver,
  254. "用户对话需人工介入,用户名:{}".format(user_id),
  255. int(time.time() * 1000)
  256. ))
  257. def _check_initiative_conversations(self):
  258. logger.info("start to check initiative conversations")
  259. if not DialogueManager.is_time_suitable_for_active_conversation():
  260. logger.info("time is not suitable for active conversation")
  261. return
  262. white_list_tags = set(apollo_config.get_json_value('agent_initiate_whitelist_tags'))
  263. first_initiate_tags = set(apollo_config.get_json_value('agent_first_initiate_whitelist_tags', []))
  264. # 合并白名单,减少配置成本
  265. white_list_tags.update(first_initiate_tags)
  266. voice_tags = set(apollo_config.get_json_value('agent_initiate_by_voice_tags'))
  267. """定时检查主动发起对话"""
  268. for staff_user in self.user_relation_manager.list_staff_users():
  269. staff_id = staff_user['staff_id']
  270. user_id = staff_user['user_id']
  271. agent = self._get_agent_instance(staff_id, user_id)
  272. should_initiate = agent.should_initiate_conversation()
  273. user_tags = self.user_relation_manager.get_user_tags(user_id)
  274. if configs.get_env() != 'dev' and not white_list_tags.intersection(user_tags):
  275. should_initiate = False
  276. if should_initiate:
  277. logger.warning(f"user[{user_id}], tags{user_tags}: initiate conversation")
  278. # FIXME:虽然需要主动唤起的用户同时发来消息的概率很低,但仍可能会有并发冲突 需要并入事件驱动框架
  279. agent.do_state_change(DialogueState.GREETING)
  280. try:
  281. if agent.previous_state == DialogueState.INITIALIZED or first_initiate_tags.intersection(user_tags):
  282. # 完全无交互历史的用户才使用此策略,但新用户接入即会产生“我已添加了你”的消息将Agent初始化
  283. # 因此存量用户无法使用该状态做实验
  284. # TODO:增加基于对话历史的判断、策略去重;如果对话间隔过长需要使用长期记忆检索;在无长期记忆时,可采用用户添加时间来判断
  285. resp = self._generate_active_greeting_message(agent, user_tags)
  286. else:
  287. resp = self._get_chat_response(user_id, agent, None)
  288. if resp:
  289. if set(user_tags).intersection(voice_tags):
  290. message_type = MessageType.VOICE
  291. else:
  292. message_type = MessageType.TEXT
  293. self._send_response(staff_id, user_id, resp, message_type, skip_check=True)
  294. if self.limit_initiative_conversation_rate:
  295. time.sleep(random.randint(10,20))
  296. agent.persist_state()
  297. except Exception as e:
  298. # FIXME:虽然需要主动唤起的用户同时发来消息的概率很低,但仍可能会有并发冲突
  299. agent.rollback_state()
  300. logger.error("Error in active greeting: {}".format(e))
  301. else:
  302. logger.debug(f"user[{user_id}], do not initiate conversation")
  303. def _generate_active_greeting_message(self, agent: DialogueManager, user_tags: List[str]=None):
  304. chat_config = agent.build_active_greeting_config(user_tags)
  305. chat_response = self._call_chat_api(chat_config, ChatServiceType.OPENAI_COMPATIBLE)
  306. chat_response = self.sanitize_response(chat_response)
  307. if response := agent.generate_response(chat_response):
  308. return response
  309. else:
  310. logger.warning(f"staff[{agent.staff_id}] user[{agent.user_id}]: no response generated")
  311. return None
  312. def _get_chat_response(self, user_id: str, agent: DialogueManager,
  313. user_message: Optional[str]):
  314. """处理LLM响应"""
  315. chat_config = agent.build_chat_configuration(user_message, self.chat_service_type)
  316. logger.debug(chat_config)
  317. chat_response = self._call_chat_api(chat_config, self.chat_service_type)
  318. chat_response = self.sanitize_response(chat_response)
  319. if response := agent.generate_response(chat_response):
  320. return response
  321. else:
  322. logger.warning(f"staff[{agent.staff_id}] user[{user_id}]: no response generated")
  323. return None
  324. def _call_chat_api(self, chat_config: Dict, chat_service_type: ChatServiceType) -> str:
  325. if configs.get().get('debug_flags', {}).get('disable_llm_api_call', False):
  326. return 'LLM模拟回复 {}'.format(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
  327. if chat_service_type == ChatServiceType.OPENAI_COMPATIBLE:
  328. # 指定了LLM模型则优先使用指定模型
  329. if chat_config.get('model_name', None):
  330. llm_client = chat_service.OpenAICompatible.create_client(chat_config['model_name'])
  331. chat_completion = llm_client.chat.completions.create(
  332. messages=chat_config['messages'],
  333. model=chat_config['model_name'],
  334. )
  335. elif chat_config.get('use_multimodal_model', False):
  336. chat_completion = self.multimodal_model_client.chat.completions.create(
  337. messages=chat_config['messages'],
  338. model=self.multimodal_model_name,
  339. )
  340. else:
  341. chat_completion = self.text_model_client.chat.completions.create(
  342. messages=chat_config['messages'],
  343. model=self.text_model_name,
  344. )
  345. response = chat_completion.choices[0].message.content
  346. elif chat_service_type == ChatServiceType.COZE_CHAT:
  347. bot_user_id = 'qywx_{}'.format(chat_config['user_id'])
  348. response = self.coze_client.create(
  349. chat_config['bot_id'], bot_user_id, chat_config['messages'],
  350. chat_config['custom_variables']
  351. )
  352. else:
  353. raise Exception('Unsupported chat service type: {}'.format(self.chat_service_type))
  354. return response
  355. @staticmethod
  356. def sanitize_response(response: str):
  357. pattern = r'\[?\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\]?'
  358. response = re.sub(pattern, '', response)
  359. response = response.strip()
  360. return response
  361. if __name__ == "__main__":
  362. config = configs.get()
  363. logging_service.setup_root_logger()
  364. logger.warning("current env: {}".format(configs.get_env()))
  365. scheduler_logger = logging.getLogger('apscheduler')
  366. scheduler_logger.setLevel(logging.WARNING)
  367. use_aliyun_mq = config['debug_flags']['use_aliyun_mq']
  368. # 初始化不同队列的后端
  369. if use_aliyun_mq:
  370. receive_queue = AliyunRocketMQQueueBackend(
  371. config['mq']['endpoints'],
  372. config['mq']['instance_id'],
  373. config['mq']['receive_topic'],
  374. has_consumer=True, has_producer=True,
  375. group_id=config['mq']['receive_group'],
  376. topic_type='FIFO'
  377. )
  378. send_queue = AliyunRocketMQQueueBackend(
  379. config['mq']['endpoints'],
  380. config['mq']['instance_id'],
  381. config['mq']['send_topic'],
  382. has_consumer=False, has_producer=True,
  383. topic_type='FIFO'
  384. )
  385. else:
  386. receive_queue = MemoryQueueBackend()
  387. send_queue = MemoryQueueBackend()
  388. human_queue = MemoryQueueBackend()
  389. # 初始化用户管理服务
  390. # FIXME(zhoutian): 如果不使用MySQL,此数据库配置非必须
  391. user_db_config = config['storage']['user']
  392. staff_db_config = config['storage']['staff']
  393. wecom_db_config = config['storage']['user_relation']
  394. if config['debug_flags'].get('use_local_user_storage', False):
  395. user_manager = LocalUserManager()
  396. user_relation_manager = LocalUserRelationManager()
  397. else:
  398. user_manager = MySQLUserManager(user_db_config['mysql'], user_db_config['table'], staff_db_config['table'])
  399. user_relation_manager = MySQLUserRelationManager(
  400. user_db_config['mysql'], wecom_db_config['mysql'],
  401. config['storage']['staff']['table'],
  402. user_db_config['table'],
  403. wecom_db_config['table']['staff'],
  404. wecom_db_config['table']['relation'],
  405. wecom_db_config['table']['user']
  406. )
  407. # 创建Agent服务
  408. service = AgentService(
  409. receive_backend=receive_queue,
  410. send_backend=send_queue,
  411. human_backend=human_queue,
  412. user_manager=user_manager,
  413. user_relation_manager=user_relation_manager,
  414. chat_service_type=ChatServiceType.COZE_CHAT
  415. )
  416. if not config['debug_flags'].get('console_input', False):
  417. service.start(blocking=True)
  418. sys.exit(0)
  419. else:
  420. service.start()
  421. message_id = 0
  422. while service.running:
  423. print("Input next message: ")
  424. text = sys.stdin.readline().strip()
  425. if not text:
  426. continue
  427. message_id += 1
  428. sender = '7881301263964433'
  429. receiver = '1688854492669990'
  430. if text == MessageType.AGGREGATION_TRIGGER.name:
  431. message = Message.build(MessageType.AGGREGATION_TRIGGER, MessageChannel.CORP_WECHAT,
  432. sender, receiver, None, int(time.time() * 1000))
  433. else:
  434. message = Message.build(MessageType.TEXT, MessageChannel.CORP_WECHAT,
  435. sender,receiver, text, int(time.time() * 1000)
  436. )
  437. message.msgId = message_id
  438. receive_queue.produce(message)
  439. time.sleep(0.1)