Browse Source

develop fission rerank

luojunhui 3 months ago
parent
commit
f2c1dc3efd

+ 2 - 0
applications/const/server_const.py

@@ -71,4 +71,6 @@ class ServerConst:
     FAIL_CODE = -1
     PARAMS_CHECK_FAILED_CODE = -2
 
+    # oss rerank account uid
+    RE_RANK_UID = 77975938
 

+ 44 - 0
applications/functions/get_history_oss_path.py

@@ -0,0 +1,44 @@
+from typing import Dict, List
+
+import aiohttp
+
+from applications.const import server_const
+
+
+async def request_for_fission_info(content_id: str) -> Dict:
+    """
+    异步获取带有 fission 信息的已发布 OSS 路径列表。
+    :param content_id: 内容的唯一标识符
+    :return: 包含 OSS 路径列表及相关信息的字典
+    """
+    url = f"http://{server_const.NEW_SERVER_PUBLIC_IP}:{server_const.PORT}/oss_rank"
+    async with aiohttp.ClientSession() as session:
+        try:
+            async with session.post(
+                url,
+                json={"contentId": content_id},
+                headers={"Content-Type": "application/json"},
+            ) as response:
+                response.raise_for_status()
+                return await response.json()
+        except aiohttp.ClientError as e:
+            print(f"请求失败: {e}")
+            return {}
+        except Exception as e:
+            print(f"发生错误: {e}")
+            return {}
+
+
+async def get_history_oss_path(content_id: str) -> List:
+    """
+    获取content的oss_路径
+    """
+    fission_info = await request_for_fission_info(content_id=content_id)
+    if not fission_info:
+        return []
+    else:
+        response_code = fission_info["code"]
+        if response_code == server_const.SUCCESS_CODE:
+            return fission_info['oss_path_list']
+        else:
+            return []

+ 8 - 2
applications/match_algorithm/rank_by_fission_on_read.py

@@ -26,12 +26,18 @@ async def get_content_id_fission_info(content_id_tuple: tuple[str], db_client, v
             ON l.oss_name = r.oss_path
         WHERE 
             l.content_id IN {content_id_tuple}
-            AND r.oss_path IS NULL 
+            AND r.oss_path IS NULL
         ORDER BY l.fission_0_on_read DESC 
         LIMIT {video_limit};
     """
     response = await db_client.async_select(select_sql, DictCursor)
-    # print("get_content_id_fission_info", response)
+
+    # 过滤 fission_0_on_read为null的,或者为0的
+    if response:
+        response = [
+            i for i in response if i['fission_0_on_read']
+        ]
+
     return response
 
 

+ 3 - 2
server/api/oss_rank.py

@@ -57,8 +57,9 @@ class OssRank:
                 if oss_path_result:
                     oss_path_list = [
                         {
-                            "oss_name": i['oss_name'],
-                            "fission_0_on_read": i['fission_0_on_read']
+                            "video_oss_path": i['oss_name'],
+                            "fission_0_on_read": i['fission_0_on_read'],
+                            "uid": server_const.RE_RANK_UID           
                         }
                         for i in oss_path_result
                     ]

+ 13 - 7
tasks/history_task.py

@@ -11,6 +11,7 @@ from applications.config import Config
 from applications.const import HistoryContentIdTaskConst
 from applications.log import logging
 from applications.functions.pqFunctions import publish_to_pq, get_pq_video_detail
+from applications.functions.get_history_oss_path import get_history_oss_path
 from applications.functions.common import shuffle_list
 from applications.functions.aigc import record_trace_id
 
@@ -86,7 +87,9 @@ class historyContentIdTask(object):
         sql = f"""
         SELECT platform, play_count, like_count, video_oss_path, cover_oss_path, user_id
         FROM {self.article_crawler_video_table}
-        WHERE content_id = '{content_id}' and download_status = {self.const.VIDEO_DOWNLOAD_SUCCESS_STATUS}
+        WHERE content_id = '{content_id}' 
+            AND download_status = {self.const.VIDEO_DOWNLOAD_SUCCESS_STATUS}
+            AND is_illegal = {self.const.VIDEO_SAFE}
         ORDER BY score DESC;
         """
         res_tuple = await self.mysql_client.async_select(sql)
@@ -143,11 +146,12 @@ class historyContentIdTask(object):
         )
         return row_counts
 
-    async def publish_videos_to_pq(self, trace_id, flow_pool_level, kimi_title, gh_id, download_videos, process_times):
+    async def publish_videos_to_pq(self, trace_id, content_id, flow_pool_level, kimi_title, gh_id, download_videos, process_times):
         """
         发布至 pq
         :param process_times:
         :param trace_id:
+        :param content_id:
         :param download_videos: 已下载的视频---> list [{}, {}, {}.... ]
         :param gh_id: 公众号 id ---> str
         :param kimi_title: kimi 标题 ---> str
@@ -167,10 +171,13 @@ class historyContentIdTask(object):
             case "autoArticlePoolLevel2":
                 video_list = []
             case "autoArticlePoolLevel1":
-                # 头条,先不做
+                # 头条, 利用oss裂变效果重排序
+                oss_path_list = await get_history_oss_path(content_id=content_id)
+                if oss_path_list:
+                    download_videos[:0] = oss_path_list
+
                 video_list = download_videos[:3]
             case _:
-                print("未传流量池信息")
                 video_list = download_videos[:3]
         L = []
         for video_obj in video_list:
@@ -182,7 +189,6 @@ class historyContentIdTask(object):
             publish_response = await publish_to_pq(params)
             video_id = publish_response['data']['id']
             response = await get_pq_video_detail(video_id)
-            # time.sleep(2)
             obj = {
                 "uid": video_obj['uid'],
                 "source": video_obj['platform'],
@@ -366,7 +372,6 @@ class historyContentIdTask(object):
                 return
 
         download_videos = await self.get_video_list(content_id=content_id)
-        # time.sleep(3)
         if download_videos:
             # 修改状态为执行状态,获取该任务的锁
             affected_rows = await self.update_content_status(
@@ -391,12 +396,13 @@ class historyContentIdTask(object):
                 )
                 return
             try:
-                kimi_title = await self.get_kimi_title(content_id)
+                kimi_title = await self.get_kimi_title(content_id)                   
                 await self.publish_videos_to_pq(
                     flow_pool_level=flow_pool_level,
                     kimi_title=kimi_title,
                     gh_id=gh_id,
                     trace_id=trace_id,
+                    content_id=content_id,
                     download_videos=download_videos,
                     process_times=process_times
                 )