浏览代码

add get_off_videos task

luojunhui 5 小时之前
父节点
当前提交
b40981f3c8

+ 2 - 2
app_config.toml

@@ -1,6 +1,6 @@
 reload = true
 bind = "0.0.0.0:6060"
-workers = 4
+workers = 6
 keep_alive_timeout = 120  # 保持连接的最大秒数,根据需要调整
 graceful_timeout = 30    # 重启或停止之前等待当前工作完成的时间
-loglevel = "debug"  # 日志级别
+loglevel = "warning"  # 日志级别

+ 6 - 3
applications/database/mysql_pools.py

@@ -1,7 +1,10 @@
+import logging
 from aiomysql import create_pool
 from aiomysql.cursors import DictCursor
 from applications.config import *
 
+logging.basicConfig(level=logging.INFO)
+
 
 class DatabaseManager:
     def __init__(self):
@@ -31,9 +34,9 @@ class DatabaseManager:
                     autocommit=True,
                 )
                 self.pools[db_name] = pool
-                print(f"✅ Created connection pool for {db_name}")
+                logging.info(f"Created connection pool for {db_name}")
             except Exception as e:
-                print(f"❌ Failed to create pool for {db_name}: {str(e)}")
+                logging.error(f"Failed to create pool for {db_name}: {str(e)}")
                 self.pools[db_name] = None
 
     async def close_pools(self):
@@ -41,7 +44,7 @@ class DatabaseManager:
             if pool:
                 pool.close()
                 await pool.wait_closed()
-                print(f"🔌 Closed connection pool for {name}")
+                logging.info(f"🔌 Closed connection pool for {name}")
 
     async def async_fetch(
         self, query, db_name="long_articles", params=None, cursor_type=DictCursor

+ 1 - 0
applications/tasks/monitor_tasks/__init__.py

@@ -1,2 +1,3 @@
 from .kimi_balance import check_kimi_balance
 from .get_off_videos import GetOffVideos
+from .get_off_videos import CheckVideoAuditStatus

+ 81 - 4
applications/tasks/monitor_tasks/get_off_videos.py

@@ -1,9 +1,11 @@
 import time
 import traceback
+from typing import List, Optional
 
 from tqdm import tqdm
 
 from applications.api import change_video_audit_status
+from applications.api import fetch_piaoquan_video_list_detail
 from applications.api import feishu_robot
 
 
@@ -15,23 +17,29 @@ class GetOffVideosConst:
 
     # VIDEO_AUDIT
     VIDEO_AUDIT_FAIL_STATUS = 2
+    VIDEO_AUDIT_SUCCESS_STATUS = 5
 
     # Task code
     TASK_SUCCESS_STATUS = 2
     TASK_FAILED_STATUS = 99
 
+    # check status
+    CHECK_INIT_STATUS = 0
+    CHECK_FINISHED_STATUS = 1
+
+    # table
+    table = "get_off_videos"
+
 
 class GetOffVideos(GetOffVideosConst):
     def __init__(self, db_client, log_client):
         self.db_client = db_client
         self.log_client = log_client
-        self.table = "get_off_videos"
 
     async def get_task_list(
-        self, earliest_timestamp_threshold, expire_timestamp_threshold
+        self, earliest_timestamp_threshold: int, expire_timestamp_threshold: int
     ):
         """get videos which need get off"""
-        earliest_timestamp_threshold = 0
         query = f"""
             select video_id from {self.table} where video_status = %s and publish_time between %s and %s;
         """
@@ -74,7 +82,7 @@ class GetOffVideos(GetOffVideosConst):
                 await self.update_video_audit_status(video_id)
 
             except Exception as e:
-                self.log_client.log(
+                await self.log_client.log(
                     contents={
                         "task": "get_off_videos",
                         "function": "get_off_job",
@@ -104,3 +112,72 @@ class GetOffVideos(GetOffVideosConst):
             return self.TASK_FAILED_STATUS
         else:
             return self.TASK_SUCCESS_STATUS
+
+
+class CheckVideoAuditStatus(GetOffVideosConst):
+    """Re-check the audit status of videos stored in ``self.table``.
+
+    Rows whose ``check_status`` is ``CHECK_INIT_STATUS`` are loaded, their
+    details are fetched through ``fetch_piaoquan_video_list_detail`` and every
+    video whose ``auditStatus`` differs from ``VIDEO_AUDIT_SUCCESS_STATUS`` is
+    passed to ``change_video_audit_status``.
+    """
+
+    def __init__(self, db_client, log_client):
+        self.db_client = db_client
+        self.log_client = log_client
+
+    async def get_video_list_status(self, video_list: List[int]) -> List[int]:
+        """Return the ids in ``video_list`` whose audit status is not success.
+
+        An empty or missing ``data`` field in the response yields an empty list.
+        """
+        response = await fetch_piaoquan_video_list_detail(video_list)
+        video_detail_list = response.get("data") or []
+        return [
+            detail["id"]
+            for detail in video_detail_list
+            if detail["auditStatus"] != self.VIDEO_AUDIT_SUCCESS_STATUS
+        ]
+
+    async def get_unchecked_video_list(self) -> Optional[List[int]]:
+        """Find unchecked videos (at most 1000 per call).
+
+        Returns ``None`` when the database client reports an error.
+        """
+        query = f"""
+            select video_id from {self.table} where check_status = %s and video_status = %s limit 1000;
+        """
+        video_id_list, error = await self.db_client.async_fetch(
+            query, params=(self.CHECK_INIT_STATUS, self.VIDEO_AVAILABLE_STATUS)
+        )
+        if error:
+            print("error", error)
+            return None
+
+        return [row["video_id"] for row in video_id_list]
+
+    async def update_check_status(self, video_list: List[int]):
+        """Set ``check_status`` to ``CHECK_FINISHED_STATUS`` for ``video_list``."""
+        query = f"""update {self.table} set check_status = %s where video_id in %s;"""
+        return await self.db_client.async_save(
+            query, params=(self.CHECK_FINISHED_STATUS, tuple(video_list))
+        )
+
+    async def deal(self):
+        """Run one check pass over the unchecked videos.
+
+        Returns ``TASK_FAILED_STATUS`` when the unchecked list could not be
+        loaded or when any ``change_video_audit_status`` call returned a falsy
+        response; otherwise ``TASK_SUCCESS_STATUS``.
+        """
+
+        def chunk_iterator(arr, chunk_size):
+            for start in range(0, len(arr), chunk_size):
+                yield arr[start : start + chunk_size]
+
+        video_id_list = await self.get_unchecked_video_list()
+        if video_id_list is None:
+            # The query failed. Report the failure instead of letting
+            # len(None) raise inside the chunk iterator.
+            return self.TASK_FAILED_STATUS
+
+        fail_list = []
+        for video_chunk in chunk_iterator(video_id_list, 10):
+            bad_video_id_list = await self.get_video_list_status(video_chunk)
+            for bad_video_id in tqdm(bad_video_id_list):
+                response = await change_video_audit_status(bad_video_id)
+                if not response:
+                    fail_list.append(bad_video_id)
+
+            # NOTE(review): the chunk is marked as checked even when some
+            # status changes failed or no details came back - confirm intended.
+            await self.update_check_status(video_chunk)
+
+        if fail_list:
+            await feishu_robot.bot(
+                title="校验已发布视频状态出现错误", detail=fail_list, mention=False
+            )
+            return self.TASK_FAILED_STATUS
+
+        return self.TASK_SUCCESS_STATUS

+ 23 - 0
applications/tasks/task_scheduler.py

@@ -6,6 +6,7 @@ from applications.api import feishu_robot
 from applications.utils import task_schedule_response
 from applications.tasks.monitor_tasks import check_kimi_balance
 from applications.tasks.monitor_tasks import GetOffVideos
+from applications.tasks.monitor_tasks import CheckVideoAuditStatus
 
 
 class TaskScheduler:
@@ -128,6 +129,28 @@ class TaskScheduler:
                     data={"code": 0, "message": "get off_videos started background"},
                 )
 
+            case "check_publish_video_audit_status":
+
+                async def background_check_publish_video_audit_status():
+                    """Run CheckVideoAuditStatus.deal() and release the task with its status."""
+                    # NOTE(review): if deal() raises, release_task below is never
+                    # reached - confirm whether the task should be released on error.
+                    sub_task = CheckVideoAuditStatus(self.db_client, self.log_client)
+                    print("start processing task status: ")
+                    task_status = await sub_task.deal()
+                    await self.release_task(
+                        task_name=task_name,
+                        date_string=date_string,
+                        final_status=task_status,
+                    )
+                    print("finish task status: ", task_status)
+
+                asyncio.create_task(background_check_publish_video_audit_status())
+                return await task_schedule_response.success_response(
+                    task_name=task_name,
+                    data={
+                        "code": 0,
+                        "message": "check publish video audit status started",
+                    },
+                )
+
             case _:
                 await self.log_client.log(
                     contents={

+ 1 - 1
requirements.txt

@@ -11,5 +11,5 @@ aliyun-python-sdk-core
 aliyun-python-sdk-kms
 odps~=3.5.1
 apscheduler~=3.10.4
-tqdm
+tqdm~=4.66.6
 pyapollos~=0.1.5

+ 10 - 6
task_app.py

@@ -1,3 +1,5 @@
+import logging
+
 from quart import Quart
 from applications.config import aliyun_log_config
 from applications.database import mysql_manager
@@ -10,20 +12,22 @@ app = Quart(__name__)
 routes = server_routes(mysql_manager, log_service)
 app.register_blueprint(routes)
 
+logging.basicConfig(level=logging.INFO)
+
 
 @app.before_serving
 async def startup():
-    print("Starting application...")
+    logging.info("Starting application...")
     await mysql_manager.init_pools()
-    print("Mysql pools init successfully")
+    logging.info("Mysql pools init successfully")
     await log_service.start()
-    print("aliyun log service init successfully")
+    logging.info("aliyun log service init successfully")
 
 
 @app.after_serving
 async def shutdown():
-    print("Shutting down application...")
+    logging.info("Shutting down application...")
     await mysql_manager.close_pools()
-    print("Mysql pools close successfully")
+    logging.info("Mysql pools close successfully")
     await log_service.stop()
-    print("aliyun log service stop successfully")
+    logging.info("aliyun log service stop successfully")