unit_test.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. #! /usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # vim:fenc=utf-8
  4. import pytest
  5. from unittest.mock import Mock, MagicMock
  6. import pqai_agent.abtest.client
  7. import pqai_agent.configs
  8. from pqai_agent.agent_service import AgentService
  9. from pqai_agent.dialogue_manager import DialogueState, TimeContext
  10. from pqai_agent.message_queue_backend import MemoryQueueBackend
  11. from pqai_agent.mq_message import MessageType, MqMessage, MessageChannel
  12. from pqai_agent.response_type_detector import ResponseTypeDetector
  13. from pqai_agent.user_manager import LocalUserManager
  14. import time
  15. import logging
  16. class TestMessageQueues:
  17. """测试用消息队列实现"""
  18. def __init__(self, receive_queue, send_queue, human_queue):
  19. self.receive_queue = receive_queue
  20. self.send_queue = send_queue
  21. self.human_queue = human_queue
  22. @pytest.fixture
  23. def test_env():
  24. """测试环境初始化"""
  25. logging.getLogger().setLevel(logging.DEBUG)
  26. user_manager = LocalUserManager()
  27. user_relation_manager = Mock()
  28. user_relation_manager.get_user_tags = Mock(return_value=['AgentTest1'])
  29. user_relation_manager.list_staff_users = Mock(return_value=[{'staff_id': 'staff_id_0', 'user_id': 'user_id_0'}])
  30. receive_queue = MemoryQueueBackend()
  31. send_queue = MemoryQueueBackend()
  32. human_queue = MemoryQueueBackend()
  33. queues = TestMessageQueues(receive_queue, send_queue, human_queue)
  34. # 创建Agent服务实例
  35. service = AgentService(
  36. receive_backend=receive_queue,
  37. send_backend=send_queue,
  38. human_backend=human_queue,
  39. user_manager=user_manager,
  40. user_relation_manager=user_relation_manager
  41. )
  42. service.user_profile_extractor.extract_profile_info = Mock(return_value=None)
  43. service.can_send_to_user = Mock(return_value=True)
  44. service.start()
  45. # 替换LLM调用为模拟响应
  46. service._call_chat_api = Mock(return_value="模拟响应")
  47. yield service, queues
  48. service.shutdown(sync=True)
  49. pqai_agent.abtest.client.get_client().shutdown()
  50. def test_agent_state_change(test_env):
  51. service, _ = test_env
  52. agent = service.get_agent_instance('staff_id_0', 'user_id_0')
  53. assert agent.current_state == DialogueState.INITIALIZED
  54. assert agent.previous_state == DialogueState.INITIALIZED
  55. agent.do_state_change(DialogueState.MESSAGE_AGGREGATING)
  56. assert agent.current_state == DialogueState.MESSAGE_AGGREGATING
  57. assert agent.previous_state == DialogueState.INITIALIZED
  58. agent.do_state_change(DialogueState.GREETING)
  59. assert agent.current_state == DialogueState.GREETING
  60. assert agent.previous_state == DialogueState.INITIALIZED
  61. agent.do_state_change(DialogueState.MESSAGE_AGGREGATING)
  62. assert agent.current_state == DialogueState.MESSAGE_AGGREGATING
  63. assert agent.previous_state == DialogueState.GREETING
  64. agent.commit()
  65. agent.do_state_change(DialogueState.CHITCHAT)
  66. assert agent.current_state == DialogueState.CHITCHAT
  67. assert agent.previous_state == DialogueState.GREETING
  68. agent.rollback_state()
  69. assert agent.current_state == DialogueState.MESSAGE_AGGREGATING
  70. assert agent.previous_state == DialogueState.GREETING
  71. agent.do_state_change(DialogueState.CHITCHAT)
  72. assert agent.current_state == DialogueState.CHITCHAT
  73. assert agent.previous_state == DialogueState.GREETING
  74. agent.do_state_change(DialogueState.MESSAGE_AGGREGATING)
  75. agent.do_state_change(DialogueState.CHITCHAT)
  76. assert agent.current_state == DialogueState.CHITCHAT
  77. assert agent.previous_state == DialogueState.CHITCHAT
  78. agent.do_state_change(DialogueState.MESSAGE_AGGREGATING)
  79. agent.commit()
  80. agent.do_state_change(DialogueState.CHITCHAT)
  81. agent.rollback_state()
  82. assert agent.current_state == DialogueState.MESSAGE_AGGREGATING
  83. agent.rollback_state()
  84. # no state should be rollback
  85. assert agent.current_state == DialogueState.MESSAGE_AGGREGATING
  86. def test_response_sanitization(test_env):
  87. case1 = '[2024-01-01 12:00:00] 你好'
  88. ret1 = AgentService.sanitize_response(case1)
  89. assert ret1 == '你好'
  90. case1 = '2024-01-01 12:00:00 你好'
  91. ret2 = AgentService.sanitize_response(case1)
  92. assert ret2 == '你好'
  93. def test_normal_conversation_flow(test_env):
  94. """测试正常对话流程"""
  95. service, queues = test_env
  96. service.get_agent_instance('staff_id_0', "user_id_0").message_aggregation_sec = 0
  97. # 准备测试消息
  98. test_msg = MqMessage.build(
  99. MessageType.TEXT, MessageChannel.CORP_WECHAT,
  100. 'user_id_0', 'staff_id_0', '你好', int(time.time() * 1000))
  101. queues.receive_queue.produce(test_msg)
  102. # 处理消息
  103. message = service.receive_queue.consume()
  104. if message:
  105. service.process_single_message(message)
  106. # 验证响应消息
  107. sent_msg = queues.send_queue.consume()
  108. assert sent_msg is not None
  109. assert sent_msg.receiver == "user_id_0"
  110. assert "模拟响应" in sent_msg.content
  111. def test_aggregated_conversation_flow(test_env):
  112. """测试聚合对话流程"""
  113. service, queues = test_env
  114. service.get_agent_instance('staff_id_0', "user_id_0").message_aggregation_sec = 1
  115. # 准备测试消息
  116. ts_begin = int(time.time() * 1000)
  117. test_msg = MqMessage.build(
  118. MessageType.TEXT, MessageChannel.CORP_WECHAT,
  119. 'user_id_0', 'staff_id_0', '你好', ts_begin)
  120. queues.receive_queue.produce(test_msg)
  121. test_msg = MqMessage.build(
  122. MessageType.TEXT, MessageChannel.CORP_WECHAT,
  123. 'user_id_0', 'staff_id_0', '我是老李', ts_begin + 500)
  124. queues.receive_queue.produce(test_msg)
  125. # 处理消息
  126. message = service.receive_queue.consume()
  127. if message:
  128. service.process_single_message(message)
  129. # 验证第一次响应消息
  130. sent_msg = queues.send_queue.consume()
  131. assert sent_msg is None
  132. message = service.receive_queue.consume()
  133. if message:
  134. service.process_single_message(message)
  135. # 验证第二次响应消息
  136. sent_msg = queues.send_queue.consume()
  137. assert sent_msg is None
  138. # 模拟定时器产生空消息触发响应
  139. service.process_single_message(MqMessage.build(
  140. MessageType.AGGREGATION_TRIGGER, MessageChannel.CORP_WECHAT,
  141. 'user_id_0', 'staff_id_0', None, ts_begin + 2000
  142. ))
  143. # 验证第三次响应消息
  144. sent_msg = queues.send_queue.consume()
  145. assert sent_msg is not None
  146. assert sent_msg.receiver == "user_id_0"
  147. assert "模拟响应" in sent_msg.content
  148. def test_human_intervention_trigger(test_env):
  149. """测试触发人工干预"""
  150. service, queues = test_env
  151. service.get_agent_instance('staff_id_0', "user_id_0").message_aggregation_sec = 0
  152. # 准备需要人工干预的消息
  153. test_msg = MqMessage.build(
  154. MessageType.TEXT, MessageChannel.CORP_WECHAT,
  155. "user_id_0", "staff_id_0",
  156. "我需要帮助!", int(time.time() * 1000)
  157. )
  158. queues.receive_queue.produce(test_msg)
  159. # 处理消息
  160. message = service.receive_queue.consume()
  161. if message:
  162. service.process_single_message(message)
  163. # 验证人工队列消息
  164. human_msg = queues.human_queue.consume()
  165. # 由于相关逻辑未启用,临时关闭该测试
  166. return
  167. assert human_msg is not None
  168. assert human_msg.sender == "user_id_0"
  169. assert "用户对话需人工介入" in human_msg.content
  170. def test_initiative_conversation(test_env):
  171. """测试主动发起对话"""
  172. service, queues = test_env
  173. service.get_agent_instance('staff_id_0', "user_id_0").message_aggregation_sec = 0
  174. service._call_chat_api = Mock(return_value="主动发起模拟消息")
  175. # 设置Agent需要主动发起对话
  176. agent = service.get_agent_instance('staff_id_0', "user_id_0")
  177. agent.should_initiate_conversation = Mock(return_value=(True, MagicMock()))
  178. # 发起对话有时间限制
  179. agent.get_time_context = Mock(return_value=TimeContext.MORNING)
  180. service._check_initiative_conversations()
  181. # 验证主动发起的消息 (由于当前有白名单,无法支持测试)
  182. sent_msg = queues.send_queue.consume()
  183. # assert sent_msg is not None
  184. # assert "主动发起" in sent_msg.content
  185. def test_response_type_detector(test_env):
  186. case1 = '大哥,那可得提前了解下天气,以便安排行程~我帮您查查明天北京天气?'
  187. assert ResponseTypeDetector.is_chinese_only(case1) == True
  188. assert ResponseTypeDetector.if_message_suitable_for_voice(case1) == True
  189. case2 = 'hi'
  190. assert ResponseTypeDetector.is_chinese_only(case2) == False
  191. case3 = '这是链接:http://domain.com'
  192. assert ResponseTypeDetector.is_chinese_only(case3) == False
  193. case4 = '大哥,那可得提前了解下天气'
  194. assert ResponseTypeDetector.if_message_suitable_for_voice(case4) == True
  195. global_config = pqai_agent.configs.get()
  196. global_config.get('debug_flags', {}).update({'disable_llm_api_call': False})
  197. response_detector = ResponseTypeDetector()
  198. dialogue1 = [
  199. {'role': 'user', 'content': '你好', 'timestamp': 1744979571000, 'type': MessageType.TEXT},
  200. {'role': 'assistant', 'content': '你好呀', 'timestamp': 1744979581000},
  201. ]
  202. assert response_detector.detect_type(dialogue1[:-1], dialogue1[-1]) == MessageType.TEXT
  203. dialogue2 = [
  204. {'role': 'user', 'content': '你可以读一个故事给我听吗', 'timestamp': 1744979591000},
  205. {'role': 'assistant', 'content': '当然可以啦!想听什么?', 'timestamp': 1744979601000},
  206. {'role': 'user', 'content': '我想听小王子', 'timestamp': 1744979611000},
  207. {'role': 'assistant', 'content': '《小王子》讲述了一位年轻王子离开自己的小世界去探索宇宙的冒险经历。 在旅途中,他遇到了各种各样的人,包括被困的飞行员、狐狸和聪明的蛇。 王子通过这些遭遇学到了关于爱情、友谊和超越表面的必要性的重要教训。', 'timestamp': 1744979611000},
  208. ]
  209. assert response_detector.detect_type(dialogue2[:-1], dialogue2[-1]) == MessageType.VOICE
  210. dialogue3 = [
  211. {'role': 'user', 'content': '他说的是西洋参呢,晓不得到底是不是西洋参。那个样,那个茶是抽的真空的紧包包。我泡他两包,两包泡到十几盒,13盒,我还拿回来的。', 'timestamp': 1744979591000},
  212. {'role': 'assistant', 'content': '咋啦?是突然想到啥啦,还是有其他事想和我分享分享?', 'timestamp': 1744979601000},
  213. {'role': 'user', 'content': '不要打字,还不要打。听不到。不要打字,不要打字,打字我认不到。打字我认不到,不要打字不要打字,打字我认不到。', 'timestamp': 1744979611000},
  214. {'role': 'assistant', 'content': '真是不好意思', 'timestamp': 1744979611000},
  215. ]
  216. assert response_detector.detect_type(dialogue3[:-1], dialogue3[-1]) == MessageType.VOICE
  217. global_config.get('debug_flags', {}).update({'disable_llm_api_call': True})