|
@@ -356,9 +356,6 @@ class TaskManager:
|
|
|
with ThreadPoolExecutor(max_workers=8) as executor: # 可根据需要调整并发数
|
|
|
futures = {}
|
|
|
for task_conversation in task_conversations:
|
|
|
- if self.task_events[task_id].is_set():
|
|
|
- break # 检查任务取消事件
|
|
|
-
|
|
|
# 提交子任务到线程池
|
|
|
future = executor.submit(
|
|
|
self._process_single_conversation,
|
|
@@ -394,9 +391,17 @@ class TaskManager:
|
|
|
def _process_single_conversation(self, task_id, task, task_conversation, query_prompt_template,
|
|
|
agent_configuration):
|
|
|
"""处理单个对话子任务(线程安全)"""
|
|
|
- # 检查任务是否被取消
|
|
|
- if self.task_events[task_id].is_set():
|
|
|
- return
|
|
|
+ # 获取锁(避免竞态条件)
|
|
|
+ task_lock = self.task_locks.get(task_id, threading.Lock())
|
|
|
+ with task_lock:
|
|
|
+ # 检查任务是否被取消或不存在
|
|
|
+ if task_id not in self.task_events:
|
|
|
+ logger.warning(f"Task {task_id} not found in task_events")
|
|
|
+ return
|
|
|
+
|
|
|
+ if self.task_events[task_id].is_set():
|
|
|
+ logger.info(f"Task {task_id} already cancelled")
|
|
|
+ return
|
|
|
|
|
|
# 更新子任务状态
|
|
|
if task_conversation.status == TestTaskConversationsStatus.PENDING.value:
|