Procházet zdrojové kódy

解构增长头部视频

luojunhui před 1 týdnem
rodič
revize
5f37373fde

+ 4 - 12
app/domains/llm_tasks/decode_video/_utils.py

@@ -11,25 +11,17 @@ from app.infra.internal.piaoquan import fetch_piaoquan_video_list_detail
 class VideoDecodeUtils:
     # 获取检测的账号 list
     @staticmethod
-    def get_top_head_videos(duration="day"):
+    def get_top_head_videos(execute_dt):
         uv_threshold = 500
-        match duration:
-            case "day":
-                days = 1
-            case "history":
-                days = 7
-            case _:
-                return []
-        # dt = (datetime.today() - timedelta(days=days)).strftime("%Y%m%d")
-        dt = "20260420"
+        # dt = (datetime.today() - timedelta(days=1)).strftime("%Y%m%d")
+        dt = execute_dt
         query = f"""
             WITH top_video AS 
         (
             SELECT  dt, channel, 合作方名,包名,公众号名,hotsencetype, videoid, rootsourceid,title,`merge二级品类`
                     ,COUNT(DISTINCT mid) AS 访问uv
             FROM    loghubods.opengid_base_data
-            WHERE   dt >= '{dt}'
-            AND     dt <= '20260427'
+            WHERE   dt = '{dt}'
             AND     hotsencetype != 1167
             AND     videoid IS NOT NULL
             AND     usersharedepth = 0

+ 7 - 6
app/domains/llm_tasks/decode_video/decode_video_produce.py

@@ -2,6 +2,7 @@
 建立待解构视频 video_id 队列, 以视频的 oss_path 作为唯一视频粒度
 """
 
+from functools import partial
 from typing import Dict
 
 from app.core.database import DatabaseManager
@@ -48,7 +49,7 @@ class DecodeVideoProduce(VideoDecodeConst):
 
         else:
             await self.mapper.insert_into_decode_task_queue(
-                data=(video_path, video_id, "20260429")
+                data=(video_path, video_id, dt)
             )
 
     async def decode_daily_video(self, video_obj: Dict, dt: str):
@@ -108,23 +109,23 @@ class DecodeVideoProduce(VideoDecodeConst):
             dt=dt,
         )
 
-    async def process_single_video(self, video_obj: Dict):
-        dt = "20260427"
+    async def process_single_video(self, video_obj: Dict, dt):
         hot_scene_type = video_obj.get("hot_scene_type")
         if hot_scene_type == self.SceneType.DAILY_ARTICLE:
             return await self.decode_daily_video(video_obj, dt)
         else:
             return await self.decode_other_video(video_obj, dt)
 
-    async def deal(self):
-        odps_video_list = self.tool.get_top_head_videos(duration="history")
+    async def deal(self, execute_dt="20260401"):
+        odps_video_list = self.tool.get_top_head_videos(execute_dt=execute_dt)
         if not odps_video_list:
             return
         task_list = self.tool.process_odps_data(odps_video_list)
+        handler = partial(self.process_single_video, dt=execute_dt)
 
         await run_tasks_with_asyncio_task_group(
             task_list=task_list,
-            handler=self.process_single_video,
+            handler=handler,
             description="解构视频生产",
             unit="video",
             max_concurrency=10,