dialogue_manager.py 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651
  1. #! /usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # vim:fenc=utf-8
  4. from enum import Enum, auto
  5. from typing import Dict, List, Optional, Tuple, Any
  6. from datetime import datetime
  7. import time
  8. from logging_service import logger
  9. import pymysql.cursors
  10. import configs
  11. import cozepy
  12. from database import MySQLManager
  13. from history_dialogue_service import HistoryDialogueService
  14. from chat_service import ChatServiceType
  15. from message import MessageType, Message
  16. from user_manager import UserManager
  17. from prompt_templates import *
  class DummyVectorMemoryManager:
      """Placeholder memory manager whose methods are all no-ops.

      It stores nothing: adding a conversation is ignored and every retrieval
      yields an empty list.
      """

      def __init__(self, user_id):
          """Accept ``user_id`` and ignore it; no state is kept."""

      def add_to_memory(self, conversation):
          """Accept ``conversation`` and discard it. Returns ``None``."""
          return None

      def retrieve_relevant_memories(self, query, k=3):
          """Return a new empty list regardless of ``query`` and ``k``."""
          return []
  class DialogueState(int, Enum):
      """Dialogue states of a conversation; each member is also an ``int``."""

      INITIALIZED = 0
      GREETING = 1             # greeting
      CHITCHAT = 2             # casual chat
      CLARIFICATION = 3        # clarification
      FAREWELL = 4             # farewell
      HUMAN_INTERVENTION = 5   # a human has taken over
      MESSAGE_AGGREGATING = 6  # waiting for further messages to aggregate
  class TimeContext(Enum):
      """Named periods of the day.

      Each member's value is its Chinese label; the same string is also exposed
      as the ``description`` attribute.
      """

      EARLY_MORNING = "清晨"  # early morning (5:00-7:59)
      MORNING = "上午"        # morning (8:00-11:59)
      NOON = "中午"           # noon (12:00-13:59)
      AFTERNOON = "下午"      # afternoon (14:00-17:59)
      EVENING = "晚上"        # evening (18:00-21:59)
      NIGHT = "深夜"          # night (22:00-4:59)

      def __init__(self, description):
          # Enum passes the member value to __init__; keep it under a readable name.
          self.description = description
  class DialogueStateCache:
      """Reads and writes the (current, previous) dialogue state of a staff/user pair in MySQL.

      Connection settings and the table name come from
      ``configs.get()['storage']['agent_state']``.
      """

      def __init__(self):
          self.config = configs.get()
          agent_state_cfg = self.config['storage']['agent_state']
          self.db = MySQLManager(agent_state_cfg['mysql'])
          self.table = agent_state_cfg['table']

      def get_state(self, staff_id: str, user_id: str) -> Tuple[DialogueState, DialogueState]:
          """Return ``(current_state, previous_state)`` for the given pair.

          When no row exists, a warning is logged, both states default to
          ``INITIALIZED`` and that default is written back via ``set_state``.
          """
          # The table name comes from configuration; the values are bound as parameters.
          query = f"SELECT current_state, previous_state FROM {self.table} WHERE staff_id=%s AND user_id=%s"
          data = self.db.select(query, pymysql.cursors.DictCursor, (staff_id, user_id))
          if data:
              row = data[0]
              return DialogueState(row['current_state']), DialogueState(row['previous_state'])

          logger.warning(f"staff[{staff_id}], user[{user_id}]: agent state not found")
          state = DialogueState.INITIALIZED
          previous_state = DialogueState.INITIALIZED
          self.set_state(staff_id, user_id, state, previous_state)
          return state, previous_state

      def set_state(self, staff_id: str, user_id: str, state: DialogueState, previous_state: DialogueState):
          """Insert or update the stored state pair.

          Does nothing when the ``debug_flags.disable_database_write`` config
          flag is truthy.
          """
          if self.config.get('debug_flags', {}).get('disable_database_write', False):
              return
          query = f"INSERT INTO {self.table} (staff_id, user_id, current_state, previous_state)" \
                  f" VALUES (%s, %s, %s, %s) " \
                  f"ON DUPLICATE KEY UPDATE current_state=%s, previous_state=%s"
          # The two state values are bound twice: once for the INSERT, once for the UPDATE clause.
          params = (staff_id, user_id, state.value, previous_state.value, state.value, previous_state.value)
          rows = self.db.execute(query, params)
          logger.debug("staff[{}], user[{}]: set state: {}, previous state: {}, rows affected: {}"
                       .format(staff_id, user_id, state, previous_state, rows))
  68. class DialogueManager:
  def __init__(self, staff_id: str, user_id: str, user_manager: UserManager, state_cache: DialogueStateCache):
      """Set up the in-memory dialogue state for one staff/user pair and restore persisted state.

      Args:
          staff_id: Staff identifier; together with ``user_id`` it keys state and history lookups.
          user_id: User identifier.
          user_manager: Source of the user and staff profiles.
          state_cache: Storage used to load and persist the dialogue state.
      """
      config = configs.get()
      self.staff_id = staff_id
      self.user_id = user_id
      self.user_manager = user_manager
      self.state_cache = state_cache

      # Provisional values; _recover_state() at the end overwrites both.
      self.current_state = DialogueState.GREETING
      self.previous_state = DialogueState.INITIALIZED
      # Snapshot used to roll back when handling a message fails.
      self.state_backup = (DialogueState.INITIALIZED, DialogueState.INITIALIZED)
      # Currently only used for debugging; prompts are assembled from history_dialogue_service.
      self.dialogue_history = []

      self.user_profile = self.user_manager.get_user_profile(user_id)
      self.staff_profile = self.user_manager.get_staff_profile(staff_id)

      # FIXME(zhoutian): last_interaction_time needs to be rolled back as well.
      self.last_interaction_time = 0
      self.consecutive_clarifications = 0
      self.complex_request_counter = 0
      self.human_intervention_triggered = False
      self.vector_memory = DummyVectorMemoryManager(user_id)

      # Aggregation window in seconds; defaults to 5 when not configured.
      self.message_aggregation_sec = config.get('agent_behavior', {}).get('message_aggregation_sec', 5)
      self.unprocessed_messages = []
      self.history_dialogue_service = HistoryDialogueService(
          config['storage']['history_dialogue']['api_base_url']
      )
      self._recover_state()
  def _recover_state(self):
      """Reload the state pair from the state cache and the history from the history service.

      ``last_interaction_time`` (milliseconds) is taken from the newest history
      entry; with no history it is set to 24 hours before now.
      """
      self.current_state, self.previous_state = self.state_cache.get_state(self.staff_id, self.user_id)
      # Restore the dialogue history from storage.
      self.dialogue_history = self.history_dialogue_service.get_dialogue_history(self.staff_id, self.user_id)
      if self.dialogue_history:
          self.last_interaction_time = self.dialogue_history[-1]['timestamp']
      else:
          # Default to 24 hours ago.
          one_day_ms = 24 * 3600 * 1000
          self.last_interaction_time = int(time.time() * 1000) - one_day_ms
      time_for_read = datetime.fromtimestamp(self.last_interaction_time / 1000).strftime("%Y-%m-%d %H:%M:%S")
      logger.debug(f"staff[{self.staff_id}], user[{self.user_id}]: state: {self.current_state.name}, last_interaction: {time_for_read}")
  def persist_state(self):
      """Persist the dialogue state; call only after the current state was handled successfully.

      Skipped when the ``debug_flags.disable_database_write`` config flag is truthy.
      """
      debug_flags = configs.get().get('debug_flags', {})
      if debug_flags.get('disable_database_write', False):
          return
      self.state_cache.set_state(self.staff_id, self.user_id, self.current_state, self.previous_state)
  def rollback_state(self):
      """Restore ``(current_state, previous_state)`` from ``state_backup``."""
      # NOTE(review): the first placeholder receives the whole backup tuple and the
      # second the state being abandoned; arguments kept unchanged - confirm the intended labels.
      logger.debug("staff[{}], user[{}]: rollback state: {}, previous state: {}".format(
          self.staff_id, self.user_id, self.state_backup, self.current_state
      ))
      restored_current, restored_previous = self.state_backup
      self.current_state = restored_current
      self.previous_state = restored_previous
  @staticmethod
  def get_time_context(current_hour=None) -> TimeContext:
      """Map an hour of the day to its ``TimeContext``.

      Args:
          current_hour: Hour in 0-23. ``None`` (the default) means use the
              current local hour.

      Returns:
          The period containing the hour; hours outside 5-21 map to ``NIGHT``.
      """
      # Compare with None explicitly: hour 0 (midnight) is a valid argument and
      # must not be replaced by the wall-clock hour.
      if current_hour is None:
          current_hour = datetime.now().hour

      # (start inclusive, end exclusive, period)
      periods = (
          (5, 8, TimeContext.EARLY_MORNING),
          (8, 12, TimeContext.MORNING),
          (12, 14, TimeContext.NOON),
          (14, 18, TimeContext.AFTERNOON),
          (18, 22, TimeContext.EVENING),
      )
      for start, end, period in periods:
          if start <= current_hour < end:
              return period
      return TimeContext.NIGHT
  def do_state_change(self, state: DialogueState):
      """Switch to ``state``, saving the old state pair in ``state_backup`` first.

      MESSAGE_AGGREGATING never becomes ``previous_state``: when leaving it,
      ``previous_state`` is left untouched and only ``state_backup`` records it.
      """
      self.state_backup = (self.current_state, self.previous_state)
      if self.current_state != DialogueState.MESSAGE_AGGREGATING:
          self.previous_state = self.current_state
      self.current_state = state
  def update_state(self, message: Message) -> Tuple[bool, Optional[str]]:
      """Update the dialogue state for an incoming message.

      Args:
          message: Incoming message; its ``content``, ``sendTime`` and ``type`` are read.

      Returns:
          ``(need_reply, text)``: whether a reply should be generated now, and the
          text to handle. When a reply is due, ``text`` is all aggregated pending
          messages joined by newlines.
      """
      message_text = message.content
      message_ts = message.sendTime
      is_trigger = message.type == MessageType.AGGREGATION_TRIGGER

      # Human intervention is sticky: record the message, keep the state.
      if self.current_state == DialogueState.HUMAN_INTERVENTION:
          self.dialogue_history.append({
              "role": "user",
              "content": message_text,
              "timestamp": int(time.time() * 1000),
              "state": self.current_state.name
          })
          return False, message_text

      if self.current_state == DialogueState.MESSAGE_AGGREGATING:
          window_ms = self.message_aggregation_sec * 1000
          timed_out = message_ts - self.last_interaction_time > window_ms
          if is_trigger and timed_out:
              # Timer trigger after the window elapsed: fall through and process.
              logger.debug("user_id: {}, last interaction time: {}".format(
                  self.user_id, datetime.fromtimestamp(self.last_interaction_time / 1000)))
          else:
              # Non-empty message: queue it, refresh the window, keep aggregating.
              if message_text:
                  self.unprocessed_messages.append(message_text)
                  self.last_interaction_time = message_ts
              return False, message_text
      else:
          if is_trigger:
              # Not aggregating, so this trigger is stale and must not be handled.
              logger.warning(f"staff[{self.staff_id}], user[{self.user_id}]: received {message.type} in state {self.current_state}")
              return False, None
          if self.message_aggregation_sec > 0:
              # A user message arrived: start aggregating.
              self.do_state_change(DialogueState.MESSAGE_AGGREGATING)
              # Queue only real text; a None here would break the later '\n'.join.
              if message_text:
                  self.unprocessed_messages.append(message_text)
                  self.last_interaction_time = message_ts
              return False, message_text

      # Collect the pending messages and empty the queue.
      if message_text:
          self.unprocessed_messages.append(message_text)
      if self.unprocessed_messages:
          message_text = '\n'.join(self.unprocessed_messages)
          self.unprocessed_messages.clear()

      # Choose the new state from the message text and the current state.
      new_state = self._determine_state_from_message(message_text)

      # Track consecutive clarification requests.
      if new_state == DialogueState.CLARIFICATION:
          self.consecutive_clarifications += 1
          # FIXME(zhoutian): this rule is too simplistic.
          if self.consecutive_clarifications >= 10000:
              new_state = DialogueState.HUMAN_INTERVENTION
              # self._trigger_human_intervention("连续多次澄清请求")
      else:
          self.consecutive_clarifications = 0

      self.do_state_change(new_state)

      if message_text:
          self.last_interaction_time = message_ts
          self.dialogue_history.append({
              "role": "user",
              "content": message_text,
              "timestamp": message_ts,
              "state": self.current_state.name
          })
      return True, message_text
  def _determine_state_from_message(self, message_text: Optional[str]) -> DialogueState:
      """Pick a dialogue state from the message text by keyword matching.

      Returns the current state for empty or ``None`` text. Otherwise greeting
      keywords are checked before farewell keywords, and CHITCHAT is the fallback.
      """
      if not message_text:
          return self.current_state

      # Simple rule-based keyword matching.
      message_lower = message_text.lower()

      # Disabled rule (FIXME(zhoutian): too simplistic): complex-request detection.
      # Keywords were ["帮我", "怎么办", "我需要", "麻烦你", "请帮助", "急", "紧急"]; a hit
      # incremented complex_request_counter and returned HUMAN_INTERVENTION once
      # the counter reached 1, while a miss reset the counter to 0.

      # Disabled rule: clarification detection.
      # Keywords were ["没明白", "不明白", "没听懂", "不懂", "什么意思", "再说一遍"],
      # returning CLARIFICATION on a hit.

      # Checked in order; the first rule with a matching keyword wins.
      keyword_rules = (
          (DialogueState.GREETING, ("你好", "早上好", "中午好", "晚上好", "嗨", "在吗")),
          (DialogueState.FAREWELL, ("再见", "拜拜", "晚安", "明天见", "回头见")),
      )
      for state, keywords in keyword_rules:
          if any(keyword in message_lower for keyword in keywords):
              return state

      # Default to casual chat.
      return DialogueState.CHITCHAT
  def _trigger_human_intervention(self, reason: str) -> None:
      """Record a human-intervention event in the user profile and send an alert.

      Does nothing if an intervention has already been triggered.

      Args:
          reason: Text stored with the event and included in the alert.
      """
      if self.human_intervention_triggered:
          return
      self.human_intervention_triggered = True

      # Event record, carrying the last 10 history entries as context.
      event = {
          "timestamp": int(time.time() * 1000),
          "reason": reason,
          "dialogue_context": self.dialogue_history[-10:]
      }

      # Append to the intervention history in the user profile, then save the profile.
      self.user_profile.setdefault("human_intervention_history", []).append(event)
      self.user_manager.save_user_profile(self.user_id, self.user_profile)

      # Send the alert.
      self._send_human_intervention_alert(reason)
  def _send_human_intervention_alert(self, reason: str) -> None:
      """Build the human-intervention alert text and log it at warning level.

      The text holds the user id, nickname, current time in milliseconds, the
      reason and the last 10 history entries.
      """
      # NOTE(review): whitespace inside the original triple-quoted literal could not
      # be recovered from the source; lines are emitted without leading indentation.
      header_lines = [
          "",
          "人工介入告警",
          f"用户ID: {self.user_id}",
          f"用户昵称: {self.user_profile.get('nickname', '未知')}",
          f"时间: {int(time.time() * 1000)}",
          f"原因: {reason}",
          "最近对话:",
          "",
      ]
      # Append the most recent dialogue entries, one per line.
      recent_dialogues = self.dialogue_history[-10:]
      dialogue_lines = [f"\n{dialogue['role']}: {dialogue['content']}" for dialogue in recent_dialogues]
      alert_message = "\n".join(header_lines) + "".join(dialogue_lines)

      # TODO(zhoutian): implement the actual alert delivery.
      logger.warning(alert_message)
  def resume_from_human_intervention(self) -> None:
      """Leave the human-intervention state and return to GREETING.

      No-op unless the current state is HUMAN_INTERVENTION. The intervention
      flag and both counters are reset, and a system entry is appended to the
      in-memory dialogue history.
      """
      if self.current_state != DialogueState.HUMAN_INTERVENTION:
          return

      self.current_state = DialogueState.GREETING
      self.human_intervention_triggered = False
      self.consecutive_clarifications = 0
      self.complex_request_counter = 0

      # Record the resume event.
      self.dialogue_history.append({
          "role": "system",
          "content": "已从人工介入状态恢复到自动对话",
          "timestamp": int(time.time() * 1000),
          "state": self.current_state.name
      })
  def generate_response(self, llm_response: str) -> Optional[str]:
      """Record the LLM response and return it, or return ``None`` during human intervention.

      When a response is returned it is appended to the in-memory dialogue
      history and ``last_interaction_time`` is set to the current time (ms).
      """
      # No reply is produced while a human has taken over.
      if self.current_state == DialogueState.HUMAN_INTERVENTION:
          return None

      now_ms = int(time.time() * 1000)
      self.dialogue_history.append({
          "role": "assistant",
          "content": llm_response,
          "timestamp": now_ms,
          "state": self.current_state.name
      })
      self.last_interaction_time = now_ms
      return llm_response
  304. def _get_hours_since_last_interaction(self, precision: int = -1):
  305. time_diff = (time.time() * 1000) - self.last_interaction_time
  306. hours_passed = time_diff / 1000 / 3600
  307. if precision >= 0:
  308. return round(hours_passed, precision)
  309. return hours_passed
  def should_initiate_conversation(self) -> bool:
      """Decide whether to proactively start a conversation.

      Returns ``True`` only when all of these hold: the state is not
      HUMAN_INTERVENTION, the user's ``interaction_frequency`` is not
      ``'stopped'``, enough hours have passed for that frequency, and the
      current period is morning, noon or afternoon.

      Side effect: when returning ``True`` the state is switched to GREETING.
      """
      # Never initiate while a human has taken over.
      if self.current_state == DialogueState.HUMAN_INTERVENTION:
          return False

      hours_passed = self._get_hours_since_last_interaction()
      time_context = self.get_time_context()

      # The threshold depends on the user's interaction-frequency preference.
      interaction_frequency = self.user_profile.get("interaction_frequency", "medium")
      if interaction_frequency == 'stopped':
          return False

      # Minimum hours between proactive contacts, per preference.
      thresholds = {
          "low": 24,     # low frequency: once a day
          "medium": 12,  # medium frequency: once every half day
          "high": 6      # high frequency: roughly every 6 hours
      }
      if hours_passed < thresholds.get(interaction_frequency, 12):
          return False

      # Only reach out during daytime periods.
      daytime = (TimeContext.MORNING, TimeContext.NOON, TimeContext.AFTERNOON)
      if time_context not in daytime:
          return False

      # Go through do_state_change so that state_backup is refreshed for
      # rollback_state() and MESSAGE_AGGREGATING never becomes previous_state.
      self.do_state_change(DialogueState.GREETING)
      return True
  def is_in_human_intervention(self) -> bool:
      """Return ``True`` when the current state is HUMAN_INTERVENTION."""
      in_intervention = self.current_state == DialogueState.HUMAN_INTERVENTION
      return in_intervention
  def get_prompt_context(self, user_message) -> Dict:
      """Build the variables used to fill prompt templates.

      Both profiles are re-fetched from the user manager first. Profile keys
      are merged into the top level of the result after the computed keys, so
      a profile key with the same name overrides the computed value.

      Args:
          user_message: Current user message; a falsy value marks a proactive greeting.
      """
      time_context = self.get_time_context()

      # Refresh the user profile.
      self.user_profile = self.user_manager.get_user_profile(self.user_id)
      # Refresh the staff profile (may not be strictly necessary).
      self.staff_profile = self.user_manager.get_staff_profile(self.staff_id)

      context = {
          "user_profile": self.user_profile,
          "current_state": self.current_state.name,
          "previous_state": self.previous_state.name,
          "current_time_period": time_context.description,
          "current_hour": datetime.now().hour,
          "last_interaction_interval": self._get_hours_since_last_interaction(2),
          "if_first_interaction": self.previous_state == DialogueState.INITIALIZED,
          "if_active_greeting": not user_message,
          **self.user_profile,
          **self.staff_profile
      }

      # Attach long-term memory.
      relevant_memories = self.vector_memory.retrieve_relevant_memories(user_message)
      context["long_term_memory"] = {
          "relevant_conversations": relevant_memories
      }
      return context
  @staticmethod
  def _select_prompt(state):
      """Return the prompt template for ``state``.

      Raises:
          KeyError: for any state other than GREETING, CHITCHAT or FAREWELL.
      """
      prompt_by_state = {
          DialogueState.GREETING: GENERAL_GREETING_PROMPT,
          DialogueState.CHITCHAT: CHITCHAT_PROMPT_COZE,
          DialogueState.FAREWELL: GENERAL_GREETING_PROMPT
      }
      return prompt_by_state[state]
  @staticmethod
  def _select_coze_bot(state, dialogue: List[Dict], multimodal=False):
      """Return the Coze bot id for ``state``.

      Args:
          state: Dialogue state to look up.
          dialogue: Not used by the current implementation.
          multimodal: Selects the multimodal bot table when truthy.

      Raises:
          KeyError: for any state other than GREETING, CHITCHAT or FAREWELL.
      """
      text_bots = {
          DialogueState.GREETING: '7486112546798780425',
          DialogueState.CHITCHAT: '7491300566573301770',
          DialogueState.FAREWELL: '7491300566573301770',
      }
      multimodal_bots = {
          DialogueState.GREETING: '7496772218198900770',
          DialogueState.CHITCHAT: '7495692989504438308',
          DialogueState.FAREWELL: '7491300566573301770',
      }
      bot_by_state = multimodal_bots if multimodal else text_bots
      return bot_by_state[state]
  @staticmethod
  def need_multimodal_model(dialogue: List[Dict], max_message_to_use: int = 10):
      """Return ``True`` if any of the last ``max_message_to_use`` entries is an image or GIF.

      This is deliberately a simple implementation for now.
      """
      image_types = (MessageType.IMAGE_GW, MessageType.IMAGE_QW, MessageType.GIF)
      recent_messages = dialogue[-max_message_to_use:]
      return any(entry.get('type') in image_types for entry in recent_messages)
  def _create_system_message(self, prompt_context):
      """Return a system-role message dict built from the current state's prompt template.

      The template is filled with ``str.format(**prompt_context)``.
      """
      template = self._select_prompt(self.current_state)
      return {'role': 'system', 'content': template.format(**prompt_context)}
  @staticmethod
  def compose_chat_messages_openai_compatible(dialogue_history, current_time, multimodal=False):
      """Convert history entries into a list of ``{'role', 'content'}`` message dicts.

      Text entries become ``"[<time>] <content>"``. Image and GIF entries become
      an ``image_url`` content part in multimodal mode, otherwise a
      ``"[<time>] [图片]"`` placeholder. A final assistant message holding only
      ``"[<current_time>]"`` is appended.
      """
      image_types = (MessageType.IMAGE_GW, MessageType.IMAGE_QW, MessageType.GIF)
      messages = []
      for entry in dialogue_history:
          role = entry['role']
          fmt_time = DialogueManager.format_timestamp(entry['timestamp'])
          is_image = entry.get('type', MessageType.TEXT) in image_types

          if not is_image:
              content = '[{}] {}'.format(fmt_time, entry["content"])
          elif multimodal:
              content = [
                  {"type": "image_url", "image_url": {"url": entry["content"]}}
              ]
          else:
              logger.warning("Image in non-multimodal mode")
              content = "[{}] {}".format(fmt_time, '[图片]')
          messages.append({"role": role, "content": content})

      # Add a prefix message to constrain the time context.
      messages.append({'role': 'assistant', 'content': '[{}]'.format(current_time)})
      return messages
  @staticmethod
  def compose_chat_messages_coze(dialogue_history, current_time, staff_id, user_id):
      """Convert history entries into a list of Coze messages, merging consecutive same-role entries.

      Args:
          dialogue_history: History entries (dicts with ``role``, ``content``,
              ``timestamp`` and optionally ``type``). The list is not modified.
          current_time: ``'%Y-%m-%d %H:%M:%S'`` string used to timestamp the
              trailing placeholder user message.
          staff_id: Used in log messages only.
          user_id: Used in log messages only.

      Returns:
          The list of built Coze messages.
      """
      image_types = (MessageType.IMAGE_GW, MessageType.IMAGE_QW, MessageType.GIF)
      messages = []

      # If the first message is not from the user, prepend an empty user message;
      # otherwise the leading assistant message gets swallowed.
      if dialogue_history and dialogue_history[0]['role'] != 'user':
          fmt_time = DialogueManager.format_timestamp(dialogue_history[0]['timestamp'])
          messages.append(cozepy.Message.build_user_question_text(f'[{fmt_time}] '))

      # Coze requires the last message to be from the user and may swallow
      # consecutive user messages, so a blank user message is always added (it
      # takes part in merging). Work on a copy so the caller's list is untouched.
      current_ts_ms = int(datetime.strptime(current_time, '%Y-%m-%d %H:%M:%S').timestamp() * 1000)
      entries = list(dialogue_history)
      entries.append({
          'role': 'user',
          'content': ' ',
          'timestamp': current_ts_ms,
      })

      def flush(role, texts, images):
          """Emit one merged message for a finished run of same-role entries."""
          if images:
              # Images are only collected for the user role; anything else is dropped.
              if role != 'user':
                  return
              # FIXME: support other message types.
              parts = [cozepy.MessageObjectString.build_image(file_url=image['content']) for image in images]
              # An object_string array may hold several images but only one text part.
              parts.append(cozepy.MessageObjectString.build_text('\n'.join(texts)))
              messages.append(cozepy.Message.build_user_question_objects(parts))
          elif texts:
              built = DialogueManager.build_chat_message(role, '\n'.join(texts), ChatServiceType.COZE_CHAT)
              if built is None:
                  # No Coze message could be built for this role; skip rather than emit None.
                  logger.warning("staff[{}], user[{}], role[{}]: unsupported role for coze message".format(
                      staff_id, user_id, role
                  ))
                  return
              messages.append(built)

      # Merge consecutive messages of the same role so coze does not swallow them.
      pending_texts = []
      pending_images = []
      pending_role = None
      for entry in entries:
          if not entry['content']:
              logger.warning("staff[{}], user[{}], role[{}]: empty content in dialogue history".format(
                  staff_id, user_id, entry['role']
              ))
              continue

          role = entry['role']
          if role != pending_role:
              flush(pending_role, pending_texts, pending_images)
              pending_texts = []
              pending_images = []
              pending_role = role

          if entry.get('type', MessageType.TEXT) in image_types:
              # Multimodal entries need the object-based message form.
              if role == 'user':
                  pending_images.append(entry)
              else:
                  logger.warning("staff[{}], user[{}]: unsupported message type [{}] in assistant role".format(
                      staff_id, user_id, entry['type']
                  ))
          else:
              pending_texts.append(DialogueManager.format_dialogue_content(entry))

      # Emit whatever is still pending after the last entry.
      flush(pending_role, pending_texts, pending_images)
      return messages
  def build_chat_configuration(
          self,
          user_message: Optional[str] = None,
          chat_service_type: ChatServiceType = ChatServiceType.OPENAI_COMPATIBLE,
          overwrite_context: Optional[Dict] = None
  ) -> Dict:
      """Assemble the configuration for one chat-service call.

      Args:
          user_message: Current user message, or ``None`` for a proactive interaction.
          chat_service_type: Chat service the messages are composed for.
          overwrite_context: Optional values that override entries of the prompt
              context; a ``'current_time'`` key also replaces the current-time string.

      Returns:
          A dict with ``user_id``, ``use_multimodal_model`` and ``messages``; for
          Coze it also carries ``custom_variables`` and ``bot_id``.
      """
      dialogue_history = self.history_dialogue_service.get_dialogue_history(self.staff_id, self.user_id)
      logger.debug("staff[{}], user[{}], dialogue_history: {}".format(
          self.staff_id, self.user_id, dialogue_history
      ))
      messages = []
      config = {
          'user_id': self.user_id
      }

      prompt_context = self.get_prompt_context(user_message)
      if overwrite_context:
          prompt_context.update(overwrite_context)

      # FIXME(zhoutian): time in string type
      current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
      if overwrite_context and 'current_time' in overwrite_context:
          current_time = overwrite_context.get('current_time')

      need_multimodal = self.need_multimodal_model(dialogue_history)
      config['use_multimodal_model'] = need_multimodal

      if chat_service_type == ChatServiceType.OPENAI_COMPATIBLE:
          messages.append(self._create_system_message(prompt_context))
          messages.extend(self.compose_chat_messages_openai_compatible(dialogue_history, current_time, need_multimodal))
      elif chat_service_type == ChatServiceType.COZE_CHAT:
          # Coze supports at most 100 messages and the system message still has to fit.
          dialogue_history = dialogue_history[-95:]
          messages = self.compose_chat_messages_coze(dialogue_history, current_time, self.staff_id, self.user_id)
          # Every context value is passed as a string; the raw profile dict is left out.
          custom_variables = {key: str(value) for key, value in prompt_context.items()}
          custom_variables.pop('user_profile', None)
          config['custom_variables'] = custom_variables
          config['bot_id'] = self._select_coze_bot(self.current_state, dialogue_history, need_multimodal)

      # FIXME(zhoutian): temporary alert
      if user_message and not messages:
          logger.error(f"staff[{self.staff_id}], user[{self.user_id}]: inconsistency in messages")
      config['messages'] = messages
      return config
  549. @staticmethod
  550. def format_timestamp(timestamp_ms):
  551. return datetime.fromtimestamp(timestamp_ms / 1000).strftime("%Y-%m-%d %H:%M:%S")
  @staticmethod
  def format_dialogue_content(dialogue_entry):
      """Return ``"[<formatted timestamp>] <content>"`` for a history entry.

      Reads the entry's ``timestamp`` (milliseconds) and ``content`` keys.
      """
      fmt_time = DialogueManager.format_timestamp(dialogue_entry['timestamp'])
      return '[{}] {}'.format(fmt_time, dialogue_entry['content'])
  @staticmethod
  def build_chat_message(role, content, chat_service_type: ChatServiceType):
      """Build one chat message in the format of the given chat service.

      For Coze, ``'user'`` and ``'assistant'`` roles produce cozepy messages and
      any other role yields ``None``. For every other service a plain
      ``{'role', 'content'}`` dict is returned.
      """
      # NOTE(review): the source lost its indentation; the fallback branch is taken
      # to belong to the service-type check, not the role check - confirm.
      if chat_service_type != ChatServiceType.COZE_CHAT:
          return {'role': role, 'content': content}
      if role == 'user':
          return cozepy.Message.build_user_question_text(content)
      if role == 'assistant':
          return cozepy.Message.build_assistant_answer(content)
      # Other roles have no Coze builder here.
      return None
  if __name__ == '__main__':
      # Manual check: writes a CHITCHAT/GREETING state row for a fixed staff/user
      # pair using the configured database (unless writes are disabled by config).
      state_cache = DialogueStateCache()
      state_cache.set_state('1688854492669990', '7881302581935903', DialogueState.CHITCHAT, DialogueState.GREETING)