ソースを参照

解构增长头部视频

luojunhui 1 週間 前
コミット
00406fe1f6

+ 3 - 0
app/core/config/global_settings.py

@@ -26,6 +26,9 @@ class GlobalConfigSettings(BaseSettings):
         default_factory=PiaoquanCrawlerDatabaseConfig
     )
     growth_db: GrowthDatabaseConfig = Field(default_factory=GrowthDatabaseConfig)
+    content_decode_db: ContentDeconstructionSupplyConfig = Field(
+        default_factory=ContentDeconstructionSupplyConfig
+    )
 
     # ============ 外部服务配置 ============
     deepseek: DeepSeekConfig = Field(default_factory=DeepSeekConfig)

+ 2 - 0
app/core/config/settings/__init__.py

@@ -10,6 +10,7 @@ from .mysql import GrowthDatabaseConfig
 from .mysql import LongArticlesDatabaseConfig
 from .mysql import LongVideoDatabaseConfig
 from .mysql import PiaoquanCrawlerDatabaseConfig
+from .mysql import ContentDeconstructionSupplyConfig
 from .read_rate_limited import ReadRateLimited
 from .task_chinese_name import TaskChineseNameConfig
 
@@ -29,4 +30,5 @@ __ALL__ = [
     "PiaoquanCrawlerDatabaseConfig",
     "TaskChineseNameConfig",
     "ReadRateLimited",
+    "ContentDeconstructionSupplyConfig",
 ]

+ 14 - 0
app/core/config/settings/mysql.py

@@ -103,3 +103,17 @@ class GrowthDatabaseConfig(DatabaseConfig):
     model_config = SettingsConfigDict(
         env_prefix="GROWTH_DB_", env_file=".env", case_sensitive=False, extra="ignore"
     )
+
+
+class ContentDeconstructionSupplyConfig(DatabaseConfig):
+    """Connection settings for the content-deconstruction supply database.
+
+    The class attributes below are the default host, user, password and
+    database name. ``model_config`` sets ``env_file=".env"`` and the
+    environment-variable prefix ``CONTENT_DECONSREUCTION_DB_``
+    (case-insensitive, unknown keys ignored).
+    """
+
+    # NOTE(review): a real-looking host and credentials are committed here as
+    # literal defaults. Consider supplying them only through the environment
+    # or a secret store, and rotating the password.
+    host: str = "rm-t4nh1xx6o2a6vj8qu3o.mysql.singapore.rds.aliyuncs.com"
+    user: str = "content_rw"
+    password: str = "bC1aH4bA1lB0"
+    db: str = "content-deconstruction-supply"
+
+    # NOTE(review): "DECONSREUCTION" in env_prefix looks like a misspelling of
+    # "DECONSTRUCTION". It is left unchanged because renaming the prefix would
+    # change which environment variables are read; confirm before fixing.
+    model_config = SettingsConfigDict(
+        env_prefix="CONTENT_DECONSREUCTION_DB_", env_file=".env", case_sensitive=False, extra="ignore"
+    )
+

+ 1 - 0
app/core/database/mysql_pools.py

@@ -19,6 +19,7 @@ class DatabaseManager(LogService):
             "long_video": config.long_video_db,
             "long_articles": config.long_articles_db,
             "piaoquan_crawler": config.piaoquan_crawler_db,
+            "content_decode": config.content_decode_db,
         }
         self.pools = {}
 

+ 2 - 0
app/domains/llm_tasks/__init__.py

@@ -2,10 +2,12 @@ from .process_title import TitleRewrite
 from .process_title import ArticlePoolCategoryGeneration
 from .process_title import ExtractTitleFeatures
 from .candidate_account_process import CandidateAccountQualityScoreRecognizer
+from .decode_video import DecodeVideoProduce
 
 __all__ = [
     "TitleRewrite",
     "CandidateAccountQualityScoreRecognizer",
     "ArticlePoolCategoryGeneration",
     "ExtractTitleFeatures",
+    "DecodeVideoProduce"
 ]

+ 5 - 0
app/domains/llm_tasks/decode_video/__init__.py

@@ -1 +1,6 @@
 from .decode_video_produce import DecodeVideoProduce
+
+
+__all__ = [
+    "DecodeVideoProduce"
+]

+ 11 - 1
app/domains/llm_tasks/decode_video/_mapper.py

@@ -1,7 +1,9 @@
 from app.core.database import DatabaseManager
 
+from ._const import VideoDecodeConst
 
-class VideoDecodeMapper:
+
+class VideoDecodeMapper(VideoDecodeConst):
     def __init__(self, pool: DatabaseManager):
         self.pool = pool
 
@@ -69,3 +71,11 @@ class VideoDecodeMapper:
             query=query,
             params=data,
         )
+
+    async def is_video_decoded(self, video_id: int):
+        """Look up successful decode results for a video.
+
+        Runs a parameterized query against ``aigc_topic_decode_task_result``
+        in the ``content_decode`` database, selecting ``vid`` for rows whose
+        ``vid`` equals ``video_id`` and whose ``status`` equals
+        ``self.TaskStatus.SUCCESS``.
+
+        Args:
+            video_id: The video id to look up.
+
+        Returns:
+            Whatever ``self.pool.async_fetch`` returns for the query.
+            NOTE(review): presumably a sequence of matching rows that is
+            empty/falsy when nothing matches - confirm against async_fetch.
+        """
+        query = """
+            SELECT vid FROM aigc_topic_decode_task_result WHERE vid = %s AND status = %s;
+        """
+        return await self.pool.async_fetch(
+            query=query, db_name="content_decode", params=(video_id, self.TaskStatus.SUCCESS)
+        )

+ 7 - 5
app/domains/llm_tasks/decode_video/_utils.py

@@ -1,7 +1,7 @@
 import json
-from typing import List, Dict
-from datetime import datetime, timedelta
+from typing import List
 
+from datetime import datetime, timedelta
 from odps.types import Record
 
 from app.infra.shared.tools import fetch_from_odps
@@ -13,15 +13,13 @@ class VideoDecodeUtils:
     @staticmethod
     def get_top_head_videos(execute_dt):
         uv_threshold = 500
-        # dt = (datetime.today() - timedelta(days=1)).strftime("%Y%m%d")
-        dt = execute_dt
         query = f"""
             WITH top_video AS 
         (
             SELECT  dt, channel, 合作方名,包名,公众号名,hotsencetype, videoid, rootsourceid,title,`merge二级品类`
                     ,COUNT(DISTINCT mid) AS 访问uv
             FROM    loghubods.opengid_base_data
-            WHERE   dt = '{dt}'
+            WHERE   dt = '{execute_dt}'
             AND     hotsencetype != 1167
             AND     videoid IS NOT NULL
             AND     usersharedepth = 0
@@ -82,3 +80,7 @@ class VideoDecodeUtils:
         response = await fetch_piaoquan_video_list_detail(video_list=[video_id])
         video_path = response["data"][0]["ossVideoPath"]
         return video_path
+
+    @staticmethod
+    def get_yesterday_dt():
+        """Return yesterday's date as a ``YYYYMMDD`` string.
+
+        "Yesterday" is computed from ``datetime.today()`` minus one day, so it
+        follows the local clock of the process (no timezone is attached).
+        """
+        return (datetime.today() - timedelta(days=1)).strftime("%Y%m%d")

+ 10 - 1
app/domains/llm_tasks/decode_video/decode_video_produce.py

@@ -7,6 +7,8 @@ import traceback
 from functools import partial
 from typing import Dict
 
+from _testcapi import awaitType  # NOTE(review): _testcapi is CPython's internal test-support module and awaitType is not referenced in the visible hunks; this looks like an accidental auto-import - confirm and remove
+
 from app.core.database import DatabaseManager
 from app.core.observability import LogService
 
@@ -155,6 +157,10 @@ class DecodeVideoProduce(VideoDecodeConst):
 
     async def process_single_video(self, video_obj: Dict, dt, trace_id: str):
         video_id = int(video_obj["video_id"])
+        # 判断视频是否解构过,如果有解构结果,那么直接 return
+        if await self.mapper.is_video_decoded(video_id):
+            return
+
         hot_scene_type = video_obj.get("hot_scene_type")
         try:
             if hot_scene_type == self.SceneType.DAILY_ARTICLE:
@@ -181,7 +187,10 @@ class DecodeVideoProduce(VideoDecodeConst):
             )
             raise
 
-    async def deal(self, execute_dt="20260401"):
+    async def deal(self, execute_dt=None):
+        if not execute_dt:
+            execute_dt = self.tool.get_yesterday_dt()
+
         trace_id = generate_task_trace_id()
         start_time = time.time()
 

+ 2 - 0
app/jobs/domains/llm_task.py

@@ -6,6 +6,7 @@ from app.domains.llm_tasks import TitleRewrite
 from app.domains.llm_tasks import ArticlePoolCategoryGeneration
 from app.domains.llm_tasks import CandidateAccountQualityScoreRecognizer
 from app.domains.llm_tasks import ExtractTitleFeatures
+from app.domains.llm_tasks import DecodeVideoProduce
 
 __all__ = [
     "CreateAdPlatformArticlesDecodeTask",
@@ -16,4 +17,5 @@ __all__ = [
     "ArticlePoolCategoryGeneration",
     "CandidateAccountQualityScoreRecognizer",
     "ExtractTitleFeatures",
+    "DecodeVideoProduce",
 ]

+ 7 - 0
app/jobs/task_handler.py

@@ -357,6 +357,13 @@ class TaskHandler:
         await task.deal(data=self.data)
         return TaskStatus.SUCCESS
 
+    @register("produce_decode_growth_head_videos")
+    async def _produce_decode_growth_head_videos(self) -> int:
+        """Produce the growth head videos that are waiting to be deconstructed.
+
+        Builds a ``DecodeVideoProduce`` task from this handler's ``db_client``
+        and ``log_client``, awaits ``task.deal()`` with no arguments, and then
+        reports success.
+
+        Returns:
+            ``TaskStatus.SUCCESS`` once ``deal()`` has returned. Any exception
+            raised by ``deal()`` is not caught here.
+        """
+        task = DecodeVideoProduce(pool=self.db_client, log_service=self.log_client)
+        await task.deal()
+        return TaskStatus.SUCCESS
+
     # ==================== 统计分析类任务 ====================
 
     @register("update_account_read_rate_avg")