浏览代码

add task manager

luojunhui 17 小时之前
父节点
当前提交
92e78d4603
共有 2 个文件被更改,包括 18 次插入15 次删除
  1. 3 1
      applications/api/async_feishu_api.py
  2. 15 14
      applications/tasks/task_scheduler.py

+ 3 - 1
applications/api/async_feishu_api.py

@@ -194,7 +194,9 @@ class FeishuBotApi(Feishu):
         }
 
     # bot
-    async def bot(self, title, detail, mention=True, table=False, env="prod"):
+    async def bot(
+        self, title, detail, mention=True, table=False, env="long_articles_task"
+    ):
         match env:
             case "dev":
                 url = self.long_articles_bot_dev

+ 15 - 14
applications/tasks/task_scheduler.py

@@ -28,9 +28,10 @@ class TaskScheduler:
             start_timestamp = response[0]["start_timestamp"]
             # todo: every task should has a unique expire timestamp, remember to write that in a task config file
             if int(time.time()) - start_timestamp >= 86400:
-                feishu_robot.bot(
+                await feishu_robot.bot(
                     title=f"{task_name} has been processing for more than one day",
                     detail={"timestamp": start_timestamp},
+                    env="long_articles_task",
                 )
             return True
 
@@ -80,16 +81,18 @@ class TaskScheduler:
         if not date_string:
             date_string = datetime.today().strftime("%Y-%m-%d")
 
-        match task_name:
-            case "check_kimi_balance":
-                if await self.whether_task_processing(task_name):
-                    return await task_schedule_response.fail_response(
-                        error_code="5001", error_message="task is processing"
-                    )
+        # prepare for task
+        if await self.whether_task_processing(task_name):
+            return await task_schedule_response.fail_response(
+                error_code="5001", error_message="task is processing"
+            )
+
+        await self.record_task(task_name=task_name, date_string=date_string)
 
-                await self.record_task(task_name=task_name, date_string=date_string)
+        await self.lock_task(task_name, date_string)
 
-                await self.lock_task(task_name, date_string)
+        match task_name:
+            case "check_kimi_balance":
                 response = await check_kimi_balance()
                 await self.log_client.log(
                     contents={
@@ -100,14 +103,11 @@ class TaskScheduler:
                         "data": response,
                     }
                 )
-                await self.release_task(
-                    task_name=task_name,
-                    date_string=date_string,
-                    final_status=response["code"],
-                )
+                await self.release_task(task_name, date_string, response["code"])
                 return await task_schedule_response.success_response(
                     task_name=task_name, data=response
                 )
+
             case _:
                 await self.log_client.log(
                     contents={
@@ -118,6 +118,7 @@ class TaskScheduler:
                         "data": self.data,
                     }
                 )
+                await self.release_task(task_name, date_string, 99)
                 return await task_schedule_response.fail_response(
                     error_code="4001", error_message="wrong task name input"
                 )