|
@@ -358,7 +358,6 @@ class TaskManager:
|
|
for future in concurrent.futures.as_completed(futures):
|
|
for future in concurrent.futures.as_completed(futures):
|
|
conv_id = futures[future]
|
|
conv_id = futures[future]
|
|
try:
|
|
try:
|
|
- # 设置单个任务超时时间(秒),可根据任务复杂度调整
|
|
|
|
future.result() # 获取结果(如有异常会在此抛出)
|
|
future.result() # 获取结果(如有异常会在此抛出)
|
|
except Exception as e:
|
|
except Exception as e:
|
|
logger.error(f"Subtask {conv_id} failed: {str(e)}")
|
|
logger.error(f"Subtask {conv_id} failed: {str(e)}")
|
|
@@ -397,7 +396,7 @@ class TaskManager:
|
|
tools=json.loads(agent_configuration.tools)
|
|
tools=json.loads(agent_configuration.tools)
|
|
)
|
|
)
|
|
|
|
|
|
- # 获取对话数据(与原始代码相同)
|
|
|
|
|
|
+ # 获取对话数据
|
|
conversation_data = self.dataset_service.get_conversation_data_by_id(
|
|
conversation_data = self.dataset_service.get_conversation_data_by_id(
|
|
task_conversation.conversation_id)
|
|
task_conversation.conversation_id)
|
|
user_profile_data = self.dataset_service.get_user_profile_data(
|
|
user_profile_data = self.dataset_service.get_user_profile_data(
|
|
@@ -413,7 +412,7 @@ class TaskManager:
|
|
)
|
|
)
|
|
conversations = sorted(conversations, key=lambda i: i['timestamp'], reverse=False)
|
|
conversations = sorted(conversations, key=lambda i: i['timestamp'], reverse=False)
|
|
|
|
|
|
- # 生成推送消息(与原始代码相同)
|
|
|
|
|
|
+ # 生成推送消息
|
|
last_timestamp = int(conversations[-1]["timestamp"])
|
|
last_timestamp = int(conversations[-1]["timestamp"])
|
|
push_time = int(last_timestamp / 1000) + 24 * 3600
|
|
push_time = int(last_timestamp / 1000) + 24 * 3600
|
|
push_dt = datetime.fromtimestamp(push_time).strftime('%Y-%m-%d %H:%M:%S')
|
|
push_dt = datetime.fromtimestamp(push_time).strftime('%Y-%m-%d %H:%M:%S')
|
|
@@ -487,108 +486,6 @@ class TaskManager:
|
|
del self.task_events[task_id]
|
|
del self.task_events[task_id]
|
|
if task_id in self.task_futures:
|
|
if task_id in self.task_futures:
|
|
del self.task_futures[task_id]
|
|
del self.task_futures[task_id]
|
|
- # def _process_task(self, task_id: int):
|
|
|
|
- # """处理任务的所有子任务"""
|
|
|
|
- # try:
|
|
|
|
- # # 更新任务状态为运行中
|
|
|
|
- # self.update_task_status(task_id, TestTaskStatus.IN_PROGRESS.value)
|
|
|
|
- #
|
|
|
|
- # # 获取所有待处理的子任务
|
|
|
|
- # task_conversations = self.get_pending_task_conversations(task_id)
|
|
|
|
- #
|
|
|
|
- # agent_configuration = self.get_agent_configuration_by_task_id(task_id)
|
|
|
|
- # query_prompt_template = agent_configuration.task_prompt
|
|
|
|
- # agent = MultiModalChatAgent(model=agent_configuration.execution_model,
|
|
|
|
- # system_prompt=agent_configuration.system_prompt,
|
|
|
|
- # tools=json.loads(agent_configuration.tools))
|
|
|
|
- # # 执行每个子任务
|
|
|
|
- # for task_conversation in task_conversations:
|
|
|
|
- # # 检查任务是否被取消
|
|
|
|
- # if self.task_events[task_id].is_set():
|
|
|
|
- # break
|
|
|
|
- #
|
|
|
|
- # # 更新子任务状态为运行中
|
|
|
|
- # if task_conversation.status == TestTaskConversationsStatus.PENDING.value:
|
|
|
|
- # self.update_task_conversations_status(task_conversation.id,
|
|
|
|
- # TestTaskConversationsStatus.RUNNING.value)
|
|
|
|
- # try:
|
|
|
|
- # conversation_data = self.dataset_service.get_conversation_data_by_id(
|
|
|
|
- # task_conversation.conversation_id)
|
|
|
|
- # user_profile_data = self.dataset_service.get_user_profile_data(conversation_data.user_id,
|
|
|
|
- # conversation_data.version_date.replace(
|
|
|
|
- # "-", ""))
|
|
|
|
- # user_profile = json.loads(user_profile_data['profile_data_v1'])
|
|
|
|
- # avatar = user_profile_data['iconurl']
|
|
|
|
- # staff_profile_data = self.dataset_service.get_staff_profile_data(
|
|
|
|
- # conversation_data.staff_id).agent_profile
|
|
|
|
- # conversations = self.dataset_service.get_chat_conversation_list_by_ids(
|
|
|
|
- # json.loads(conversation_data.conversation), conversation_data.staff_id)
|
|
|
|
- # conversations = sorted(conversations, key=lambda i: i['timestamp'], reverse=False)
|
|
|
|
- #
|
|
|
|
- # last_timestamp = int(conversations[-1]["timestamp"])
|
|
|
|
- # push_time = int(last_timestamp / 1000) + 24 * 3600
|
|
|
|
- # push_dt = datetime.fromtimestamp(push_time).strftime('%Y-%m-%d %H:%M:%S')
|
|
|
|
- # push_message = agent._generate_message(
|
|
|
|
- # context={
|
|
|
|
- # "formatted_staff_profile": staff_profile_data,
|
|
|
|
- # "nickname": user_profile['nickname'],
|
|
|
|
- # "name": user_profile['name'],
|
|
|
|
- # "avatar": avatar,
|
|
|
|
- # "preferred_nickname": user_profile['preferred_nickname'],
|
|
|
|
- # "gender": user_profile['gender'],
|
|
|
|
- # "age": user_profile['age'],
|
|
|
|
- # "region": user_profile['region'],
|
|
|
|
- # "health_conditions": user_profile['health_conditions'],
|
|
|
|
- # "medications": user_profile['medications'],
|
|
|
|
- # "interests": user_profile['interests'],
|
|
|
|
- # "current_datetime": push_dt
|
|
|
|
- # },
|
|
|
|
- # dialogue_history=conversations,
|
|
|
|
- # query_prompt_template=query_prompt_template
|
|
|
|
- # )
|
|
|
|
- # # TODO 获取打分
|
|
|
|
- # score = '{"score":0.05}'
|
|
|
|
- # # 更新子任务状态为已完成
|
|
|
|
- # self.update_task_conversations_res(task_conversation.id,
|
|
|
|
- # TestTaskConversationsStatus.SUCCESS.value,
|
|
|
|
- # json.dumps(conversations, ensure_ascii=False),
|
|
|
|
- # json.dumps(push_message, ensure_ascii=False),
|
|
|
|
- # score)
|
|
|
|
- # except Exception as e:
|
|
|
|
- # logger.error(f"Error executing task {task_id}: {str(e)}")
|
|
|
|
- # self.update_task_conversations_status(task_conversation.id,
|
|
|
|
- # TestTaskConversationsStatus.FAILED.value)
|
|
|
|
- #
|
|
|
|
- # # 检查任务是否完成
|
|
|
|
- # task_conversations = self.get_task_conversations(task_id)
|
|
|
|
- # all_completed = all(task_conversation.status in
|
|
|
|
- # (TestTaskConversationsStatus.SUCCESS.value, TestTaskConversationsStatus.FAILED.value)
|
|
|
|
- # for task_conversation in task_conversations)
|
|
|
|
- # any_pending = any(task_conversation.status in
|
|
|
|
- # (TestTaskConversationsStatus.PENDING.value, TestTaskConversationsStatus.RUNNING.value)
|
|
|
|
- # for task_conversation in task_conversations)
|
|
|
|
- #
|
|
|
|
- # if all_completed:
|
|
|
|
- # self.update_task_status(task_id, TestTaskStatus.COMPLETED.value)
|
|
|
|
- # logger.info(f"Task {task_id} completed")
|
|
|
|
- # elif not any_pending:
|
|
|
|
- # # 没有待处理子任务但未全部完成(可能是取消了)
|
|
|
|
- # current_status = self.get_task(task_id).status
|
|
|
|
- # if current_status != TestTaskStatus.CANCELLED.value:
|
|
|
|
- # self.update_task_status(task_id, TestTaskStatus.COMPLETED.value
|
|
|
|
- # if all_completed else TestTaskStatus.CANCELLED.value)
|
|
|
|
- # except Exception as e:
|
|
|
|
- # logger.error(f"Error executing task {task_id}: {str(e)}")
|
|
|
|
- # self.update_task_status(task_id, TestTaskStatus.FAILED.value)
|
|
|
|
- # finally:
|
|
|
|
- # # 清理资源
|
|
|
|
- # with self.task_locks[task_id]:
|
|
|
|
- # if task_id in self.running_tasks:
|
|
|
|
- # self.running_tasks.remove(task_id)
|
|
|
|
- # if task_id in self.task_events:
|
|
|
|
- # del self.task_events[task_id]
|
|
|
|
- # if task_id in self.task_futures:
|
|
|
|
- # del self.task_futures[task_id]
|
|
|
|
|
|
|
|
def shutdown(self):
|
|
def shutdown(self):
|
|
"""关闭执行器"""
|
|
"""关闭执行器"""
|