Преглед на файлове

Update push_service: improve efficiency

StrayWarrior преди 3 дни
родител
ревизия
fa10713507
променени са 1 файла, в които са добавени 46 реда и са изтрити 41 реда
  1. 46 41
      pqai_agent/push_service.py

+ 46 - 41
pqai_agent/push_service.py

@@ -54,7 +54,10 @@ class PushScanThread:
         first_initiate_tags = set(apollo_config.get_json_value('agent_first_initiate_whitelist_tags', []))
         # 合并白名单,减少配置成本
         white_list_tags.update(first_initiate_tags)
-        for staff_user in self.service.user_relation_manager.list_staff_users(staff_id=self.staff_id):
+        all_staff_users = self.service.user_relation_manager.list_staff_users(staff_id=self.staff_id)
+        all_users = list({pair['user_id'] for pair in all_staff_users})
+        all_user_tags = self.service.user_manager.get_user_tags(all_users)
+        for staff_user in all_staff_users:
             staff_id = staff_user['staff_id']
             user_id = staff_user['user_id']
             # 通过AB实验配置控制用户组是否启用push
@@ -62,7 +65,7 @@ class PushScanThread:
             # if abtest_params.get('agent_push_enabled', 'false').lower() != 'true':
             #     logger.debug(f"User {user_id} not enabled agent push, skipping.")
             #     continue
-            user_tags = self.service.user_relation_manager.get_user_tags(user_id)
+            user_tags = all_user_tags.get(user_id, list())
             if not white_list_tags.intersection(user_tags):
                 should_initiate = False
             else:
@@ -90,66 +93,67 @@ class PushTaskWorkerPool:
         self.generate_consumer = mq_consumer_generate
         self.send_consumer = mq_consumer_send
         self.producer = mq_producer
-        self.loop_thread = None
+        self.generate_loop_thread = None
+        self.send_loop_thread = None
         self.is_generator_running = True
-        self.generate_send_done = False # set by wait_to_finish
-        self.no_more_generate_task = False # set by self
+        self.generate_send_done = False # set by wait_to_finish,表示所有生成任务均已进入队列
+        self.no_more_generate_task = False # generate_send_done被设置之后队列中未再收到生成任务时设置
 
     def start(self):
-        self.loop_thread = Thread(target=self.process_push_tasks)
-        self.loop_thread.start()
+        self.send_loop_thread = Thread(target=self.process_send_tasks)
+        self.send_loop_thread.start()
+        self.generate_loop_thread = Thread(target=self.process_generate_tasks)
+        self.generate_loop_thread.start()
 
-    def process_push_tasks(self):
-        # RMQ consumer疑似有bug,创建后立即消费可能报NPE
+    def process_send_tasks(self):
         time.sleep(1)
         while True:
-            # FIXME: 拆分为两个单独的线程
-            # 目前优先处理发送任务
-            task_source = 'send'
             msgs = self.send_consumer.receive(1, 60)
-            if not msgs:
-                task_source = 'generate'
-                msgs = self.generate_consumer.receive(1, 300)
             if not msgs:
                 # 没有生成任务在执行且没有消息,才可退出
-                if self.generate_send_done:
-                    if not self.no_more_generate_task:
-                        logger.debug("no message received, there should be no more generate task")
-                        self.no_more_generate_task = True
-                        continue
-                    else:
-                        if self.is_generator_running:
-                            logger.debug("Waiting for generator threads to finish")
-                            continue
-                        else:
-                            break
+                if self.no_more_generate_task and not self.is_generator_running:
+                    break
                 else:
                     continue
             msg = msgs[0]
             task = json.loads(msg.body.decode('utf-8'))
             msg_time = datetime.fromtimestamp(task['timestamp'] / 1000).strftime("%Y-%m-%d %H:%M:%S")
             logger.debug(f"recv message:{msg_time} - {task}")
-            if task['task_type'] == TaskType.GENERATE.value:
-                # FIXME: 临时方案,避免消息在消费后等待超时并重复消费
-                if self.generate_executor._work_queue.qsize() > self.max_push_workers * 5:
-                    logger.warning("Too many generate tasks in queue, consume this task later")
-                    while self.generate_executor._work_queue.qsize() > self.max_push_workers * 5:
-                        time.sleep(10)
-                    # do not submit and ack this message
-                    continue
-                self.generate_executor.submit(self.handle_generate_task, task, msg)
-            elif task['task_type'] == TaskType.SEND.value:
+            if task['task_type'] == TaskType.SEND.value:
                 staff_id = task['staff_id']
                 if staff_id not in self.send_executors:
                     self.send_executors[staff_id] = ThreadPoolExecutor(max_workers=1)
                 self.send_executors[staff_id].submit(self.handle_send_task, task, msg)
             else:
                 logger.error(f"Unknown task type: {task['task_type']}")
-                if task_source == 'send':
-                    self.send_consumer.ack(msg)
+                self.send_consumer.ack(msg)
+        logger.info("PushGenerateWorkerPool send thread stopped")
+
+    def process_generate_tasks(self):
+        time.sleep(1)
+        while True:
+            if self.generate_executor._work_queue.qsize() > self.max_push_workers * 5:
+                logger.warning("Too many generate tasks in queue, consume later")
+                time.sleep(10)
+                continue
+            msgs = self.generate_consumer.receive(1, 300)
+            if not msgs:
+                # 生成任务已经创建完成 且 未收到新任务,才可退出
+                if self.generate_send_done:
+                    logger.debug("no message received, there should be no more generate task")
+                    self.no_more_generate_task = True
+                    break
                 else:
-                    self.generate_consumer.ack(msg)
-        logger.info("PushGenerateWorkerPool stopped")
+                    continue
+            msg = msgs[0]
+            task = json.loads(msg.body.decode('utf-8'))
+            msg_time = datetime.fromtimestamp(task['timestamp'] / 1000).strftime("%Y-%m-%d %H:%M:%S")
+            logger.debug(f"recv message:{msg_time} - {task}")
+            if task['task_type'] == TaskType.GENERATE.value:
+                self.generate_executor.submit(self.handle_generate_task, task, msg)
+            else:
+                self.generate_consumer.ack(msg)
+        logger.info("PushGenerateWorkerPool generator thread stopped")
 
     def wait_to_finish(self):
         self.generate_send_done = True
@@ -158,7 +162,8 @@ class PushTaskWorkerPool:
             time.sleep(1)
         self.generate_executor.shutdown(wait=True)
         self.is_generator_running = False
-        self.loop_thread.join()
+        self.generate_loop_thread.join()
+        self.send_loop_thread.join()
 
     def handle_send_task(self, task: Dict, msg: rocketmq.Message):
         try: