Browse Source

Udpate agent_service: add push service

StrayWarrior 1 month ago
parent
commit
f8316a507c
2 changed files with 45 additions and 2 deletions
  1. 44 1
      pqai_agent/agent_service.py
  2. 1 1
      pqai_agent_server/agent_server.py

+ 44 - 1
pqai_agent/agent_service.py

@@ -13,6 +13,7 @@ import threading
 import traceback
 
 import apscheduler.triggers.cron
+import rocketmq
 from apscheduler.schedulers.background import BackgroundScheduler
 
 from pqai_agent import configs
@@ -21,6 +22,7 @@ from pqai_agent.logging_service import logger
 from pqai_agent import chat_service
 from pqai_agent.chat_service import CozeChat, ChatServiceType
 from pqai_agent.dialogue_manager import DialogueManager, DialogueState, DialogueStateCache
+from pqai_agent.push_service import PushScanThread, PushTaskWorkerPool
 from pqai_agent.rate_limiter import MessageSenderRateLimiter
 from pqai_agent.response_type_detector import ResponseTypeDetector
 from pqai_agent.user_manager import UserManager, UserRelationManager
@@ -78,6 +80,11 @@ class AgentService:
         self.process_thread = None
         self._sigint_cnt = 0
 
+        # Push相关
+        self.push_task_producer = None
+        self.push_task_consumer = None
+        self._init_push_task_queue()
+
         self.send_rate_limiter = MessageSenderRateLimiter()
 
     def setup_initiative_conversations(self, schedule_params: Optional[Dict] = None):
@@ -100,7 +107,8 @@ class AgentService:
                 topic,
                 has_consumer=True, has_producer=True,
                 group_id=mq_conf['scheduler_group'],
-                topic_type='DELAY'
+                topic_type='DELAY',
+                await_duration=5
             )
             self.msg_scheduler_thread = threading.Thread(target=self.process_scheduler_events)
             self.msg_scheduler_thread.start()
@@ -284,7 +292,42 @@ class AgentService:
             int(time.time() * 1000)
         ))
 
+    def _init_push_task_queue(self):
+        credentials = rocketmq.Credentials()
+        mq_conf = configs.get()['mq']
+        rmq_client_conf = rocketmq.ClientConfiguration(mq_conf['endpoints'], credentials, mq_conf['instance_id'])
+        rmq_topic = mq_conf['push_tasks_topic']
+        rmq_group = mq_conf['push_tasks_group']
+        self.push_task_rmq_topic = rmq_topic
+        self.push_task_producer = rocketmq.Producer(rmq_client_conf, (rmq_topic,))
+        self.push_task_producer.startup()
+        self.push_task_consumer = rocketmq.SimpleConsumer(rmq_client_conf, rmq_group, await_duration=5)
+        self.push_task_consumer.startup()
+        self.push_task_consumer.subscribe(rmq_topic)
+
     def _check_initiative_conversations(self):
+        logger.info("start to check initiative conversations")
+        if not DialogueManager.is_time_suitable_for_active_conversation():
+            logger.info("time is not suitable for active conversation")
+            return
+
+        push_scan_threads = []
+        for staff in self.user_relation_manager.list_staffs():
+            staff_id = staff['third_party_user_id']
+            scan_thread = threading.Thread(target=PushScanThread(
+                staff_id, self, self.push_task_rmq_topic, self.push_task_producer).run)
+            scan_thread.start()
+            push_scan_threads.append(scan_thread)
+
+        push_task_worker_pool = PushTaskWorkerPool(
+            self, self.push_task_rmq_topic, self.push_task_consumer, self.push_task_producer)
+        push_task_worker_pool.start()
+        for thread in push_scan_threads:
+            thread.join()
+        # 先等待生成任务全部提交,再等待任务处理线程池完成
+        push_task_worker_pool.wait_to_finish()
+
+    def _check_initiative_conversations_v1(self):
         logger.info("start to check initiative conversations")
         if not DialogueManager.is_time_suitable_for_active_conversation():
             logger.info("time is not suitable for active conversation")

+ 1 - 1
pqai_agent_server/agent_server.py

@@ -29,7 +29,7 @@ if __name__ == "__main__":
             config['mq']['receive_topic'],
             has_consumer=True, has_producer=True,
             group_id=config['mq']['receive_group'],
-            topic_type='FIFO'
+            topic_type='FIFO', await_duration=10
         )
         send_queue = AliyunRocketMQQueueBackend(
             config['mq']['endpoints'],