|
@@ -256,12 +256,27 @@ class TaskManager:
|
|
|
|
|
|
def cancel_task(self, task_id: int, user: str):
|
|
|
"""取消任务(带事务支持)"""
|
|
|
- # 设置取消事件
|
|
|
+ # 1. 设置取消事件(通知任务内部)
|
|
|
if task_id in self.task_events:
|
|
|
self.task_events[task_id].set()
|
|
|
- # 如果任务正在执行,尝试取消Future
|
|
|
- if task_id in self.task_futures:
|
|
|
- self.task_futures[task_id].cancel()
|
|
|
+
|
|
|
+ # 2. 尝试取消Future并等待任务停止
|
|
|
+ future = self.task_futures.get(task_id)
|
|
|
+ if future:
|
|
|
+ # 尝试取消(如果任务尚未开始执行)
|
|
|
+ cancelled_immediately = future.cancel()
|
|
|
+
|
|
|
+ if not cancelled_immediately:
|
|
|
+ # 任务已在执行,等待其响应取消事件
|
|
|
+ if not future.done():
|
|
|
+ try:
|
|
|
+ # 最多等待5秒让任务完成清理
|
|
|
+ future.result(timeout=5)
|
|
|
+ except concurrent.futures.TimeoutError:
|
|
|
+ logger.warning(f"Task {task_id} did not stop within 5 seconds, forcing cleanup")
|
|
|
+ except Exception as e:
|
|
|
+ logger.info(f"Task {task_id} stopped with exception: {e}")
|
|
|
+
|
|
|
|
|
|
with self.session_maker() as session:
|
|
|
with session.begin():
|