Kaynağa Gözat

Add rate_limiter

StrayWarrior 3 ay önce
ebeveyn
işleme
8aadc1fa43
2 değiştirilmiş dosya ile 28 ekleme ve 0 silme
  1. 4 0
      agent_service.py
  2. 24 0
      rate_limiter.py

+ 4 - 0
agent_service.py

@@ -21,6 +21,7 @@ from configs import apollo_config
 from logging_service import logger
 from chat_service import CozeChat, ChatServiceType
 from dialogue_manager import DialogueManager, DialogueState, DialogueStateCache
+from rate_limiter import MessageSenderRateLimiter
 from response_type_detector import ResponseTypeDetector
 from user_manager import UserManager, LocalUserManager, MySQLUserManager, MySQLUserRelationManager, UserRelationManager, \
     LocalUserRelationManager
@@ -81,6 +82,8 @@ class AgentService:
         self.process_thread = None
         self._sigint_cnt = 0
 
+        self.send_rate_limiter = MessageSenderRateLimiter()
+
     def setup_initiative_conversations(self, schedule_params: Optional[Dict] = None):
         if not schedule_params:
             schedule_params = {'hour': '8,16,20'}
@@ -264,6 +267,7 @@ class AgentService:
         if not (staff_id in staff_white_lists or hit_white_list_tags or skip_check):
             logger.warning(f"staff[{staff_id}] user[{user_id}]: skip reply")
             return
+        self.send_rate_limiter.wait_for_sending(staff_id, response)
         self.send_queue.produce(
             Message.build(message_type, MessageChannel.CORP_WECHAT,
                           staff_id, user_id, response, current_ts)

+ 24 - 0
rate_limiter.py

@@ -0,0 +1,24 @@
+#! /usr/bin/env python
+# -*- coding: utf-8 -*-
+# vim:fenc=utf-8
+
+import time
+from logging_service import logger
+
+class MessageSenderRateLimiter:
+    MAX_CHAR_PER_SECOND = 5
+
+    def __init__(self):
+        self.last_send_time = {}
+
+    def wait_for_sending(self, sender_id: str, next_message: str):
+        current_time = time.time()
+        last_send_time = self.last_send_time.get(sender_id, 0)
+        elapsed_time = current_time - last_send_time
+        required_time = len(next_message) / self.MAX_CHAR_PER_SECOND
+        if elapsed_time < required_time:
+            logger.debug(f"Rate limit exceeded. Waiting for {required_time - elapsed_time:.2f} seconds.")
+            time.sleep(required_time - elapsed_time)
+        current_time = time.time()
+        self.last_send_time[sender_id] = current_time
+        return