|
@@ -13,6 +13,7 @@ from sqlalchemy import func
|
|
from pqai_agent import logging_service
|
|
from pqai_agent import logging_service
|
|
from pqai_agent.data_models.agent_configuration import AgentConfiguration
|
|
from pqai_agent.data_models.agent_configuration import AgentConfiguration
|
|
from pqai_agent.data_models.agent_test_task import AgentTestTask
|
|
from pqai_agent.data_models.agent_test_task import AgentTestTask
|
|
|
|
+from pqai_agent.data_models.agent_test_task_conversations import AgentTestTaskConversations
|
|
from pqai_agent_server.const.status_enum import TestTaskConversationsStatus, TestTaskStatus, get_test_task_status_desc, \
|
|
from pqai_agent_server.const.status_enum import TestTaskConversationsStatus, TestTaskStatus, get_test_task_status_desc, \
|
|
get_test_task_conversations_status_desc
|
|
get_test_task_conversations_status_desc
|
|
|
|
|
|
@@ -129,40 +130,6 @@ class TaskManager:
|
|
self.executor = ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix='TaskWorker')
|
|
self.executor = ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix='TaskWorker')
|
|
self.task_futures = {} # 任务ID -> Future
|
|
self.task_futures = {} # 任务ID -> Future
|
|
|
|
|
|
- # def get_test_task_list(self, page_num: int, page_size: int) -> Dict:
|
|
|
|
- # fetch_query = f"""
|
|
|
|
- # select t1.id, t2.name, t1.create_user, t1.update_user, t1.status, t1.create_time, t1.update_time
|
|
|
|
- # from {self.test_task_table} t1
|
|
|
|
- # left join {self.agent_configuration_table} t2 on t1.agent_id = t2.id
|
|
|
|
- # order by create_time desc
|
|
|
|
- # limit %s, %s;
|
|
|
|
- # """
|
|
|
|
- # messages = self.db.fetch(fetch_query, ((page_num - 1) * page_size, page_size))
|
|
|
|
- # total_size = self.db.fetch_one(f"""select count(*) as `count` from {self.test_task_table}""")
|
|
|
|
- #
|
|
|
|
- # total = total_size["count"]
|
|
|
|
- # total_page = total // page_size + 1 if total % page_size > 0 else total // page_size
|
|
|
|
- # total_page = 1 if total_page <= 0 else total_page
|
|
|
|
- # response_data = [
|
|
|
|
- # {
|
|
|
|
- # "id": message["id"],
|
|
|
|
- # "agentName": message["name"],
|
|
|
|
- # "createUser": message["create_user"],
|
|
|
|
- # "updateUser": message["update_user"],
|
|
|
|
- # "statusName": get_test_task_status_desc(message["status"]),
|
|
|
|
- # "createTime": message["create_time"].strftime("%Y-%m-%d %H:%M:%S"),
|
|
|
|
- # "updateTime": message["update_time"].strftime("%Y-%m-%d %H:%M:%S")
|
|
|
|
- # }
|
|
|
|
- # for message in messages
|
|
|
|
- # ]
|
|
|
|
- # return {
|
|
|
|
- # "currentPage": page_num,
|
|
|
|
- # "pageSize": page_size,
|
|
|
|
- # "totalSize": total_page,
|
|
|
|
- # "total": total,
|
|
|
|
- # "list": response_data,
|
|
|
|
- # }
|
|
|
|
-
|
|
|
|
def get_test_task_list(self, page_num: int, page_size: int) -> Dict:
|
|
def get_test_task_list(self, page_num: int, page_size: int) -> Dict:
|
|
with self.session_maker() as session:
|
|
with self.session_maker() as session:
|
|
# 计算偏移量
|
|
# 计算偏移量
|
|
@@ -197,121 +164,97 @@ class TaskManager:
|
|
}
|
|
}
|
|
|
|
|
|
def get_test_task_conversations(self, task_id: int, page_num: int, page_size: int) -> Dict:
|
|
def get_test_task_conversations(self, task_id: int, page_num: int, page_size: int) -> Dict:
|
|
- fetch_query = f"""
|
|
|
|
- select t1.id, t2.name, t3.create_user, t1.input, t1.output, t1.score, t1.status, t1.create_time, t1.update_time
|
|
|
|
- from {self.test_task_conversations_table} t1
|
|
|
|
- left join {self.agent_configuration_table} t2 on t1.agent_id = t2.id
|
|
|
|
- left join {self.test_task_table} t3 on t1.task_id = t3.id
|
|
|
|
- where t1.task_id = %s
|
|
|
|
- order by create_time desc
|
|
|
|
- limit %s, %s;
|
|
|
|
- """
|
|
|
|
- messages = self.db.fetch(fetch_query, (task_id, (page_num - 1) * page_size, page_size))
|
|
|
|
- total_size = self.db.fetch_one(
|
|
|
|
- f"""select count(*) as `count` from {self.test_task_conversations_table} where task_id = %s""",
|
|
|
|
- (task_id,))
|
|
|
|
-
|
|
|
|
- total = total_size["count"]
|
|
|
|
- total_page = total // page_size + 1 if total % page_size > 0 else total // page_size
|
|
|
|
- total_page = 1 if total_page <= 0 else total_page
|
|
|
|
- response_data = [
|
|
|
|
- {
|
|
|
|
- "id": message["id"],
|
|
|
|
- "agentName": message["name"],
|
|
|
|
- "createUser": message["create_user"],
|
|
|
|
- "input": message["input"],
|
|
|
|
- "output": message["output"],
|
|
|
|
- "score": message["score"],
|
|
|
|
- "statusName": get_test_task_conversations_status_desc(message["status"]),
|
|
|
|
- "createTime": message["create_time"].strftime("%Y-%m-%d %H:%M:%S"),
|
|
|
|
- "updateTime": message["update_time"].strftime("%Y-%m-%d %H:%M:%S")
|
|
|
|
|
|
+ with self.session_maker() as session:
|
|
|
|
+ # 计算偏移量
|
|
|
|
+ offset = (page_num - 1) * page_size
|
|
|
|
+ # 查询分页数据
|
|
|
|
+ result = (session.query(AgentTestTaskConversations, AgentConfiguration)
|
|
|
|
+ .outerjoin(AgentConfiguration, AgentTestTaskConversations.agent_id == AgentConfiguration.id)
|
|
|
|
+ .filter(AgentTestTaskConversations.task_id == task_id)
|
|
|
|
+ .limit(page_size).offset(offset).all())
|
|
|
|
+ # 查询总记录数
|
|
|
|
+ total = session.query(func.count(AgentTestTaskConversations.id)).scalar()
|
|
|
|
+
|
|
|
|
+ total_page = total // page_size + 1 if total % page_size > 0 else total // page_size
|
|
|
|
+ total_page = 1 if total_page <= 0 else total_page
|
|
|
|
+ response_data = [
|
|
|
|
+ {
|
|
|
|
+ "id": agent_test_task_conversation.id,
|
|
|
|
+ "agentName": agent_configuration.name,
|
|
|
|
+ "input": agent_test_task_conversation.input,
|
|
|
|
+ "output": agent_test_task_conversation.output,
|
|
|
|
+ "score": agent_test_task_conversation.score,
|
|
|
|
+ "statusName": get_test_task_status_desc(agent_test_task_conversation.status),
|
|
|
|
+ "createTime": agent_test_task_conversation.create_time.strftime("%Y-%m-%d %H:%M:%S"),
|
|
|
|
+ "updateTime": agent_test_task_conversation.update_time.strftime("%Y-%m-%d %H:%M:%S")
|
|
|
|
+ }
|
|
|
|
+ for agent_test_task_conversation, agent_configuration in result
|
|
|
|
+ ]
|
|
|
|
+ return {
|
|
|
|
+ "currentPage": page_num,
|
|
|
|
+ "pageSize": page_size,
|
|
|
|
+ "totalSize": total_page,
|
|
|
|
+ "total": total,
|
|
|
|
+ "list": response_data,
|
|
}
|
|
}
|
|
- for message in messages
|
|
|
|
- ]
|
|
|
|
- return {
|
|
|
|
- "currentPage": page_num,
|
|
|
|
- "pageSize": page_size,
|
|
|
|
- "totalSize": total_page,
|
|
|
|
- "total": total,
|
|
|
|
- "list": response_data,
|
|
|
|
- }
|
|
|
|
|
|
|
|
def create_task(self, agent_id: int, model_id: int) -> Dict:
|
|
def create_task(self, agent_id: int, model_id: int) -> Dict:
|
|
"""创建新任务"""
|
|
"""创建新任务"""
|
|
-
|
|
|
|
- conn = self.db.get_connection()
|
|
|
|
- try:
|
|
|
|
- conn.begin()
|
|
|
|
- # TODO 插入任务 当前测试模拟
|
|
|
|
- with conn.cursor() as cursor:
|
|
|
|
- cursor.execute(
|
|
|
|
- f"""INSERT INTO {self.test_task_table} (agent_id, status, create_user, update_user) VALUES (%s, %s, %s, %s)""",
|
|
|
|
- (agent_id, 0, 'xueyiming', 'xueyiming')
|
|
|
|
- )
|
|
|
|
- task_id = cursor.lastrowid
|
|
|
|
- task_conversations = []
|
|
|
|
- # TODO 查询具体的数据集信息后插入
|
|
|
|
- i = 0
|
|
|
|
- for _ in range(30):
|
|
|
|
- i = i + 1
|
|
|
|
- task_conversations.append((
|
|
|
|
- task_id, agent_id, i, i, "输入", "输出", 0
|
|
|
|
- ))
|
|
|
|
-
|
|
|
|
- with conn.cursor() as cursor:
|
|
|
|
- cursor.executemany(
|
|
|
|
- f"""INSERT INTO {self.test_task_conversations_table} (task_id, agent_id, dataset_id, conversation_id, input, output, status)
|
|
|
|
- VALUES (%s, %s, %s, %s, %s, %s, %s)""",
|
|
|
|
- task_conversations
|
|
|
|
- )
|
|
|
|
-
|
|
|
|
- conn.commit()
|
|
|
|
- except Exception as e:
|
|
|
|
- conn.rollback()
|
|
|
|
- logger.error(f"Failed to create task agent_id {agent_id}: {str(e)}")
|
|
|
|
- raise
|
|
|
|
- finally:
|
|
|
|
- self.db.release_connection(conn)
|
|
|
|
- logger.info(f"Created task {task_id} with 100 task_conversations")
|
|
|
|
- # 异步执行任务
|
|
|
|
|
|
+ with self.session_maker() as session:
|
|
|
|
+ with session.begin():
|
|
|
|
+ agent_test_task = AgentTestTask(agent_id=agent_id, model_id=model_id)
|
|
|
|
+ session.add(agent_test_task)
|
|
|
|
+ session.flush() # 强制SQL执行,但不提交事务
|
|
|
|
+ task_id = agent_test_task.id
|
|
|
|
+ agent_test_task_conversations = []
|
|
|
|
+ # TODO 查询具体的数据集信息后插入
|
|
|
|
+ i = 0
|
|
|
|
+ for _ in range(30):
|
|
|
|
+ i = i + 1
|
|
|
|
+ agent_test_task_conversation = AgentTestTaskConversations(task_id=task_id, agent_id=agent_id,
|
|
|
|
+ input='输入', output='输出',
|
|
|
|
+ dataset_id=i, conversation_id=i)
|
|
|
|
+ agent_test_task_conversations.append(agent_test_task_conversation)
|
|
|
|
+ session.add_all(agent_test_task_conversations)
|
|
|
|
+ # 异步执行任务
|
|
self._execute_task(task_id)
|
|
self._execute_task(task_id)
|
|
return self.get_task(task_id)
|
|
return self.get_task(task_id)
|
|
|
|
|
|
- def get_task(self, task_id: int) -> Optional[Dict]:
|
|
|
|
|
|
+ def get_task(self, task_id: int):
|
|
"""获取任务信息"""
|
|
"""获取任务信息"""
|
|
- return self.db.fetch_one(f"""SELECT * FROM {self.test_task_table} WHERE id = %s""", (task_id,))
|
|
|
|
|
|
+ with self.session_maker() as session:
|
|
|
|
+ return session.query(AgentTestTask).filter(AgentTestTask.id == task_id).one()
|
|
|
|
|
|
- def get_task_conversations(self, task_id: int) -> List[Dict]:
|
|
|
|
|
|
+ def get_task_conversations(self, task_id: int):
|
|
"""获取任务的所有子任务"""
|
|
"""获取任务的所有子任务"""
|
|
- return self.db.fetch(f"""SELECT * FROM {self.test_task_conversations_table} WHERE task_id = %s""", (task_id,))
|
|
|
|
|
|
+ with self.session_maker() as session:
|
|
|
|
+ return session.query(AgentTestTaskConversations).filter(AgentTestTaskConversations.task_id == task_id).all()
|
|
|
|
|
|
- def get_pending_task_conversations(self, task_id: int) -> List[Dict]:
|
|
|
|
|
|
+ def get_pending_task_conversations(self, task_id: int):
|
|
"""获取待处理的子任务"""
|
|
"""获取待处理的子任务"""
|
|
- return self.db.fetch(
|
|
|
|
- f"""SELECT * FROM {self.test_task_conversations_table} WHERE task_id = %s AND status = %s""",
|
|
|
|
- (task_id, TestTaskConversationsStatus.PENDING.value)
|
|
|
|
- )
|
|
|
|
|
|
+ with self.session_maker() as session:
|
|
|
|
+ return session.query(AgentTestTaskConversations).filter(AgentTestTaskConversations.task_id == task_id).filter(
|
|
|
|
+ AgentTestTaskConversations.status == TestTaskConversationsStatus.PENDING.value).all()
|
|
|
|
|
|
def update_task_status(self, task_id: int, status: int):
|
|
def update_task_status(self, task_id: int, status: int):
|
|
"""更新任务状态"""
|
|
"""更新任务状态"""
|
|
- self.db.execute(
|
|
|
|
- f"""UPDATE {self.test_task_table} SET status = %s WHERE id = %s""",
|
|
|
|
- (status, task_id)
|
|
|
|
- )
|
|
|
|
|
|
+ with self.session_maker() as session:
|
|
|
|
+ session.query(AgentTestTask).filter(AgentTestTask.id == task_id).update({"status": status})
|
|
|
|
+ session.commit()
|
|
|
|
|
|
def update_task_conversations_status(self, task_conversations_id: int, status: int):
|
|
def update_task_conversations_status(self, task_conversations_id: int, status: int):
|
|
"""更新子任务状态"""
|
|
"""更新子任务状态"""
|
|
- self.db.execute(
|
|
|
|
- f"""UPDATE {self.test_task_conversations_table} SET status = %s WHERE id = %s""",
|
|
|
|
- (status, task_conversations_id)
|
|
|
|
- )
|
|
|
|
|
|
+ with self.session_maker() as session:
|
|
|
|
+ session.query(AgentTestTaskConversations).filter(
|
|
|
|
+ AgentTestTaskConversations.id == task_conversations_id).update({"status": status})
|
|
|
|
+ session.commit()
|
|
|
|
|
|
def update_task_conversations_res(self, task_conversations_id: int, status: int, score: str):
|
|
def update_task_conversations_res(self, task_conversations_id: int, status: int, score: str):
|
|
- """更新子任务状态"""
|
|
|
|
- self.db.execute(
|
|
|
|
- f"""UPDATE {self.test_task_conversations_table} SET status = %s, score = %s WHERE id = %s""",
|
|
|
|
- (status, score, task_conversations_id)
|
|
|
|
- )
|
|
|
|
|
|
+ """更新子任务结果"""
|
|
|
|
+ with self.session_maker() as session:
|
|
|
|
+ session.query(AgentTestTaskConversations).filter(
|
|
|
|
+ AgentTestTaskConversations.id == task_conversations_id).update({"status": status, "score": score})
|
|
|
|
+ session.commit()
|
|
|
|
|
|
def cancel_task(self, task_id: int):
|
|
def cancel_task(self, task_id: int):
|
|
"""取消任务(带事务支持)"""
|
|
"""取消任务(带事务支持)"""
|
|
@@ -322,63 +265,32 @@ class TaskManager:
|
|
if task_id in self.task_futures:
|
|
if task_id in self.task_futures:
|
|
self.task_futures[task_id].cancel()
|
|
self.task_futures[task_id].cancel()
|
|
|
|
|
|
- conn = self.db.get_connection()
|
|
|
|
- try:
|
|
|
|
- conn.begin()
|
|
|
|
- # 更新任务状态为取消
|
|
|
|
- with conn.cursor() as cursor:
|
|
|
|
- cursor.execute(
|
|
|
|
- f"""UPDATE {self.test_task_table} SET status = %s WHERE id = %s""",
|
|
|
|
- (TestTaskStatus.CANCELLED.value, task_id)
|
|
|
|
- )
|
|
|
|
-
|
|
|
|
- # 取消所有待处理的子任务
|
|
|
|
- with conn.cursor() as cursor:
|
|
|
|
- cursor.execute(
|
|
|
|
- f"""UPDATE {self.test_task_conversations_table} SET status = %s WHERE task_id = %s AND status = %s""",
|
|
|
|
- (TestTaskConversationsStatus.CANCELLED.value, task_id, TestTaskConversationsStatus.PENDING.value)
|
|
|
|
- )
|
|
|
|
- conn.commit()
|
|
|
|
- logger.info(f"Cancelled task {task_id} and its pending {self.test_task_conversations_table}")
|
|
|
|
- except Exception as e:
|
|
|
|
- conn.rollback()
|
|
|
|
- logger.error(f"Failed to cancel task {task_id}: {str(e)}")
|
|
|
|
- finally:
|
|
|
|
- self.db.release_connection(conn)
|
|
|
|
|
|
+ with self.session_maker() as session:
|
|
|
|
+ with session.begin():
|
|
|
|
+ session.query(AgentTestTask).filter(AgentTestTask.id == task_id).update(
|
|
|
|
+ {"status": TestTaskStatus.CANCELLED.value})
|
|
|
|
+ session.query(AgentTestTaskConversations).filter(AgentTestTaskConversations.task_id == task_id).filter(
|
|
|
|
+ AgentTestTaskConversations.status == TestTaskConversationsStatus.PENDING.value).update(
|
|
|
|
+ {"status": TestTaskConversationsStatus.CANCELLED.value})
|
|
|
|
+ session.commit()
|
|
|
|
|
|
def resume_task(self, task_id: int) -> bool:
|
|
def resume_task(self, task_id: int) -> bool:
|
|
"""恢复已取消的任务"""
|
|
"""恢复已取消的任务"""
|
|
task = self.get_task(task_id)
|
|
task = self.get_task(task_id)
|
|
- if not task or task['status'] != TestTaskStatus.CANCELLED.value:
|
|
|
|
|
|
+ if not task or task.status != TestTaskStatus.CANCELLED.value:
|
|
return False
|
|
return False
|
|
|
|
|
|
- conn = self.db.get_connection()
|
|
|
|
- try:
|
|
|
|
- conn.begin()
|
|
|
|
- # 更新任务状态为待开始
|
|
|
|
- with conn.cursor() as cursor:
|
|
|
|
- cursor.execute(
|
|
|
|
- f"""UPDATE {self.test_task_table} SET status = %s WHERE id = %s""",
|
|
|
|
- (TestTaskStatus.NOT_STARTED.value, task_id)
|
|
|
|
- )
|
|
|
|
-
|
|
|
|
- # 恢复所有已取消的子任务
|
|
|
|
- with conn.cursor() as cursor:
|
|
|
|
- cursor.execute(
|
|
|
|
- f"""UPDATE {self.test_task_conversations_table} SET status = %s WHERE task_id = %s AND status = %s""",
|
|
|
|
- (TestTaskConversationsStatus.PENDING.value, task_id, TestTaskConversationsStatus.CANCELLED.value)
|
|
|
|
- )
|
|
|
|
- conn.commit()
|
|
|
|
- logger.info(f"Cancelled task {task_id} and its pending {self.test_task_conversations_table}")
|
|
|
|
- except Exception as e:
|
|
|
|
- conn.rollback()
|
|
|
|
- logger.error(f"Failed to cancel task {task_id}: {str(e)}")
|
|
|
|
- finally:
|
|
|
|
- self.db.release_connection(conn)
|
|
|
|
|
|
+ with self.session_maker() as session:
|
|
|
|
+ with session.begin():
|
|
|
|
+ session.query(AgentTestTask).filter(AgentTestTask.id == task_id).update(
|
|
|
|
+ {"status": TestTaskStatus.NOT_STARTED.value})
|
|
|
|
+ session.query(AgentTestTaskConversations).filter(AgentTestTaskConversations.task_id == task_id).filter(
|
|
|
|
+ AgentTestTaskConversations.status == TestTaskConversationsStatus.CANCELLED.value).update(
|
|
|
|
+ {"status": TestTaskConversationsStatus.PENDING.value})
|
|
|
|
+ session.commit()
|
|
|
|
|
|
# 重新执行任务
|
|
# 重新执行任务
|
|
self._execute_task(task_id)
|
|
self._execute_task(task_id)
|
|
-
|
|
|
|
logger.info(f"Resumed task {task_id}")
|
|
logger.info(f"Resumed task {task_id}")
|
|
return True
|
|
return True
|
|
|
|
|
|
@@ -418,7 +330,7 @@ class TaskManager:
|
|
break
|
|
break
|
|
|
|
|
|
# 更新子任务状态为运行中
|
|
# 更新子任务状态为运行中
|
|
- self.update_task_conversations_status(task_conversation['id'],
|
|
|
|
|
|
+ self.update_task_conversations_status(task_conversation.id,
|
|
TestTaskConversationsStatus.RUNNING.value)
|
|
TestTaskConversationsStatus.RUNNING.value)
|
|
try:
|
|
try:
|
|
# 模拟任务执行 - 在实际应用中替换为实际业务逻辑
|
|
# 模拟任务执行 - 在实际应用中替换为实际业务逻辑
|
|
@@ -426,18 +338,19 @@ class TaskManager:
|
|
time.sleep(1)
|
|
time.sleep(1)
|
|
score = '{"score":0.05}'
|
|
score = '{"score":0.05}'
|
|
# 更新子任务状态为已完成
|
|
# 更新子任务状态为已完成
|
|
- self.update_task_conversations_res(task_conversation['id'],
|
|
|
|
|
|
+ self.update_task_conversations_res(task_conversation.id,
|
|
TestTaskConversationsStatus.SUCCESS.value, score)
|
|
TestTaskConversationsStatus.SUCCESS.value, score)
|
|
except Exception as e:
|
|
except Exception as e:
|
|
logger.error(f"Error executing task {task_id}: {str(e)}")
|
|
logger.error(f"Error executing task {task_id}: {str(e)}")
|
|
- self.update_task_conversations_status(task_conversation['id'],
|
|
|
|
|
|
+ self.update_task_conversations_status(task_conversation.id,
|
|
TestTaskConversationsStatus.FAILED.value)
|
|
TestTaskConversationsStatus.FAILED.value)
|
|
|
|
+
|
|
# 检查任务是否完成
|
|
# 检查任务是否完成
|
|
task_conversations = self.get_task_conversations(task_id)
|
|
task_conversations = self.get_task_conversations(task_id)
|
|
- all_completed = all(task_conversation['status'] in
|
|
|
|
|
|
+ all_completed = all(task_conversation.status in
|
|
(TestTaskConversationsStatus.SUCCESS.value, TestTaskConversationsStatus.FAILED.value)
|
|
(TestTaskConversationsStatus.SUCCESS.value, TestTaskConversationsStatus.FAILED.value)
|
|
for task_conversation in task_conversations)
|
|
for task_conversation in task_conversations)
|
|
- any_pending = any(task_conversation['status'] in
|
|
|
|
|
|
+ any_pending = any(task_conversation.status in
|
|
(TestTaskConversationsStatus.PENDING.value, TestTaskConversationsStatus.RUNNING.value)
|
|
(TestTaskConversationsStatus.PENDING.value, TestTaskConversationsStatus.RUNNING.value)
|
|
for task_conversation in task_conversations)
|
|
for task_conversation in task_conversations)
|
|
|
|
|
|
@@ -446,7 +359,7 @@ class TaskManager:
|
|
logger.info(f"Task {task_id} completed")
|
|
logger.info(f"Task {task_id} completed")
|
|
elif not any_pending:
|
|
elif not any_pending:
|
|
# 没有待处理子任务但未全部完成(可能是取消了)
|
|
# 没有待处理子任务但未全部完成(可能是取消了)
|
|
- current_status = self.get_task(task_id)['status']
|
|
|
|
|
|
+ current_status = self.get_task(task_id).status
|
|
if current_status != TestTaskStatus.CANCELLED.value:
|
|
if current_status != TestTaskStatus.CANCELLED.value:
|
|
self.update_task_status(task_id, TestTaskStatus.COMPLETED.value
|
|
self.update_task_status(task_id, TestTaskStatus.COMPLETED.value
|
|
if all_completed else TestTaskStatus.CANCELLED.value)
|
|
if all_completed else TestTaskStatus.CANCELLED.value)
|