unit_test.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. #! /usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # vim:fenc=utf-8
  4. import pytest
  5. from datetime import datetime, timedelta
  6. from typing import Dict, Optional, Tuple, Any
  7. from unittest.mock import Mock, MagicMock
  8. from agent_service import AgentService, MemoryQueueBackend
  9. from message import MessageType, Message, MessageChannel
  10. from user_manager import LocalUserManager
  11. import time
  12. import logging
  13. class TestMessageQueues:
  14. """测试用消息队列实现"""
  15. def __init__(self, receive_queue, send_queue, human_queue):
  16. self.receive_queue = receive_queue
  17. self.send_queue = send_queue
  18. self.human_queue = human_queue
  19. @pytest.fixture
  20. def test_env():
  21. """测试环境初始化"""
  22. logging.getLogger().setLevel(logging.DEBUG)
  23. user_manager = LocalUserManager()
  24. receive_queue = MemoryQueueBackend()
  25. send_queue = MemoryQueueBackend()
  26. human_queue = MemoryQueueBackend()
  27. queues = TestMessageQueues(receive_queue, send_queue, human_queue)
  28. # 创建Agent服务实例
  29. service = AgentService(
  30. receive_backend=receive_queue,
  31. send_backend=send_queue,
  32. human_backend=human_queue,
  33. user_manager=user_manager,
  34. user_relation_manager=None
  35. )
  36. service.user_profile_extractor.extract_profile_info = Mock(return_value=None)
  37. # 替换LLM调用为模拟响应
  38. service._call_chat_api = Mock(return_value="模拟响应")
  39. return service, queues
  40. def test_response_sanitization(test_env):
  41. case1 = '[2024-01-01 12:00:00] 你好'
  42. ret1 = AgentService.sanitize_response(case1)
  43. assert ret1 == '你好'
  44. case1 = '2024-01-01 12:00:00 你好'
  45. ret2 = AgentService.sanitize_response(case1)
  46. assert ret2 == '你好'
  47. def test_normal_conversation_flow(test_env):
  48. """测试正常对话流程"""
  49. service, queues = test_env
  50. service._get_agent_instance('staff_id_0', "user_id_0").message_aggregation_sec = 0
  51. # 准备测试消息
  52. test_msg = Message.build(
  53. MessageType.TEXT, MessageChannel.CORP_WECHAT,
  54. 'user_id_0', 'staff_id_0', '你好', int(time.time() * 1000))
  55. queues.receive_queue.produce(test_msg)
  56. # 处理消息
  57. message = service.receive_queue.consume()
  58. if message:
  59. service.process_single_message(message)
  60. # 验证响应消息
  61. sent_msg = queues.send_queue.consume()
  62. assert sent_msg is not None
  63. assert sent_msg.receiver == "user_id_0"
  64. assert "模拟响应" in sent_msg.content
  65. def test_aggregated_conversation_flow(test_env):
  66. """测试聚合对话流程"""
  67. service, queues = test_env
  68. service._get_agent_instance('staff_id_0', "user_id_0").message_aggregation_sec = 1
  69. # 准备测试消息
  70. ts_begin = int(time.time() * 1000)
  71. test_msg = Message.build(
  72. MessageType.TEXT, MessageChannel.CORP_WECHAT,
  73. 'user_id_0', 'staff_id_0', '你好', ts_begin)
  74. queues.receive_queue.produce(test_msg)
  75. test_msg = Message.build(
  76. MessageType.TEXT, MessageChannel.CORP_WECHAT,
  77. 'user_id_0', 'staff_id_0', '我是老李', ts_begin + 500)
  78. queues.receive_queue.produce(test_msg)
  79. # 处理消息
  80. message = service.receive_queue.consume()
  81. if message:
  82. service.process_single_message(message)
  83. # 验证第一次响应消息
  84. sent_msg = queues.send_queue.consume()
  85. assert sent_msg is None
  86. message = service.receive_queue.consume()
  87. if message:
  88. service.process_single_message(message)
  89. # 验证第二次响应消息
  90. sent_msg = queues.send_queue.consume()
  91. assert sent_msg is None
  92. # 模拟定时器产生空消息触发响应
  93. service.process_single_message(Message.build(
  94. MessageType.AGGREGATION_TRIGGER, MessageChannel.CORP_WECHAT,
  95. 'user_id_0', 'staff_id_0', None, ts_begin + 2000
  96. ))
  97. # 验证第三次响应消息
  98. sent_msg = queues.send_queue.consume()
  99. assert sent_msg is not None
  100. assert sent_msg.receiver == "user_id_0"
  101. assert "模拟响应" in sent_msg.content
  102. def test_human_intervention_trigger(test_env):
  103. """测试触发人工干预"""
  104. service, queues = test_env
  105. service._get_agent_instance('staff_id_0',"user_id_0").message_aggregation_sec = 0
  106. # 准备需要人工干预的消息
  107. test_msg = Message.build(
  108. MessageType.TEXT, MessageChannel.CORP_WECHAT,
  109. "user_id_0", "staff_id_0",
  110. "我需要帮助!", int(time.time() * 1000)
  111. )
  112. queues.receive_queue.produce(test_msg)
  113. # 处理消息
  114. message = service.receive_queue.consume()
  115. if message:
  116. service.process_single_message(message)
  117. # 验证人工队列消息
  118. human_msg = queues.human_queue.consume()
  119. assert human_msg is not None
  120. assert human_msg.sender == "user_id_0"
  121. assert "用户对话需人工介入" in human_msg.content
  122. def test_initiative_conversation(test_env):
  123. """测试主动发起对话"""
  124. service, queues = test_env
  125. service._get_agent_instance('staff_id_0', "user_id_0").message_aggregation_sec = 0
  126. service._call_chat_api = Mock(return_value="主动发起模拟消息")
  127. # 设置Agent需要主动发起对话
  128. agent = service._get_agent_instance('staff_id_0', "user_id_0")
  129. agent.should_initiate_conversation = Mock(return_value=(True, MagicMock()))
  130. service._check_initiative_conversations()
  131. # 验证主动发起的消息
  132. sent_msg = queues.send_queue.consume()
  133. assert sent_msg is not None
  134. assert "主动发起" in sent_msg.content