luojunhui пре 3 недеља
родитељ
комит
a9403c5e11

+ 6 - 2
app/ab_test/get_cover.py

@@ -27,7 +27,9 @@ class GetCoverService(Response):
     async def fetch_cover_info(self, pool_name, channel_content_id: str):
         match pool_name:
             case "aigc":
-                fetch_response = await AigcDatabaseMapper.fetch_aigc_cover(self.pool, channel_content_id)
+                fetch_response = await AigcDatabaseMapper.fetch_aigc_cover(
+                    self.pool, channel_content_id
+                )
                 if fetch_response:
                     image_oss = fetch_response[0]["oss_object_key"]
                     if image_oss:
@@ -66,7 +68,9 @@ class GetCoverService(Response):
         if video_index == 2:
             return await self.fetch_cover_info("long_video", seed_video_id)
 
-        channel_info = await AigcDatabaseMapper.fetch_channel_info(self.pool, content_id)
+        channel_info = await AigcDatabaseMapper.fetch_channel_info(
+            self.pool, content_id
+        )
         if not channel_info:
             return self.error_response(
                 error_code="402",

+ 4 - 1
app/core/config/settings/apollo.py

@@ -16,7 +16,10 @@ class ApolloConfig(BaseSettings):
                 "pre": "http://preapolloconfig-internal.piaoquantv.com/",
                 "dev": "https://devapolloconfig-internal.piaoquantv.com/",
                 "prod": "https://apolloconfig-internal.piaoquantv.com/",
-            }
+            },
+            "longarticle-recommend": {
+                "prod": "https://apolloconfig-internal.piaoquantv.com/"
+            },
         }
     )
 

+ 1 - 1
app/domains/data_recycle_tasks/recycle_daily_publish_articles.py

@@ -49,7 +49,7 @@ class Const:
         "gh_ff487cb5dab3",
         "gh_ac43eb24376d",
         "gh_b15de7c99912",
-        "gh_56ca3dae948c"
+        "gh_56ca3dae948c",
     ]
 
     # NOT USED SERVER ACCOUNT

+ 4 - 2
app/domains/monitor_tasks/auto_reply_cards_monitor.py

@@ -95,7 +95,7 @@ class AutoReplyCardsMonitorUtils(AutoReplyCardsMonitorConst):
                 "account_name": item.findtext("sources/source/name"),
                 "gh_id": "",
                 "desc": "",
-                "msg_type": msg_type
+                "msg_type": msg_type,
             }
             items.append(data)
 
@@ -739,7 +739,9 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorMapper):
                                 await self.store_card(task_id, index, msg_type, xml_obj)
 
                             case "5":
-                                await self.store_article(task_id, index, msg_type, xml_obj)
+                                await self.store_article(
+                                    task_id, index, msg_type, xml_obj
+                                )
 
                             case _:
                                 continue

+ 3 - 1
app/domains/monitor_tasks/gzh_article_monitor.py

@@ -262,7 +262,9 @@ class InnerGzhArticlesMonitor(MonitorConst):
             where publish_timestamp >= %s
             order by publish_timestamp desc;
         """
-        response = await self.pool.async_fetch(query=query, db_name="piaoquan_crawler", params=(start_timestamp,))
+        response = await self.pool.async_fetch(
+            query=query, db_name="piaoquan_crawler", params=(start_timestamp,)
+        )
         if not response:
             await feishu_robot.bot(
                 title="站内微信公众号发文监测任务异常",

+ 2 - 0
app/infra/mapper/__init__.py

@@ -1,2 +1,4 @@
 from .aigc_mapper import AigcDatabaseMapper
 from .long_video_mapper import LongVideoDatabaseMapper
+from .long_article_mapper import LongArticleDatabaseMapper
+from .piaoquan_crawler_mapper import PiaoquanCrawlerDatabaseMapper

+ 19 - 0
app/infra/mapper/long_article_mapper.py

@@ -0,0 +1,19 @@
+from typing import Set, List
+
+from app.core.database import DatabaseManager
+
+
+class LongArticleDatabaseMapper:
+    class Basic:
+        @staticmethod
+        async def get_unsafe_articles(pool: DatabaseManager) -> Set[str]:
+            query = """
+                    SELECT title FROM article_unsafe_title WHERE status = 1;
+                """
+            response = await pool.async_fetch(query=query)
+            return set([i["title"] for i in response])
+
+    class Recommend(Basic):
+        @staticmethod
+        async def get_top_articles(pool: DatabaseManager, query: str, gh_id: str):
+            return await pool.async_fetch(query=query, params=(gh_id,))

+ 12 - 1
app/infra/mapper/piaoquan_crawler_mapper.py

@@ -1,4 +1,4 @@
-from typing import List, Dict
+from typing import List, Dict, Set
 
 from app.core.database import DatabaseManager
 
@@ -35,3 +35,14 @@ class PiaoquanCrawlerDatabaseMapper:
         return await pool.async_save(
             query=query, params=(0,), db_name="piaoquan_crawler"
         )
+
+    # 获取账号的历史发文
+    @staticmethod
+    async def get_published_articles(pool: DatabaseManager, gh_id: str) -> Set[str]:
+        query = """
+               SELECT title FROM official_articles_v2 WHERE ghId = %s AND ItemIndex = %s;
+           """
+        response = await pool.async_fetch(
+            query=query, db_name="piaoquan_crawler", params=(gh_id, 1)
+        )
+        return set([i["title"] for i in response])

+ 9 - 0
app/infra/shared/tools.py

@@ -235,6 +235,15 @@ def fetch_from_odps(query):
             return []
 
 
+def init_odps_client():
+    return ODPS(
+        access_id="LTAIWYUujJAm7CbH",
+        secret_access_key="RfSjdiWwED1sGFlsjXv0DlfTnZTG1P",
+        endpoint="http://service.cn.maxcompute.aliyun.com/api",
+        project="loghubods",
+    )
+
+
 def upload_to_oss(local_video_path, oss_key):
     """
     把视频上传到 oss

+ 180 - 0
app/recommend/offline_recommend/core.py

@@ -0,0 +1,180 @@
+from typing import Dict, Set, List
+
+from app.core.config import GlobalConfigSettings
+from app.core.database import DatabaseManager
+from app.core.observability import LogService
+
+from app.infra.mapper import LongArticleDatabaseMapper
+from app.infra.mapper import PiaoquanCrawlerDatabaseMapper
+from app.infra.shared.tools import init_odps_client
+
+from app.recommend.offline_recommend.strategy import I2I
+from app.recommend.offline_recommend.strategy import GetTopArticleStrategy
+from app.recommend.offline_recommend.utils import RecommendApolloClient
+
+
+class BaseOffRecommendUtils:
+    def __init__(
+        self,
+        pool: DatabaseManager,
+        config: GlobalConfigSettings,
+    ):
+        self.pool = pool
+        self.recommend_apollo_client = RecommendApolloClient(config=config)
+        self.odps_client = init_odps_client()
+
+    # read from odps
+    async def read_from_odps(self, query: str) -> List:
+        with self.odps_client.execute_sql(query).open_reader() as reader:
+            if reader:
+                return [item for item in reader]
+            else:
+                return []
+
+    # 获取全局过滤标题
+    async def get_global_filter_title(self) -> Set[str]:
+        unsafe_titles_set: Set[
+            str
+        ] = await LongArticleDatabaseMapper.Recommend.get_unsafe_articles(self.pool)
+        apollo_unsafe_titles: List[
+            str
+        ] = await self.recommend_apollo_client.get_unsafe_titles_from_apollo()
+        apollo_bad_titles: List[
+            str
+        ] = await self.recommend_apollo_client.get_bad_titles_from_apollo()
+        unsafe_titles_set.update(apollo_unsafe_titles)
+        unsafe_titles_set.update(apollo_bad_titles)
+        return unsafe_titles_set
+
+    # 获取账号的 Top 文章
+    async def get_account_top_articles(
+        self, gh_id: str, strategy: str
+    ) -> List[Dict[str, str]]:
+        match strategy:
+            # 后续拓展策略
+            case _:
+                query = GetTopArticleStrategy.base()
+
+        top_articles = await LongArticleDatabaseMapper.Recommend.get_top_articles(
+            pool=self.pool, query=query, gh_id=gh_id
+        )
+
+        return top_articles
+
+    # 获取标题的推荐文章
+    async def get_recommend_articles(self, title: str, strategy: str) -> List:
+        match strategy:
+            # 后续拓展策略
+            case _:
+                query = I2I.base(title)
+
+        recommend_articles = await self.read_from_odps(query)
+        return recommend_articles
+
+    # 获取一批标题的推荐标题
+    async def get_recommend_articles_for_batch_titles(self, title_list: List[str], strategy: str) -> List[Dict[str, str]]:
+        match strategy:
+            case "v1":
+                query = I2I.batch_summary(title_list)
+
+            case _:
+                query = I2I.batch_base(title_list)
+
+        print(query)
+
+        recommend_articles = await self.read_from_odps(query)
+        return recommend_articles
+
+
+class BaseOfflineRecommend(BaseOffRecommendUtils):
+    def __init__(
+        self,
+        pool: DatabaseManager,
+        config: GlobalConfigSettings,
+        log_service: LogService,
+    ):
+        super().__init__(pool, config)
+        self.pool = pool
+        self.log_service = log_service
+        self.filter_title: Set[str] = set()
+        self.filter_keys: List[str] = []
+
+    # 解析策略base 的数据结构
+    def extract_base(self, account_info, recommend_articles, published_titles: Set[str]):
+        account_name = account_info["account_name"]
+        gh_id = account_info["gh_id"]
+        candidate_articles: List[Dict] = [
+            {
+                "account_name": account_name,
+                "gh_id": gh_id,
+                "source_title": item.src_title,
+                "recommend_title": item.rec_title,
+                "collinear_cnt": item.collinear_cnt,
+                "base_cnt": item.base_cnt,
+                "collinear_ratio": item.rec_collinear_ratio,
+            }
+            for item in recommend_articles
+            if item.rec_title
+               and item.rec_title not in self.filter_title
+               and item.rec_title not in published_titles
+        ]
+        return candidate_articles
+
+    # 解析策略v1的数据结构
+    def extract_v1(self, account_info, recommend_articles, published_titles: Set[str]):
+        account_name = account_info["account_name"]
+        gh_id = account_info["gh_id"]
+        candidate_articles: List[Dict] = [
+            {
+                "account_name": account_name,
+                "gh_id": gh_id,
+                "recommend_title": item.rec_title,
+                "collinear_cnt": item.total_collinear_cnt,
+                "base_cnt": item.total_base_cnt,
+                "collinear_ratio": item.rec_collinear_ratio,
+            }
+            for item in recommend_articles
+            if item.rec_title
+               and item.rec_title not in self.filter_title
+               and item.rec_title not in published_titles
+        ]
+        return candidate_articles
+
+    # 初始化类的时候,加载全局过滤标题
+    async def init_filter_titles(self):
+        self.filter_title = await self.get_global_filter_title()
+
+    # recommend for each account
+    async def recommend_for_account(
+        self,
+        account_info: Dict,
+        strategy: str,
+        published_titles: Set[str],
+    ):
+        gh_id: str = account_info["gh_id"]
+        top_articles = await self.get_account_top_articles(gh_id, strategy)
+        top_titles = [i['title'] for i in top_articles]
+
+        recommend_articles = await self.get_recommend_articles_for_batch_titles(top_titles, strategy)
+        match strategy:
+            case "v1":
+                return self.extract_v1(
+                    account_info, recommend_articles, published_titles
+                )
+
+            case _:
+                return self.extract_base(
+                    account_info, recommend_articles, published_titles
+                )
+
+
+    async def deal(self, account_info: Dict[str, str], strategy: str):
+        gh_id: str = account_info["gh_id"]
+
+        published_titles: Set[
+            str
+        ] = await PiaoquanCrawlerDatabaseMapper.get_published_articles(self.pool, gh_id)
+        recommend_articles = await self.recommend_for_account(
+            account_info, strategy, published_titles
+        )
+        return recommend_articles

+ 5 - 0
app/recommend/offline_recommend/strategy/__init__.py

@@ -0,0 +1,5 @@
+from .i2i import I2I
+from .get_top_article import GetTopArticleStrategy
+
+
+__all__ = ["GetTopArticleStrategy", "I2I"]

+ 2 - 0
app/recommend/offline_recommend/strategy/base.py

@@ -0,0 +1,2 @@
+class BaseStrategy:
+    pass

+ 29 - 0
app/recommend/offline_recommend/strategy/get_top_article.py

@@ -0,0 +1,29 @@
+from .base import BaseStrategy
+
+
+class GetTopArticleStrategy(BaseStrategy):
+    @staticmethod
+    def base() -> str:
+        query = """
+            SELECT title, sum(view_count) as total_view_count, sum(fans) as total_fan_count
+            FROM datastat_sort_strategy
+            WHERE position = 1 and gh_id = %s AND date_str >= '20250501' AND view_count > 1000
+            GROUP BY title
+            ORDER BY sum(view_count) / sum(fans) DESC
+            LIMIT 25;
+        """
+        return query
+
+    @staticmethod
+    def strategy_v1() -> str:
+        query = """
+            SELECT date_str, title, view_count from datastat_sort_strategy where position = 1 and gh_id = %s
+        """
+        return query
+
+    @staticmethod
+    def strategy_v2() -> str:
+        query = """
+            SELECT date_str, title, view_count from datastat_sort_strategy where position = 1 and gh_id = %s
+        """
+        return query

+ 67 - 0
app/recommend/offline_recommend/strategy/i2i.py

@@ -0,0 +1,67 @@
+from .base import BaseStrategy
+
+
+class I2I(BaseStrategy):
+
+    @staticmethod
+    def base(title: str, limit: int = 50) -> str:
+        query = f"""
+        -- ① 源标题(可限定,强烈建议)
+        WITH src_title_user AS (
+            SELECT DISTINCT title AS 源标题, union_id
+            FROM loghubods.title_union_id_info
+            WHERE title = '{title}'
+        ),
+        -- ② 源标题 → 联想标题 共现
+        co_occur AS (
+            SELECT
+                s.源标题, t.title AS 联想标题, COUNT(*) AS 联想次数
+            FROM src_title_user s
+            JOIN loghubods.title_union_id_info t ON s.union_id = t.union_id
+            WHERE s.源标题 != t.title
+            GROUP BY s.源标题, t.title
+        ),
+        -- ③ 联想标题的 uid 覆盖量(独立算)
+        title_uid_cnt AS (
+            SELECT
+                title, COUNT(DISTINCT union_id) AS 联想标题_uid数量
+            FROM loghubods.title_union_id_info
+            GROUP BY title
+        )
+        -- ④ 合并
+        SELECT c.源标题, c.联想标题, c.联想次数, u.联想标题_uid数量, c.联想次数 / (u.联想标题_uid数量 + 1000) AS 联想标题_uid覆盖率
+        FROM co_occur c
+        LEFT JOIN title_uid_cnt u ON c.联想标题 = u.title
+        ORDER BY 联想标题_uid覆盖率 DESC
+        LIMIT {limit};
+        """
+        return query
+
+    @staticmethod
+    def batch_base(title_list, limit: int = 1000):
+        title_tuple = tuple(title_list)
+        query = f"""
+            SELECT  src_title, rec_title, collinear_cnt, base_cnt, rec_collinear_ratio
+            FROM    loghubods.t2i_records
+            WHERE   src_title IN {title_tuple}
+            AND data_version = 'v1' ORDER BY rec_collinear_ratio DESC
+            LIMIT {limit};
+        """
+        return query
+
+    @staticmethod
+    def batch_summary(title_list, limit: int = 500):
+        title_tuple = tuple(title_list)
+        query = f"""
+            SELECT  rec_title
+                    ,SUM(collinear_cnt) AS total_collinear_cnt
+                    ,SUM(base_cnt) AS total_base_cnt
+                    ,SUM(collinear_cnt) / (SUM(base_cnt) + 1000) AS rec_collinear_ratio
+            FROM    loghubods.t2i_records
+            WHERE   src_title IN {title_tuple}
+            AND     data_version = 'v3'
+            GROUP BY rec_title
+            ORDER BY rec_collinear_ratio DESC
+            LIMIT {limit};
+        """
+        return query

+ 4 - 0
app/recommend/offline_recommend/utils/__init__.py

@@ -0,0 +1,4 @@
+from .recommend_apollo import RecommendApolloClient
+
+
+__all__ = ["RecommendApolloClient"]

+ 18 - 0
app/recommend/offline_recommend/utils/recommend_apollo.py

@@ -0,0 +1,18 @@
+from app.infra.external import AsyncApolloApi
+from app.core.config import GlobalConfigSettings
+
+
+class RecommendApolloClient:
+    def __init__(self, config: GlobalConfigSettings):
+        self.apollo_client = AsyncApolloApi(
+            apollo_config=config.apollo, app_id="longarticle-recommend", env="prod"
+        )
+
+    async def get_unsafe_titles_from_apollo(self):
+        return await self.apollo_client.get_config_value(key="UnSafeTitles")
+
+    async def get_unsafe_keywords_from_apollo(self):
+        return self.apollo_client.get_config_value(key="keywords")
+
+    async def get_bad_titles_from_apollo(self):
+        return await self.apollo_client.get_config_value(key="badTitles")