agent_service.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. #! /usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # vim:fenc=utf-8
  4. import sys
  5. import time
  6. from typing import Dict, List, Tuple, Any, Optional
  7. import logging
  8. from datetime import datetime, timedelta
  9. import apscheduler.triggers.cron
  10. from apscheduler.schedulers.background import BackgroundScheduler
  11. import chat_service
  12. import global_flags
  13. from chat_service import CozeChat, ChatServiceType
  14. from dialogue_manager import DialogueManager, DialogueState
  15. from user_manager import UserManager, LocalUserManager
  16. from openai import OpenAI
  17. from message_queue_backend import MessageQueueBackend, MemoryQueueBackend
  18. from user_profile_extractor import UserProfileExtractor
  19. import threading
  20. from message import MessageType, Message, MessageChannel
  21. from logging_service import ColoredFormatter
  22. class AgentService:
  23. def __init__(
  24. self,
  25. receive_backend: MessageQueueBackend,
  26. send_backend: MessageQueueBackend,
  27. human_backend: MessageQueueBackend,
  28. user_manager: UserManager,
  29. chat_service_type: ChatServiceType = ChatServiceType.OPENAI_COMPATIBLE
  30. ):
  31. self.receive_queue = receive_backend
  32. self.send_queue = send_backend
  33. self.human_queue = human_backend
  34. # 核心服务模块
  35. self.user_manager = user_manager
  36. self.user_profile_extractor = UserProfileExtractor()
  37. self.agent_registry: Dict[str, DialogueManager] = {}
  38. self.llm_client = OpenAI(
  39. api_key=chat_service.VOLCENGINE_API_TOKEN,
  40. base_url=chat_service.VOLCENGINE_BASE_URL
  41. )
  42. # DeepSeek on Volces
  43. self.model_name = chat_service.VOLCENGINE_MODEL_DEEPSEEK_V3
  44. self.coze_client = CozeChat(
  45. token=chat_service.COZE_API_TOKEN,
  46. base_url=chat_service.COZE_CN_BASE_URL
  47. )
  48. self.chat_service_type = chat_service_type
  49. # 定时任务调度器
  50. self.scheduler = BackgroundScheduler()
  51. self.scheduler.start()
  52. def setup_initiative_conversations(self, schedule_params: Optional[Dict] = None):
  53. if not schedule_params:
  54. schedule_params = {'hour': '8,16,20'}
  55. self.scheduler.add_job(
  56. self._check_initiative_conversations,
  57. apscheduler.triggers.cron.CronTrigger(**schedule_params)
  58. )
  59. def _get_agent_instance(self, staff_id: str, user_id: str) -> DialogueManager:
  60. """获取Agent实例"""
  61. agent_key = 'agent_{}_{}'.format(staff_id, user_id)
  62. if agent_key not in self.agent_registry:
  63. self.agent_registry[agent_key] = DialogueManager(
  64. staff_id, user_id, self.user_manager)
  65. return self.agent_registry[agent_key]
  66. def process_messages(self):
  67. """持续处理接收队列消息"""
  68. while True:
  69. message = self.receive_queue.consume()
  70. if message:
  71. self.process_single_message(message)
  72. time.sleep(1) # 避免CPU空转
  73. def _update_user_profile(self, user_id, user_profile, message: str):
  74. profile_to_update = self.user_profile_extractor.extract_profile_info(user_profile, message)
  75. if not profile_to_update:
  76. logging.debug("user_id: {}, no profile info extracted".format(user_id))
  77. return
  78. logging.warning("update user profile: {}".format(profile_to_update))
  79. merged_profile = self.user_profile_extractor.merge_profile_info(user_profile, profile_to_update)
  80. self.user_manager.save_user_profile(user_id, merged_profile)
  81. return merged_profile
  82. def _schedule_aggregation_trigger(self, user_id: str, delay_sec: int):
  83. logging.debug("user: {}, schedule trigger message after {} seconds".format(user_id, delay_sec))
  84. message_ts = int((time.time() + delay_sec) * 1000)
  85. message = Message.build(MessageType.AGGREGATION_TRIGGER, MessageChannel.SYSTEM, None, user_id, None, message_ts)
  86. message.id = -MessageType.AGGREGATION_TRIGGER.code
  87. self.scheduler.add_job(lambda: self.receive_queue.produce(message),
  88. 'date',
  89. run_date=datetime.now() + timedelta(seconds=delay_sec))
  90. def process_single_message(self, message: Message):
  91. user_id = message.sender
  92. staff_id = message.receiver
  93. # 获取用户信息和Agent实例
  94. user_profile = self.user_manager.get_user_profile(user_id)
  95. agent = self._get_agent_instance(staff_id, user_id)
  96. # 更新对话状态
  97. logging.debug("process message: {}".format(message))
  98. dialogue_state, message_text = agent.update_state(message)
  99. logging.debug("user: {}, next state: {}".format(user_id, dialogue_state))
  100. # 根据状态路由消息
  101. if agent.is_in_human_intervention():
  102. self._route_to_human_intervention(user_id, message)
  103. elif dialogue_state == DialogueState.MESSAGE_AGGREGATING:
  104. if message.type != MessageType.AGGREGATION_TRIGGER:
  105. # 产生一个触发器,但是不能由触发器递归产生
  106. logging.debug("user: {}, waiting next message for aggregation".format(user_id))
  107. self._schedule_aggregation_trigger(user_id, agent.message_aggregation_sec)
  108. return
  109. else:
  110. # 先更新用户画像再处理回复
  111. self._update_user_profile(user_id, user_profile, message_text)
  112. self._get_chat_response(user_id, agent, message_text)
  113. def _route_to_human_intervention(self, user_id: str, origin_message: Message):
  114. """路由到人工干预"""
  115. self.human_queue.produce(Message.build(
  116. MessageType.TEXT,
  117. origin_message.channel,
  118. origin_message.sender,
  119. origin_message.receiver,
  120. "用户对话需人工介入,用户名:{}".format(user_id),
  121. int(time.time() * 1000)
  122. ))
  123. def _check_initiative_conversations(self):
  124. """定时检查主动发起对话"""
  125. for user_id in self.user_manager.list_all_users():
  126. agent = self._get_agent_instance('staff_id_0', user_id)
  127. should_initiate = agent.should_initiate_conversation()
  128. if should_initiate:
  129. logging.warning("user: {}, initiate conversation".format(user_id))
  130. self._get_chat_response(user_id, agent, None)
  131. else:
  132. logging.debug("user: {}, do not initiate conversation".format(user_id))
  133. def _get_chat_response(self, user_id: str, agent: DialogueManager,
  134. user_message: str):
  135. """处理LLM响应"""
  136. chat_config = agent.build_chat_configuration(user_message, self.chat_service_type)
  137. logging.debug(chat_config)
  138. chat_response = self._call_chat_api(chat_config)
  139. if response := agent.generate_response(chat_response):
  140. logging.warning("user: {}, response: {}".format(user_id, response))
  141. current_ts = int(time.time() * 1000)
  142. self.send_queue.produce(
  143. Message.build(MessageType.TEXT, MessageChannel.CORP_WECHAT,
  144. agent.staff_id, user_id, response, current_ts)
  145. )
  146. def _call_chat_api(self, chat_config: Dict) -> str:
  147. if global_flags.DISABLE_LLM_API_CALL:
  148. return 'LLM模拟回复'
  149. if self.chat_service_type == ChatServiceType.OPENAI_COMPATIBLE:
  150. chat_completion = self.llm_client.chat.completions.create(
  151. messages=chat_config['messages'],
  152. model=self.model_name,
  153. )
  154. response = chat_completion.choices[0].message.content
  155. elif self.chat_service_type == ChatServiceType.COZE_CHAT:
  156. bot_user_id = 'dev_user'
  157. response = self.coze_client.create(
  158. chat_config['bot_id'], bot_user_id, chat_config['messages'],
  159. chat_config['custom_variables']
  160. )
  161. else:
  162. raise Exception('Unsupported chat service type: {}'.format(self.chat_service_type))
  163. return response
  164. if __name__ == "__main__":
  165. logging.getLogger().setLevel(logging.DEBUG)
  166. console_handler = logging.StreamHandler()
  167. console_handler.setLevel(logging.DEBUG)
  168. formatter = ColoredFormatter(
  169. '%(asctime)s - %(funcName)s[%(lineno)d] - %(levelname)s - %(message)s'
  170. )
  171. console_handler.setFormatter(formatter)
  172. root_logger = logging.getLogger()
  173. root_logger.handlers.clear()
  174. root_logger.addHandler(console_handler)
  175. scheduler_logger = logging.getLogger('apscheduler')
  176. scheduler_logger.setLevel(logging.WARNING)
  177. # 初始化不同队列的后端
  178. receive_queue = MemoryQueueBackend()
  179. send_queue = MemoryQueueBackend()
  180. human_queue = MemoryQueueBackend()
  181. # 初始化用户管理服务
  182. user_manager = LocalUserManager()
  183. global_flags.DISABLE_LLM_API_CALL = True
  184. # 创建Agent服务
  185. service = AgentService(
  186. receive_backend=receive_queue,
  187. send_backend=send_queue,
  188. human_backend=human_queue,
  189. user_manager=user_manager,
  190. chat_service_type=ChatServiceType.COZE_CHAT
  191. )
  192. service.setup_initiative_conversations({'second': '5,35'})
  193. process_thread = threading.Thread(target=service.process_messages)
  194. process_thread.start()
  195. message_id = 0
  196. while True:
  197. print("Input next message: ")
  198. text = sys.stdin.readline().strip()
  199. if not text:
  200. continue
  201. message_id += 1
  202. message = Message.build(MessageType.TEXT, MessageChannel.CORP_WECHAT,
  203. 'staff_id_1','user_id_1', text, int(time.time() * 1000)
  204. )
  205. message.id = message_id
  206. receive_queue.produce(message)
  207. time.sleep(0.1)