dialogue_manager.py 22 KB


  1. #! /usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # vim:fenc=utf-8
  4. from enum import Enum, auto
  5. from typing import Dict, List, Optional, Tuple, Any
  6. from datetime import datetime
  7. import time
  8. from logging_service import logger
  9. import pymysql.cursors
  10. import configs
  11. import cozepy
  12. from database import MySQLManager
  13. from history_dialogue_service import HistoryDialogueService
  14. from chat_service import ChatServiceType
  15. from message import MessageType, Message
  16. from user_manager import UserManager
  17. from prompt_templates import *
  18. class DummyVectorMemoryManager:
  19. def __init__(self, user_id):
  20. pass
  21. def add_to_memory(self, conversation):
  22. pass
  23. def retrieve_relevant_memories(self, query, k=3):
  24. return []
  25. class DialogueState(int, Enum):
  26. INITIALIZED = 0
  27. GREETING = 1 # 问候状态
  28. CHITCHAT = 2 # 闲聊状态
  29. CLARIFICATION = 3 # 澄清状态
  30. FAREWELL = 4 # 告别状态
  31. HUMAN_INTERVENTION = 5 # 人工介入状态
  32. MESSAGE_AGGREGATING = 6 # 等待消息状态
  33. class TimeContext(Enum):
  34. EARLY_MORNING = "清晨" # 清晨 (5:00-7:59)
  35. MORNING = "上午" # 上午 (8:00-11:59)
  36. NOON = "中午" # 中午 (12:00-13:59)
  37. AFTERNOON = "下午" # 下午 (14:00-17:59)
  38. EVENING = "晚上" # 晚上 (18:00-21:59)
  39. NIGHT = "深夜" # 夜晚 (22:00-4:59)
  40. def __init__(self, description):
  41. self.description = description
  42. class DialogueStateCache:
  43. def __init__(self):
  44. config = configs.get()
  45. self.db = MySQLManager(config['storage']['agent_state']['mysql'])
  46. self.table = config['storage']['agent_state']['table']
  47. def get_state(self, staff_id: str, user_id: str) -> Tuple[DialogueState, DialogueState]:
  48. query = f"SELECT current_state, previous_state FROM {self.table} WHERE staff_id=%s AND user_id=%s"
  49. data = self.db.select(query, pymysql.cursors.DictCursor, (staff_id, user_id))
  50. if not data:
  51. logger.warning(f"staff[{staff_id}], user[{user_id}]: agent state not found")
  52. state = DialogueState.INITIALIZED
  53. previous_state = DialogueState.INITIALIZED
  54. self.set_state(staff_id, user_id, state, previous_state)
  55. else:
  56. state = DialogueState(data[0]['current_state'])
  57. previous_state = DialogueState(data[0]['previous_state'])
  58. return state, previous_state
  59. def set_state(self, staff_id: str, user_id: str, state: DialogueState, previous_state: DialogueState):
  60. query = f"INSERT INTO {self.table} (staff_id, user_id, current_state, previous_state)" \
  61. f" VALUES (%s, %s, %s, %s) " \
  62. f"ON DUPLICATE KEY UPDATE current_state=%s, previous_state=%s"
  63. rows = self.db.execute(query, (staff_id, user_id, state.value, previous_state.value, state.value, previous_state.value))
  64. logger.debug("staff[{}], user[{}]: set state: {}, previous state: {}, rows affected: {}"
  65. .format(staff_id, user_id, state, previous_state, rows))
  66. class DialogueManager:
  67. def __init__(self, staff_id: str, user_id: str, user_manager: UserManager, state_cache: DialogueStateCache):
  68. config = configs.get()
  69. self.staff_id = staff_id
  70. self.user_id = user_id
  71. self.user_manager = user_manager
  72. self.state_cache = state_cache
  73. self.current_state = DialogueState.GREETING
  74. self.previous_state = DialogueState.INITIALIZED
  75. # 目前实际仅用作调试,拼装prompt时使用history_dialogue_service获取
  76. self.dialogue_history = []
  77. self.user_profile = self.user_manager.get_user_profile(user_id)
  78. self.staff_profile = self.user_manager.get_staff_profile(staff_id)
  79. self.last_interaction_time = 0
  80. self.consecutive_clarifications = 0
  81. self.complex_request_counter = 0
  82. self.human_intervention_triggered = False
  83. self.vector_memory = DummyVectorMemoryManager(user_id)
  84. self.message_aggregation_sec = config.get('agent_behavior', {}).get('message_aggregation_sec', 5)
  85. self.unprocessed_messages = []
  86. self.history_dialogue_service = HistoryDialogueService(
  87. config['storage']['history_dialogue']['api_base_url']
  88. )
  89. self._recover_state()
  90. def _recover_state(self):
  91. self.current_state, self.previous_state = self.state_cache.get_state(self.staff_id, self.user_id)
  92. # 从数据库恢复对话状态
  93. last_message = self.history_dialogue_service.get_dialogue_history(self.staff_id, self.user_id)
  94. if last_message:
  95. self.last_interaction_time = last_message[-1]['timestamp']
  96. else:
  97. # 默认设置为24小时前
  98. self.last_interaction_time = int(time.time() * 1000) - 24 * 3600 * 1000
  99. time_for_read = datetime.fromtimestamp(self.last_interaction_time / 1000).strftime("%Y-%m-%d %H:%M:%S")
  100. logger.debug(f"staff[{self.staff_id}], user[{self.user_id}]: state: {self.current_state.name}, last_interaction: {time_for_read}")
  101. def persist_state(self):
  102. """持久化对话状态"""
  103. config = configs.get()
  104. if not config.get('debug_flags', {}).get('disable_state_persistence', False):
  105. return
  106. self.state_cache.set_state(self.staff_id, self.user_id, self.current_state, self.previous_state)
  107. @staticmethod
  108. def get_time_context(current_hour=None) -> TimeContext:
  109. """获取当前时间上下文"""
  110. if not current_hour:
  111. current_hour = datetime.now().hour
  112. if 5 <= current_hour < 8:
  113. return TimeContext.EARLY_MORNING
  114. elif 8 <= current_hour < 12:
  115. return TimeContext.MORNING
  116. elif 12 <= current_hour < 14:
  117. return TimeContext.NOON
  118. elif 14 <= current_hour < 18:
  119. return TimeContext.AFTERNOON
  120. elif 18 <= current_hour < 22:
  121. return TimeContext.EVENING
  122. else:
  123. return TimeContext.NIGHT
  124. def update_state(self, message: Message) -> Tuple[bool, Optional[str]]:
  125. """根据用户消息更新对话状态,并返回是否需要发起回复 及下一条需处理的用户消息"""
  126. message_text = message.content
  127. message_ts = message.sendTime
  128. # 如果当前已经是人工介入状态,保持该状态
  129. if self.current_state == DialogueState.HUMAN_INTERVENTION:
  130. # 记录对话历史,但不改变状态
  131. self.dialogue_history.append({
  132. "role": "user",
  133. "content": message_text,
  134. "timestamp": int(time.time() * 1000),
  135. "state": self.current_state.name
  136. })
  137. return False, message_text
  138. # 检查是否处于消息聚合状态
  139. if self.current_state == DialogueState.MESSAGE_AGGREGATING:
  140. # 收到的是特殊定时触发的空消息,且在聚合中,且已经超时,恢复之前状态,继续处理
  141. if message.type == MessageType.AGGREGATION_TRIGGER \
  142. and message_ts - self.last_interaction_time > self.message_aggregation_sec * 1000:
  143. logger.debug("user_id: {}, last interaction time: {}".format(
  144. self.user_id, datetime.fromtimestamp(self.last_interaction_time / 1000)))
  145. self.current_state = self.previous_state
  146. else:
  147. # 非空消息,更新最后交互时间,保持消息聚合状态
  148. if message_text:
  149. self.unprocessed_messages.append(message_text)
  150. self.last_interaction_time = message_ts
  151. return False, message_text
  152. else:
  153. if message.type == MessageType.AGGREGATION_TRIGGER:
  154. # 未在聚合状态中,收到的聚合触发消息为过时消息,不应当处理
  155. return False, None
  156. if message.type != MessageType.AGGREGATION_TRIGGER and self.message_aggregation_sec > 0:
  157. # 收到有内容的用户消息,切换到消息聚合状态
  158. self.previous_state = self.current_state
  159. self.current_state = DialogueState.MESSAGE_AGGREGATING
  160. self.unprocessed_messages.append(message_text)
  161. # 更新最后交互时间
  162. if message_text:
  163. self.last_interaction_time = message_ts
  164. self.persist_state()
  165. return False, message_text
  166. # 保存前一个状态
  167. self.previous_state = self.current_state
  168. # 检查是否长时间未交互(超过3小时)
  169. if self._get_hours_since_last_interaction() > 3:
  170. self.current_state = DialogueState.GREETING
  171. self.dialogue_history = [] # 重置对话历史
  172. self.consecutive_clarifications = 0 # 重置澄清计数
  173. self.complex_request_counter = 0 # 重置复杂请求计数
  174. # 获得未处理的聚合消息,并清空未处理队列
  175. if message_text:
  176. self.unprocessed_messages.append(message_text)
  177. if self.unprocessed_messages:
  178. message_text = '\n'.join(self.unprocessed_messages)
  179. self.unprocessed_messages.clear()
  180. # 根据消息内容和当前状态确定新状态
  181. new_state = self._determine_state_from_message(message_text)
  182. # 处理连续澄清的情况
  183. if new_state == DialogueState.CLARIFICATION:
  184. self.consecutive_clarifications += 1
  185. # FIXME(zhoutian): 规则过于简单
  186. if self.consecutive_clarifications >= 10000:
  187. new_state = DialogueState.HUMAN_INTERVENTION
  188. # self._trigger_human_intervention("连续多次澄清请求")
  189. else:
  190. self.consecutive_clarifications = 0
  191. # 更新状态并持久化
  192. self.current_state = new_state
  193. self.persist_state()
  194. # 更新最后交互时间
  195. if message_text:
  196. self.last_interaction_time = message_ts
  197. # 记录对话历史
  198. if message_text:
  199. self.dialogue_history.append({
  200. "role": "user",
  201. "content": message_text,
  202. "timestamp": message_ts,
  203. "state": self.current_state.name
  204. })
  205. return True, message_text
  206. def _determine_state_from_message(self, message_text: Optional[str]) -> DialogueState:
  207. """根据消息内容确定对话状态"""
  208. if not message_text:
  209. return self.current_state
  210. # 简单的规则-关键词匹配
  211. message_lower = message_text.lower()
  212. # 判断是否是复杂请求
  213. # FIXME(zhoutian): 规则过于简单
  214. # complex_request_keywords = ["帮我", "怎么办", "我需要", "麻烦你", "请帮助", "急", "紧急"]
  215. # if any(keyword in message_lower for keyword in complex_request_keywords):
  216. # self.complex_request_counter += 1
  217. #
  218. # # 如果检测到困难请求且计数达到阈值,触发人工介入
  219. # if self.complex_request_counter >= 1:
  220. # # self._trigger_human_intervention("检测到复杂请求")
  221. # return DialogueState.HUMAN_INTERVENTION
  222. # else:
  223. # # 如果不是复杂请求,重置计数器
  224. # self.complex_request_counter = 0
  225. # 问候检测
  226. greeting_keywords = ["你好", "早上好", "中午好", "晚上好", "嗨", "在吗"]
  227. if any(keyword in message_lower for keyword in greeting_keywords):
  228. return DialogueState.GREETING
  229. # 告别检测
  230. farewell_keywords = ["再见", "拜拜", "晚安", "明天见", "回头见"]
  231. if any(keyword in message_lower for keyword in farewell_keywords):
  232. return DialogueState.FAREWELL
  233. # 澄清请求
  234. clarification_keywords = ["没明白", "不明白", "没听懂", "不懂", "什么意思", "再说一遍"]
  235. if any(keyword in message_lower for keyword in clarification_keywords):
  236. return DialogueState.CLARIFICATION
  237. # 默认为闲聊状态
  238. return DialogueState.CHITCHAT
  239. def _trigger_human_intervention(self, reason: str) -> None:
  240. """触发人工介入"""
  241. if not self.human_intervention_triggered:
  242. self.human_intervention_triggered = True
  243. # 记录人工介入事件
  244. event = {
  245. "timestamp": int(time.time() * 1000),
  246. "reason": reason,
  247. "dialogue_context": self.history_dialogue_service.get_dialogue_history(self.staff_id, self.user_id, 60)
  248. }
  249. # 更新用户资料中的人工介入历史
  250. if "human_intervention_history" not in self.user_profile:
  251. self.user_profile["human_intervention_history"] = []
  252. self.user_profile["human_intervention_history"].append(event)
  253. self.user_manager.save_user_profile(self.user_id, self.user_profile)
  254. # 发送告警
  255. self._send_human_intervention_alert(reason)
  256. def _send_human_intervention_alert(self, reason: str) -> None:
  257. alert_message = f"""
  258. 人工介入告警
  259. 用户ID: {self.user_id}
  260. 用户昵称: {self.user_profile.get("nickname", "未知")}
  261. 时间: {int(time.time() * 1000)}
  262. 原因: {reason}
  263. 最近对话:
  264. """
  265. # 添加最近的对话记录
  266. recent_dialogues = self.history_dialogue_service.get_dialogue_history(self.staff_id, self.user_id, 10)
  267. for dialogue in recent_dialogues:
  268. alert_message += f"\n{dialogue['role']}: {dialogue['content']}"
  269. # TODO(zhoutian): 实现发送告警的具体逻辑
  270. logger.warning(alert_message)
  271. def resume_from_human_intervention(self) -> None:
  272. """从人工介入状态恢复"""
  273. if self.current_state == DialogueState.HUMAN_INTERVENTION:
  274. self.current_state = DialogueState.GREETING
  275. self.human_intervention_triggered = False
  276. self.consecutive_clarifications = 0
  277. self.complex_request_counter = 0
  278. # 记录恢复事件
  279. self.dialogue_history.append({
  280. "role": "system",
  281. "content": "已从人工介入状态恢复到自动对话",
  282. "timestamp": int(time.time() * 1000),
  283. "state": self.current_state.name
  284. })
  285. def generate_response(self, llm_response: str) -> Optional[str]:
  286. """根据当前状态处理LLM响应,如果处于人工介入状态则返回None"""
  287. # 如果处于人工介入状态,不生成回复
  288. if self.current_state == DialogueState.HUMAN_INTERVENTION:
  289. return None
  290. # 记录响应到对话历史
  291. current_ts = int(time.time() * 1000)
  292. self.dialogue_history.append({
  293. "role": "assistant",
  294. "content": llm_response,
  295. "timestamp": current_ts,
  296. "state": self.current_state.name
  297. })
  298. self.last_interaction_time = current_ts
  299. return llm_response
  300. def _get_hours_since_last_interaction(self, precision: int = -1):
  301. time_diff = (time.time() * 1000) - self.last_interaction_time
  302. hours_passed = time_diff / 1000 / 3600
  303. if precision >= 0:
  304. return round(hours_passed, precision)
  305. return hours_passed
  306. def should_initiate_conversation(self) -> bool:
  307. """判断是否应该主动发起对话"""
  308. # 如果处于人工介入状态,不应主动发起对话
  309. if self.current_state == DialogueState.HUMAN_INTERVENTION:
  310. return False
  311. hours_passed = self._get_hours_since_last_interaction()
  312. # 获取当前时间上下文
  313. time_context = self.get_time_context()
  314. # 根据用户交互频率偏好设置不同的阈值
  315. interaction_frequency = self.user_profile.get("interaction_frequency", "medium")
  316. # 设置不同偏好的交互时间阈值(小时)
  317. thresholds = {
  318. "low": 24, # 低频率:一天一次
  319. "medium": 12, # 中频率:半天一次
  320. "high": 6 # 高频率:大约6小时一次
  321. }
  322. threshold = thresholds.get(interaction_frequency, 12)
  323. if hours_passed < threshold:
  324. return False
  325. # 根据时间上下文决定主动交互的状态
  326. if time_context in [TimeContext.MORNING,
  327. TimeContext.NOON, TimeContext.AFTERNOON,
  328. TimeContext.EVENING]:
  329. self.previous_state = self.current_state
  330. self.current_state = DialogueState.GREETING
  331. self.persist_state()
  332. return True
  333. return False
  334. def is_in_human_intervention(self) -> bool:
  335. """检查是否处于人工介入状态"""
  336. return self.current_state == DialogueState.HUMAN_INTERVENTION
  337. def get_prompt_context(self, user_message) -> Dict:
  338. # 获取当前时间上下文
  339. time_context = self.get_time_context()
  340. # 刷新用户画像
  341. self.user_profile = self.user_manager.get_user_profile(self.user_id)
  342. # 刷新员工画像(不一定需要)
  343. self.staff_profile = self.user_manager.get_staff_profile(self.staff_id)
  344. context = {
  345. "user_profile": self.user_profile,
  346. "current_state": self.current_state.name,
  347. "previous_state": self.previous_state.name,
  348. "current_time_period": time_context.description,
  349. "current_hour": datetime.now().hour,
  350. # "dialogue_history": self.dialogue_history[-10:],
  351. "last_interaction_interval": self._get_hours_since_last_interaction(2),
  352. "if_first_interaction": True if self.previous_state == DialogueState.INITIALIZED else False,
  353. "if_active_greeting": False if user_message else True,
  354. **self.user_profile,
  355. **self.staff_profile
  356. }
  357. # 获取长期记忆
  358. relevant_memories = self.vector_memory.retrieve_relevant_memories(user_message)
  359. context["long_term_memory"] = {
  360. "relevant_conversations": relevant_memories
  361. }
  362. return context
  363. def _select_prompt(self, state):
  364. state_to_prompt_map = {
  365. DialogueState.GREETING: GENERAL_GREETING_PROMPT,
  366. DialogueState.CHITCHAT: GENERAL_GREETING_PROMPT,
  367. DialogueState.FAREWELL: GENERAL_GREETING_PROMPT
  368. }
  369. return state_to_prompt_map[state]
  370. def _select_coze_bot(self, state):
  371. state_to_bot_map = {
  372. DialogueState.GREETING: '7486112546798780425',
  373. DialogueState.CHITCHAT: '7491300566573301770',
  374. DialogueState.FAREWELL: '7491300566573301770'
  375. }
  376. return state_to_bot_map[state]
  377. def _create_system_message(self, prompt_context):
  378. prompt_template = self._select_prompt(self.current_state)
  379. prompt = prompt_template.format(**prompt_context)
  380. return {'role': 'system', 'content': prompt}
  381. def build_chat_configuration(
  382. self,
  383. user_message: Optional[str] = None,
  384. chat_service_type: ChatServiceType = ChatServiceType.OPENAI_COMPATIBLE,
  385. overwrite_context: Optional[Dict] = None
  386. ) -> Dict:
  387. """
  388. 参数:
  389. user_message: 当前用户消息,如果是主动交互则为None
  390. 返回:
  391. 消息列表
  392. """
  393. dialogue_history = self.history_dialogue_service.get_dialogue_history(self.staff_id, self.user_id)
  394. logger.debug("staff[{}], user[{}], dialogue_history: {}".format(
  395. self.staff_id, self.user_id, dialogue_history
  396. ))
  397. messages = []
  398. config = {}
  399. prompt_context = self.get_prompt_context(user_message)
  400. if overwrite_context:
  401. prompt_context.update(overwrite_context)
  402. if chat_service_type == ChatServiceType.OPENAI_COMPATIBLE:
  403. system_message = self._create_system_message(prompt_context)
  404. messages.append(system_message)
  405. for entry in dialogue_history:
  406. role = entry['role']
  407. fmt_time = self.format_timestamp(entry['timestamp'])
  408. messages.append({
  409. "role": role,
  410. "content": '[{}] {}'.format(fmt_time, entry["content"])
  411. })
  412. # 添加一条前缀用于 约束时间场景
  413. msg_prefix = '[{}]'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
  414. messages.append({'role': 'assistant', 'content': msg_prefix})
  415. elif chat_service_type == ChatServiceType.COZE_CHAT:
  416. dialogue_history = dialogue_history[-95:] # Coze最多支持100条,还需要附加系统消息
  417. for entry in dialogue_history:
  418. if not entry['content']:
  419. logger.warning("staff[{}], user[{}], role[{}]: empty content in dialogue history".format(
  420. self.staff_id, self.user_id, entry['role']
  421. ))
  422. continue
  423. role = entry['role']
  424. fmt_time = self.format_timestamp(entry['timestamp'])
  425. content = '[{}] {}'.format(fmt_time, entry["content"])
  426. if role == 'user':
  427. messages.append(cozepy.Message.build_user_question_text(content))
  428. elif role == 'assistant':
  429. messages.append(cozepy.Message.build_assistant_answer(content))
  430. custom_variables = {}
  431. for k, v in prompt_context.items():
  432. custom_variables[k] = str(v)
  433. custom_variables.pop('user_profile', None)
  434. config['custom_variables'] = custom_variables
  435. config['bot_id'] = self._select_coze_bot(self.current_state)
  436. msg_prefix = '[{}]'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
  437. messages.append(cozepy.Message.build_assistant_answer(msg_prefix))
  438. #FIXME(zhoutian): 这种方法并不可靠,需要结合状态来判断
  439. if self.current_state == DialogueState.GREETING and not messages:
  440. # messages.append(cozepy.Message.build_user_question_text(f'{msg_prefix} 请开始对话'))
  441. pass
  442. #FIXME(zhoutian): 临时报警
  443. if user_message and not messages:
  444. logger.error(f"staff[{self.staff_id}], user[{self.user_id}]: inconsistency in messages")
  445. config['messages'] = messages
  446. return config
  447. @staticmethod
  448. def format_timestamp(timestamp_ms):
  449. return datetime.fromtimestamp(timestamp_ms / 1000).strftime("%Y-%m-%d %H:%M:%S")
  450. if __name__ == '__main__':
  451. state_cache = DialogueStateCache()
  452. state_cache.set_state('1688854492669990', '7881302581935903', DialogueState.CHITCHAT, DialogueState.GREETING)