|
@@ -81,6 +81,7 @@ class PushTaskWorkerPool:
|
|
|
mq_consumer: rocketmq.SimpleConsumer, mq_producer: rocketmq.Producer):
|
|
|
self.agent_service = agent_service
|
|
|
max_workers = configs.get()['system'].get('push_task_workers', 5)
|
|
|
+ self.max_push_workers = max_workers
|
|
|
self.generate_executor = ThreadPoolExecutor(max_workers=max_workers)
|
|
|
self.send_executors = {}
|
|
|
self.rmq_topic = mq_topic
|
|
@@ -120,6 +121,13 @@ class PushTaskWorkerPool:
|
|
|
msg_time = datetime.fromtimestamp(task['timestamp'] / 1000).strftime("%Y-%m-%d %H:%M:%S")
|
|
|
logger.debug(f"recv message:{msg_time} - {task}")
|
|
|
if task['task_type'] == TaskType.GENERATE.value:
|
|
|
+ # FIXME: 临时方案,避免消息在消费后等待超时并重复消费
|
|
|
+ if self.generate_executor._work_queue.qsize() > self.max_push_workers * 5:
|
|
|
+ logger.warning("Too many generate tasks in queue, consume this task later")
|
|
|
+ while self.generate_executor._work_queue.qsize() > self.max_push_workers * 5:
|
|
|
+ time.sleep(10)
|
|
|
+ # do not submit and ack this message
|
|
|
+ continue
|
|
|
self.generate_executor.submit(self.handle_generate_task, task, msg)
|
|
|
elif task['task_type'] == TaskType.SEND.value:
|
|
|
staff_id = task['staff_id']
|