Browse Source

add get_off_videos task

luojunhui 11 hours ago
parent
commit
ec3d195a4a

+ 25 - 10
applications/tasks/monitor_tasks/get_off_videos.py

@@ -27,24 +27,37 @@ class GetOffVideos(GetOffVideosConst):
         self.log_client = log_client
         self.table = "get_off_videos"
 
-    async def get_task_list(self, earliest_timestamp_threshold, expire_timestamp_threshold):
+    async def get_task_list(
+        self, earliest_timestamp_threshold, expire_timestamp_threshold
+    ):
         """get videos which need get off"""
         earliest_timestamp_threshold = 0
         query = f"""
             select video_id from {self.table} where video_status = %s and publish_time between %s and %s;
         """
-        video_list, error = await self.db_client.async_fetch(query, params=(self.VIDEO_AVAILABLE_STATUS, earliest_timestamp_threshold, expire_timestamp_threshold))
+        video_list, error = await self.db_client.async_fetch(
+            query,
+            params=(
+                self.VIDEO_AVAILABLE_STATUS,
+                earliest_timestamp_threshold,
+                expire_timestamp_threshold,
+            ),
+        )
         return video_list
 
     async def update_video_status(self, video_id):
         query = f"""
             update {self.table} set video_status = %s, get_off_time = %s where video_id = %s;
         """
-        return await self.db_client.async_save(query, params=(self.VIDEO_DISABLE_STATUS, int(time.time()), video_id))
+        return await self.db_client.async_save(
+            query, params=(self.VIDEO_DISABLE_STATUS, int(time.time()), video_id)
+        )
 
     async def update_video_audit_status(self, video_id):
         """use pq api to update video status"""
-        response = await change_video_audit_status(video_id, self.VIDEO_AUDIT_FAIL_STATUS)
+        response = await change_video_audit_status(
+            video_id, self.VIDEO_AUDIT_FAIL_STATUS
+        )
         await self.update_video_status(video_id)
         return response
 
@@ -52,9 +65,11 @@ class GetOffVideos(GetOffVideosConst):
         """get off videos out of expire time"""
         expire_timestamp_threshold = int(time.time()) - self.EXPIRE_TIME
         earliest_timestamp_threshold = int(time.time()) - self.EARLIEST_TIME
-        task_list = await self.get_task_list(earliest_timestamp_threshold, expire_timestamp_threshold)
+        task_list = await self.get_task_list(
+            earliest_timestamp_threshold, expire_timestamp_threshold
+        )
         for task in tqdm(task_list):
-            video_id = task['video_id']
+            video_id = task["video_id"]
             try:
                 await self.update_video_audit_status(video_id)
 
@@ -68,8 +83,8 @@ class GetOffVideos(GetOffVideosConst):
                         "data": {
                             "video_id": video_id,
                             "error": str(e),
-                            "traceback": traceback.format_exc()
-                        }
+                            "traceback": traceback.format_exc(),
+                        },
                     }
                 )
 
@@ -82,9 +97,9 @@ class GetOffVideos(GetOffVideosConst):
                 title="自动下架视频失败",
                 detail={
                     "total_video": len(task_list),
-                    "video_list": [i['video_id'] for i in task_list]
+                    "video_list": [i["video_id"] for i in task_list],
                 },
-                mention=False
+                mention=False,
             )
             return self.TASK_FAILED_STATUS
         else:

+ 19 - 9
applications/tasks/task_scheduler.py

@@ -1,3 +1,4 @@
+import asyncio
 import time
 from datetime import datetime
 
@@ -54,9 +55,10 @@ class TaskScheduler:
         任务执行完成之后,将任务状态设置为完成状态/失败状态
         """
         query = f"""
-            update {self.table} set task_status = %s, finish_timestamp = %s
-            where task_name = %s and date_string = %s and task_status = %s;
-        """
+               update {self.table} 
+               set task_status = %s, finish_timestamp = %s
+               where task_name = %s and date_string = %s and task_status = %s;
+           """
         return await self.db_client.async_save(
             query=query,
             params=(final_status, int(time.time()), task_name, date_string, 1),
@@ -104,18 +106,26 @@ class TaskScheduler:
                         "data": response,
                     }
                 )
-                await self.release_task(task_name, date_string, response["code"])
                 return await task_schedule_response.success_response(
                     task_name=task_name, data=response
                 )
 
             case "get_off_videos":
-                sub_task = GetOffVideos(self.db_client, self.log_client)
-                await sub_task.get_off_job()
-                response_code = await sub_task.check()
-                await self.release_task(task_name, date_string, response_code)
+
+                async def background_get_off_videos():
+                    sub_task = GetOffVideos(self.db_client, self.log_client)
+                    await sub_task.get_off_job()
+                    task_status = await sub_task.check()
+                    await self.release_task(
+                        task_name=task_name,
+                        date_string=date_string,
+                        final_status=task_status,
+                    )
+
+                asyncio.create_task(background_get_off_videos())
                 return await task_schedule_response.success_response(
-                    task_name=task_name, data={"code": response_code, "message": "get off_videos finished"}
+                    task_name=task_name,
+                    data={"code": 0, "message": "get off_videos started background"},
                 )
 
             case _:

+ 5 - 0
routes/blueprint.py

@@ -21,4 +21,9 @@ def server_routes(pools, log_service):
         response = await task_scheduler.deal()
         return jsonify(response)
 
+    @server_blueprint.route("/finish_task", methods=["POST"])
+    async def finish_task():
+        data = await request.get_json()
+        return jsonify({"message": "hello world"})
+
     return server_blueprint