#74 2025-01-07 裂变率重拍上线

开启中
luojunhui 请求将 4 次代码提交从 Server/2025-01-04-fission-rerank 合并至 Server/master

+ 10 - 0
applications/const/server_const.py

@@ -62,5 +62,15 @@ class ServerConst:
     TASK_FAIL_CODE = 99
     TASK_PROCESSING_CODE = 101
 
+    # oss_rank
+    VIDEO_LIMIT = 3
 
 
+    # response code
+    SUCCESS_CODE = 0
+    FAIL_CODE = -1
+    PARAMS_CHECK_FAILED_CODE = -2
+
+    # oss rerank account uid
+    RE_RANK_UID = 77975938
+

+ 45 - 0
applications/functions/get_history_oss_path.py

@@ -0,0 +1,45 @@
+from typing import Dict, List
+
+import aiohttp
+
+from applications.const import server_const
+
+
+async def request_for_fission_info(content_id: str) -> Dict:
+    """
+    异步获取带有 fission 信息的已发布 OSS 路径列表。
+    :param content_id: 内容的唯一标识符
+    :return: 包含 OSS 路径列表及相关信息的字典
+    """
+    url = f"http://{server_const.NEW_SERVER_PUBLIC_IP}:{server_const.PORT}/oss_rank"
+    async with aiohttp.ClientSession() as session:
+        try:
+            async with session.post(
+                url,
+                json={"contentId": content_id},
+                headers={"Content-Type": "application/json"},
+                timeout=5
+            ) as response:
+                response.raise_for_status()
+                return await response.json()
+        except aiohttp.ClientError as e:
+            print(f"请求失败: {e}")
+            return {}
+        except Exception as e:
+            print(f"发生错误: {e}")
+            return {}
+
+
+async def get_history_oss_path(content_id: str) -> List:
+    """
+    获取content的oss_路径
+    """
+    fission_info = await request_for_fission_info(content_id=content_id)
+    if not fission_info:
+        return []
+    else:
+        response_code = fission_info["code"]
+        if response_code == server_const.SUCCESS_CODE:
+            return fission_info['oss_path_list']
+        else:
+            return []

+ 3 - 1
applications/match_algorithm/__init__.py

@@ -1,4 +1,6 @@
 """
 @author: luojunhui
 匹配算法
-"""
+"""
+
+from .rank_by_fission_on_read import *

+ 66 - 0
applications/match_algorithm/rank_by_fission_on_read.py

@@ -0,0 +1,66 @@
+"""
+@author: luojunhui
+@software: vscode
+@file: rank_by_fission_on_read.py
+@time: 2025-01-04
+"""
+
+from typing import Dict, List
+
+from aiomysql.cursors import DictCursor
+
+
+async def get_content_id_fission_info(content_id_tuple: tuple[str], db_client, video_limit: int) -> List[Dict]:
+    """
+    获取内容id的裂变/阅读信息
+    :param content_id:
+    :return:
+    """
+    select_sql = f"""
+        SELECT 
+            l.oss_name, l.read_total, l.fission_level_0_total, l.fission_0_on_read 
+        FROM 
+            long_articles_videos_fission_info l 
+        LEFT JOIN 
+            article_re_match_record r 
+            ON l.oss_name = r.oss_path
+        WHERE 
+            l.content_id IN {content_id_tuple}
+            AND r.oss_path IS NULL
+        ORDER BY l.fission_0_on_read DESC 
+        LIMIT {video_limit};
+    """
+    response = await db_client.async_select(select_sql, DictCursor)
+
+    # 过滤 fission_0_on_read为null的,或者为0的
+    if response:
+        response = [
+            i for i in response if i['fission_0_on_read']
+        ]
+
+    return response
+
+
+async def get_history_content_ids(content_id: str, db_client) -> tuple[str]:
+    """
+    Retrieve historical content IDs related to the given content ID.
+    :param content_id: The ID of the content for which related content IDs are to be retrieved.
+    :param db_client: The database client used to execute the query.
+    :return: A tuple containing the historical content IDs.
+    """
+    select_sql = f"""
+        SELECT 
+            DISTINCT c2.content_id
+        FROM crawler_produce_id_map c
+        JOIN article_pool_promotion_source ap1 
+            ON c.channel_content_id = ap1.channel_content_id
+        JOIN article_pool_promotion_source ap2
+            ON ap1.root_produce_content_id = ap2.root_produce_content_id
+        JOIN crawler_produce_id_map c2
+            ON ap2.channel_content_id = c2.channel_content_id
+        WHERE c.content_id = '{content_id}';
+    """
+    response = await db_client.async_select(select_sql, DictCursor)
+    # print("get_history_content_ids", response) 
+    response_tuple = tuple(item["content_id"] for item in response)
+    return response_tuple

+ 2 - 1
server/api/__init__.py

@@ -4,4 +4,5 @@
 from .get_off_videos import GetOffVideos
 from .minigram import Minigram
 from .response import Response
-from .record import Record
+from .record import Record
+from .oss_rank import OssRank

+ 79 - 0
server/api/oss_rank.py

@@ -0,0 +1,79 @@
+"""
+@author luojunhui
+@description: find top fission / read oss path list
+"""
+
+from applications.const import server_const
+from applications.match_algorithm import get_content_id_fission_info
+from applications.match_algorithm import get_history_content_ids
+
+
+class OssRank:
+    """
+    input: content_id
+    output: find the top fission / read oss path list
+    """
+    def __init__(self, db_client, params):
+        self.db_client = db_client
+        self.params = params
+        self.content_id = None
+
+    async def check_params(self):
+        """
+        check params
+        """
+        try:
+            self.content_id = self.params['contentId']
+            self.video_limit = self.params.get("videoLimit", server_const.VIDEO_LIMIT)
+            return
+        except KeyError as e:
+            result = {
+                "error": str(e),
+                "message": "invalid params",
+                "data": self.params,
+                "code": server_const.PARAMS_CHECK_FAILED_CODE
+            }
+            return result
+        
+    async def deal(self):
+        """
+        entrance of this class
+        """
+        params_error = await self.check_params()
+        if params_error:
+            return params_error
+        else:
+            history_content_tuple = await get_history_content_ids(
+                content_id=self.content_id, 
+                db_client=self.db_client
+                )
+            
+            if history_content_tuple:
+                oss_path_result = await get_content_id_fission_info(
+                    content_id_tuple=history_content_tuple, 
+                    db_client=self.db_client,
+                    video_limit=self.video_limit
+                    )
+                if oss_path_result:
+                    oss_path_list = [
+                        {
+                            "video_oss_path": i['oss_name'],
+                            "fission_0_on_read": i['fission_0_on_read'],
+                            "uid": server_const.RE_RANK_UID           
+                        }
+                        for i in oss_path_result
+                    ]
+                    code = server_const.SUCCESS_CODE
+                else:
+                    oss_path_list = []
+                    code = server_const.FAIL_CODE
+                
+                return {
+                    "oss_path_list": oss_path_list,
+                    "code": code
+                }
+            else:
+                return {
+                    "oss_path_list": [],
+                    "code": server_const.FAIL_CODE
+                }

+ 12 - 2
server/routes.py

@@ -3,7 +3,7 @@
 """
 from quart import Blueprint, jsonify, request
 
-from server.api import Response, Record, Minigram, GetOffVideos
+from server.api import Response, Record, Minigram, GetOffVideos, OssRank
 
 my_blueprint = Blueprint('LongArticlesMatchServer', __name__)
 
@@ -68,5 +68,15 @@ def Routes(mysql_client, config):
         get_off_video = GetOffVideos(params=data, mysql_client=mysql_client, config=config)
         result = await get_off_video.deal()
         return jsonify(result)
-
+    
+    @my_blueprint.route("/oss_rank", methods=['POST'])
+    async def get_video_oss_rank():
+        """
+        获取content_id以及溯源id的oss路径信息并且通过 fission / read 排序
+        """
+        data = await request.get_json()
+        oss_rank = OssRank(params=data, db_client=mysql_client)
+        result = await oss_rank.deal()
+        return jsonify(result)
+    
     return my_blueprint

+ 13 - 7
tasks/history_task.py

@@ -11,6 +11,7 @@ from applications.config import Config
 from applications.const import HistoryContentIdTaskConst
 from applications.log import logging
 from applications.functions.pqFunctions import publish_to_pq, get_pq_video_detail
+from applications.functions.get_history_oss_path import get_history_oss_path
 from applications.functions.common import shuffle_list
 from applications.functions.aigc import record_trace_id
 
@@ -86,7 +87,9 @@ class historyContentIdTask(object):
         sql = f"""
         SELECT platform, play_count, like_count, video_oss_path, cover_oss_path, user_id
         FROM {self.article_crawler_video_table}
-        WHERE content_id = '{content_id}' and download_status = {self.const.VIDEO_DOWNLOAD_SUCCESS_STATUS}
+        WHERE content_id = '{content_id}' 
+            AND download_status = {self.const.VIDEO_DOWNLOAD_SUCCESS_STATUS}
+            AND is_illegal = {self.const.VIDEO_SAFE}
         ORDER BY score DESC;
         """
         res_tuple = await self.mysql_client.async_select(sql)
@@ -143,11 +146,12 @@ class historyContentIdTask(object):
         )
         return row_counts
 
-    async def publish_videos_to_pq(self, trace_id, flow_pool_level, kimi_title, gh_id, download_videos, process_times):
+    async def publish_videos_to_pq(self, trace_id, content_id, flow_pool_level, kimi_title, gh_id, download_videos, process_times):
         """
         发布至 pq
         :param process_times:
         :param trace_id:
+        :param content_id:
         :param download_videos: 已下载的视频---> list [{}, {}, {}.... ]
         :param gh_id: 公众号 id ---> str
         :param kimi_title: kimi 标题 ---> str
@@ -167,10 +171,13 @@ class historyContentIdTask(object):
             case "autoArticlePoolLevel2":
                 video_list = []
             case "autoArticlePoolLevel1":
-                # 头条,先不做
+                # 头条, 利用oss裂变效果重排序
+                oss_path_list = await get_history_oss_path(content_id=content_id)
+                if oss_path_list:
+                    download_videos[:0] = oss_path_list
+
                 video_list = download_videos[:3]
             case _:
-                print("未传流量池信息")
                 video_list = download_videos[:3]
         L = []
         for video_obj in video_list:
@@ -182,7 +189,6 @@ class historyContentIdTask(object):
             publish_response = await publish_to_pq(params)
             video_id = publish_response['data']['id']
             response = await get_pq_video_detail(video_id)
-            # time.sleep(2)
             obj = {
                 "uid": video_obj['uid'],
                 "source": video_obj['platform'],
@@ -366,7 +372,6 @@ class historyContentIdTask(object):
                 return
 
         download_videos = await self.get_video_list(content_id=content_id)
-        # time.sleep(3)
         if download_videos:
             # 修改状态为执行状态,获取该任务的锁
             affected_rows = await self.update_content_status(
@@ -391,12 +396,13 @@ class historyContentIdTask(object):
                 )
                 return
             try:
-                kimi_title = await self.get_kimi_title(content_id)
+                kimi_title = await self.get_kimi_title(content_id)                   
                 await self.publish_videos_to_pq(
                     flow_pool_level=flow_pool_level,
                     kimi_title=kimi_title,
                     gh_id=gh_id,
                     trace_id=trace_id,
+                    content_id=content_id,
                     download_videos=download_videos,
                     process_times=process_times
                 )