
Merge branch '2024-10-21-rerank-by-fission-on-read' of Server/title_with_video into 2024-09-23newDbTasks

luojunhui · 5 months ago · commit 309a168fb9
3 changed files with 119 additions and 5 deletions
  1. applications/config/__init__.py (+3 −0)
  2. applications/match_algorithm/rank.py (+81 −0)
  3. tasks/history_task.py (+35 −5)

+ 3 - 0
applications/config/__init__.py

@@ -25,6 +25,7 @@ class Config(object):
                 self.article_crawler_video_table = "long_articles_crawler_videos"
                 self.root_source_id_table = "long_articles_root_source_id"
                 self.get_off_video_table = "get_off_videos"
+                self.fission_detail_table = "long_articles_videos_fission_info"
             case "dev":
                 self.apollo_connection = pyapollos.ApolloClient(
                     app_id="LongArticlesMatchServer",
@@ -36,6 +37,7 @@ class Config(object):
                 self.article_crawler_video_table = "long_articles_crawler_videos_copy1"
                 self.root_source_id_table = "long_articles_root_source_id_copy1"
                 self.get_off_video_table = "get_off_videos_copy1"
+                self.fission_detail_table = "long_articles_videos_fission_info_copy1"
             case "pre":
                 self.apollo_connection = pyapollos.ApolloClient(
                     app_id="LongArticlesMatchServer",
@@ -47,6 +49,7 @@ class Config(object):
                 self.article_crawler_video_table = "long_articles_crawler_videos"
                 self.root_source_id_table = "long_articles_root_source_id"
                 self.get_off_video_table = "get_off_videos"
+                self.fission_detail_table = "long_articles_videos_fission_info"
 
     def get_config_value(self, key):
         """

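For context, a minimal sketch (not part of this commit) of how the new per-environment attribute might be consumed; that Config resolves its environment on construction is an assumption inferred from the match/case above, not shown in this diff:

# Hypothetical usage sketch; Config's constructor behavior is assumed.
from applications.config import Config

config = Config()  # assumption: env ("prod"/"dev"/"pre") is resolved internally
# dev resolves to the *_copy1 shadow table; prod and pre read the live table
print(config.fission_detail_table)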
+ 81 - 0
applications/match_algorithm/rank.py

@@ -1,6 +1,8 @@
 """
 @author: luojunhui
 """
+from datetime import datetime, timedelta
+
 from applications.match_algorithm.title_similarity import jcd_title_similarity
 
 
@@ -51,3 +53,82 @@ def title_similarity_rank(content_title, recall_list):
         include_title_list.append(item)
     sorted_list = sorted(include_title_list, key=lambda x: x['score'], reverse=True)
     return sorted_list
+
+
+async def get_content_oss_fission_dict(db_client, config, content_id) -> dict[str, dict]:
+    """
+    Fetch the fission performance of each oss path associated with content_id.
+    The oss data is refreshed daily at 2 a.m.
+    :return:
+    """
+    FISSION_DETAIL_TABLE = config.fission_detail_table
+    two_days_ago_dt = (datetime.now() - timedelta(days=2)).strftime('%Y%m%d')
+    sql = f"""
+        SELECT 
+            oss_name, fission_rate_0, fission_0_on_read
+        FROM
+            {FISSION_DETAIL_TABLE}
+        WHERE content_id = '{content_id}' and dt >= '{two_days_ago_dt}'
+        ORDER BY dt DESC;
+    """
+    result = await db_client.async_select(sql)
+    fission_info_dict = {}
+    if result:
+        for item in result:
+            key = item[0]
+            value = {
+                "fission_rate_0": item[1],
+                "fission_0_on_read": item[2]
+            }
+            if key not in fission_info_dict:
+                fission_info_dict[key] = value
+        return fission_info_dict
+    else:
+        return {}
+
+
+async def get_title_oss_fission_list(db_client, config, content_id) -> list[dict]:
+    """
+    Rank videos by the fission performance of the oss paths that share the title of content_id.
+    The oss data is refreshed daily at 2 a.m.
+    todo: use the latest dt that actually has data
+    :return:
+    """
+    FISSION_DETAIL_TABLE = config.fission_detail_table
+    LONG_ARTICLES_TEXT_TABLE = config.article_text_table
+    LONG_ARTICLES_CRAWLER_TABLE = config.article_crawler_video_table
+    # two_days_ago_dt = (datetime.now() - timedelta(days=2)).strftime('%Y%m%d')
+    sql = f"""
+        SELECT
+            lavfi.oss_name, lavfi.fission_0_on_read, lacv.platform, lacv.cover_oss_path, lacv.user_id
+        FROM
+            {FISSION_DETAIL_TABLE} lavfi
+        JOIN {LONG_ARTICLES_CRAWLER_TABLE} lacv on lavfi.oss_name = lacv.video_oss_path
+        WHERE title = (
+            SELECT article_title
+            FROM {LONG_ARTICLES_TEXT_TABLE}
+            WHERE content_id = '{content_id}'
+            )
+        AND lavfi.dt = (
+            SELECT MAX(dt)
+            FROM {FISSION_DETAIL_TABLE}
+            WHERE oss_name = lavfi.oss_name
+            );
+    """
+    result = await db_client.async_select(sql)
+    fission_info_list = []
+    if result:
+        for item in result:
+            obj = {
+                "platform": item[2],
+                "video_oss_path": item[0],
+                "cover_oss_path": item[3],
+                "uid": item[4],
+                "fission_0_on_read": item[1],
+            }
+            fission_info_list.append(obj)
+        return sorted(fission_info_list, key=lambda x: x['fission_0_on_read'], reverse=True)
+    else:
+        return []

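To exercise the new ranker in isolation, here is a minimal sketch that drives get_title_oss_fission_list with stubbed dependencies. StubConfig, StubClient, and the canned rows are hypothetical; only the async_select call shape and the column order of the SELECT above are taken from this commit:

import asyncio

from applications.match_algorithm.rank import get_title_oss_fission_list

class StubConfig:
    """Hypothetical stand-in for Config; just the table names the SQL interpolates."""
    fission_detail_table = "long_articles_videos_fission_info"
    article_text_table = "long_articles_text"
    article_crawler_video_table = "long_articles_crawler_videos"

class StubClient:
    """Hypothetical client returning canned rows shaped like the SELECT:
    (oss_name, fission_0_on_read, platform, cover_oss_path, user_id)."""
    async def async_select(self, sql):
        return [
            ("oss/video_a.mp4", 0.31, "gzh", "oss/cover_a.jpg", "uid_1"),
            ("oss/video_b.mp4", 0.58, "gzh", "oss/cover_b.jpg", "uid_2"),
        ]

async def main():
    ranked = await get_title_oss_fission_list(
        db_client=StubClient(),
        config=StubConfig(),
        content_id="demo-content-id",  # hypothetical id
    )
    # video_b prints first: the result is sorted by fission_0_on_read, descending
    for item in ranked:
        print(item["video_oss_path"], item["fission_0_on_read"])

asyncio.run(main())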
+ 35 - 5
tasks/history_task.py

@@ -9,6 +9,7 @@ from applications.config import Config
 from applications.log import logging
 from applications.functions.pqFunctions import publish_to_pq, get_pq_video_detail
 from applications.functions.common import shuffle_list
+from applications.match_algorithm.rank import get_title_oss_fission_list
 
 
 class historyContentIdTask(object):
@@ -72,7 +73,7 @@ class historyContentIdTask(object):
         )
         return task_obj_list
 
-    async def get_video_list(self, content_id):
+    async def get_video_list(self, content_id) -> list[dict]:
         """
         content_id
         :return:
@@ -137,9 +138,10 @@ class historyContentIdTask(object):
         )
         return row_counts
 
-    async def publish_videos_to_pq(self, trace_id, flow_pool_level, kimi_title, gh_id, download_videos, process_times):
+    async def publish_videos_to_pq(self, trace_id, flow_pool_level, kimi_title, gh_id, download_videos, process_times, content_id):
         """
         Publish to pq
+        :param content_id:
         :param process_times:
         :param trace_id:
         :param download_videos: downloaded videos ---> list [{}, {}, {}, ...]
@@ -161,8 +163,35 @@ class historyContentIdTask(object):
             case "autoArticlePoolLevel2":
                 video_list = []
             case "autoArticlePoolLevel1":
-                # headline pool, skip for now
-                video_list = download_videos[:3]
+                # headline-pool content: use the re-ranked result
+                fission_resort_list = await get_title_oss_fission_list(
+                    db_client=self.mysql_client,
+                    config=self.config,
+                    content_id=content_id
+                )
+                if fission_resort_list:
+                    total_video_list = fission_resort_list + download_videos
+                    logging(
+                        code=1106,
+                        info="查找裂变信息成功",
+                        trace_id=trace_id,
+                        data={
+                            "ori_list": download_videos[:3],
+                            "fission_list": fission_resort_list
+                        }
+                    )
+                    video_list = total_video_list[:3]
+                else:
+                    # no fission info found; keep the original order
+                    logging(
+                        code=1107,
+                        info="fission info lookup failed",
+                        trace_id=trace_id,
+                        data={
+                            "ori_list": download_videos[:3]
+                        }
+                    )
+                    video_list = download_videos[:3]
             case _:
                 print("未传流量池信息")
                 video_list = download_videos[:3]
@@ -300,7 +329,8 @@ class historyContentIdTask(object):
                     gh_id=gh_id,
                     trace_id=trace_id,
                     download_videos=download_videos,
-                    process_times=process_times
+                    process_times=process_times,
+                    content_id=content_id
                 )
             except Exception as e:
                 logging(
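One subtlety in the new autoArticlePoolLevel1 branch: fission_resort_list + download_videos can contain the same video twice, so the [:3] slice may carry duplicates. A standalone sketch of a dedup-by-oss-path merge, assuming both lists carry a video_oss_path key (the helper name is hypothetical):

def merge_unique_videos(fission_list: list[dict], downloaded: list[dict], k: int = 3) -> list[dict]:
    """Put fission-ranked videos ahead of the original download order,
    keeping only the first occurrence of each video_oss_path."""
    seen = set()
    merged = []
    for item in fission_list + downloaded:
        key = item.get("video_oss_path")
        if key in seen:
            continue
        seen.add(key)
        merged.append(item)
    return merged[:k]

# Example: oss/a.mp4 appears in both lists but survives only once, fission copy first.
fission = [{"video_oss_path": "oss/a.mp4", "fission_0_on_read": 0.5}]
downloads = [{"video_oss_path": "oss/a.mp4"}, {"video_oss_path": "oss/b.mp4"}]
print(merge_unique_videos(fission, downloads))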