瀏覽代碼

Merge branch 'feature/luojunhui/20260211-low-read-title-forbidden' of Server/LongArticleTaskServer into master

luojunhui 3 周之前
父節點
當前提交
4ff0774347

+ 6 - 2
app/ab_test/get_cover.py

@@ -27,7 +27,9 @@ class GetCoverService(Response):
     async def fetch_cover_info(self, pool_name, channel_content_id: str):
         match pool_name:
             case "aigc":
-                fetch_response = await AigcDatabaseMapper.fetch_aigc_cover(self.pool, channel_content_id)
+                fetch_response = await AigcDatabaseMapper.fetch_aigc_cover(
+                    self.pool, channel_content_id
+                )
                 if fetch_response:
                     image_oss = fetch_response[0]["oss_object_key"]
                     if image_oss:
@@ -66,7 +68,9 @@ class GetCoverService(Response):
         if video_index == 2:
             return await self.fetch_cover_info("long_video", seed_video_id)
 
-        channel_info = await AigcDatabaseMapper.fetch_channel_info(self.pool, content_id)
+        channel_info = await AigcDatabaseMapper.fetch_channel_info(
+            self.pool, content_id
+        )
         if not channel_info:
             return self.error_response(
                 error_code="402",

+ 1 - 0
app/core/config/global_settings.py

@@ -40,6 +40,7 @@ class GlobalConfigSettings(BaseSettings):
     task_chinese_name: TaskChineseNameConfig = Field(
         default_factory=TaskChineseNameConfig
     )
+    read_rate_limit: ReadRateLimited = Field(default_factory=ReadRateLimited)
 
     model_config = SettingsConfigDict(
         env_file=".env", env_file_encoding="utf-8", case_sensitive=False, extra="ignore"

+ 2 - 0
app/core/config/settings/__init__.py

@@ -9,6 +9,7 @@ from .mysql import GrowthDatabaseConfig
 from .mysql import LongArticlesDatabaseConfig
 from .mysql import LongVideoDatabaseConfig
 from .mysql import PiaoquanCrawlerDatabaseConfig
+from .read_rate_limited import ReadRateLimited
 from .task_chinese_name import TaskChineseNameConfig
 
 
@@ -25,4 +26,5 @@ __ALL__ = [
     "LongVideoDatabaseConfig",
     "PiaoquanCrawlerDatabaseConfig",
     "TaskChineseNameConfig",
+    "ReadRateLimited"
 ]

+ 4 - 1
app/core/config/settings/apollo.py

@@ -16,7 +16,10 @@ class ApolloConfig(BaseSettings):
                 "pre": "http://preapolloconfig-internal.piaoquantv.com/",
                 "dev": "https://devapolloconfig-internal.piaoquantv.com/",
                 "prod": "https://apolloconfig-internal.piaoquantv.com/",
-            }
+            },
+            "longarticle-recommend": {
+                "prod": "https://apolloconfig-internal.piaoquantv.com/"
+            },
         }
     )
 

+ 23 - 0
app/core/config/settings/read_rate_limited.py

@@ -0,0 +1,21 @@
+from pydantic import Field
+from pydantic_settings import BaseSettings
+from typing import List
+
+
+class ReadRateLimited(BaseSettings):
+    """Thresholds for detecting rate-limited (low-read) article titles."""
+
+    # Scan windows in days; each duration is processed as one task run.
+    stat_durations: List[int] = Field(
+        default_factory=lambda: [2, 10, 30, 60, 90, 120, 150, 365]
+    )
+
+    # read_rate multiple of the account average below which a publication counts as low-read.
+    read_on_avg_threshold: float = 0.2
+
+    # Exploration allowance added to the publish-count denominator.
+    base_discover_time: int = 1
+
+    # Share of low-read publications at or above which a title is considered rate-limited.
+    low_read_rate_threshold: float = 0.5

+ 2 - 1
app/domains/analysis_task/__init__.py

@@ -2,11 +2,12 @@ from .crawler_detail import CrawlerDetailDeal
 from .account_position_info import AccountPositionReadRateAvg
 from .account_position_info import AccountPositionReadAvg
 from .account_position_info import AccountPositionOpenRateAvg
-
+from .rate_limited_article_filter import RateLimitedArticleFilter
 
 __all__ = [
     "CrawlerDetailDeal",
     "AccountPositionReadRateAvg",
     "AccountPositionReadAvg",
     "AccountPositionOpenRateAvg",
+    "RateLimitedArticleFilter"
 ]

+ 110 - 0
app/domains/analysis_task/rate_limited_article_filter.py

@@ -0,0 +1,129 @@
+import asyncio
+import datetime
+import hashlib
+import json
+from typing import List, Dict
+from tqdm.asyncio import tqdm
+
+from app.core.config import GlobalConfigSettings
+from app.core.database import DatabaseManager
+
+from app.infra.internal import delete_illegal_gzh_articles
+
+
+class RateLimitedArticleMapper:
+    """SQL access for rate-limited (low-read) article detection."""
+
+    def __init__(self, pool: DatabaseManager):
+        self.pool = pool
+
+    async def find_rate_limited_articles(
+        self,
+        days_duration: int,
+        read_on_avg_threshold: float = 0.2,
+        base_discover_time: int = 1,
+        low_read_rate_threshold: float = 0.5,
+    ) -> List[Dict]:
+        """Return titles whose low-read share exceeds the threshold.
+
+        Thresholds are bound as query parameters instead of f-string
+        interpolation so malformed input cannot break the SQL; note the
+        literal percent signs in DATE_FORMAT are doubled for the driver.
+        """
+        query = """
+            SELECT
+                title,
+                gh_id,
+                COUNT(*) AS publish_count,
+                CAST(
+                    SUM(CASE WHEN read_rate < %s THEN 1 ELSE 0 END) AS UNSIGNED
+                )
+                AS low_read_count
+            FROM datastat_sort_strategy
+            WHERE position = 1
+              AND account_type = '订阅号'
+              AND date_str >= DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL %s DAY), '%%Y%%m%%d')
+              AND date_str >= '20250501'
+            GROUP BY title
+            HAVING SUM(CASE WHEN read_rate < %s THEN 1 ELSE 0 END) / (COUNT(*) + %s) >= %s
+            ;
+        """
+        return await self.pool.async_fetch(
+            query=query,
+            params=(
+                read_on_avg_threshold,
+                days_duration,
+                read_on_avg_threshold,
+                base_discover_time,
+                low_read_rate_threshold,
+            ),
+        )
+
+    async def save_record(self, article_tuple):
+        """Insert one (title_md5, title, remark) row; returns affected row count."""
+        query = """
+            INSERT IGNORE INTO rate_limited_title
+            (title_md5, title, remark)
+            VALUES
+            (%s, %s, %s)
+        """
+        return await self.pool.async_save(query=query, params=article_tuple)
+
+
+class RateLimitedArticleFilter(RateLimitedArticleMapper):
+    """Finds rate-limited titles and deletes the matching GZH articles."""
+
+    def __init__(
+        self,
+        pool: DatabaseManager,
+        config: GlobalConfigSettings,
+    ):
+        super().__init__(pool=pool)
+        self.config = config.read_rate_limit
+
+    async def _process_single_article(self, data: Dict, days: int, semaphore: asyncio.Semaphore):
+        """Record one flagged article and delete it; skips titles seen before."""
+        async with semaphore:
+            gh_id = data["gh_id"]
+            title = data["title"]
+            title_md5 = hashlib.md5(title.encode("utf-8")).hexdigest()
+            remark = json.dumps(
+                {
+                    "发文数量": data["publish_count"],
+                    "限流数量": data["low_read_count"],
+                    "周期": days,
+                    "执行日期": datetime.datetime.today().strftime("%Y-%m-%d"),
+                },
+                ensure_ascii=False,
+            )
+            # INSERT IGNORE yields 0 affected rows when this title was already handled.
+            insert_rows = await self.save_record(article_tuple=(title_md5, title, remark))
+            if insert_rows:
+                await delete_illegal_gzh_articles(gh_id=gh_id, title=title)
+            else:
+                print("该文章已经删过")
+
+    async def process_single_task(self, days: int, max_concurrent: int = 10):
+        """Concurrently process every flagged article in one stat window."""
+        data_list = await self.find_rate_limited_articles(
+            days_duration=days,
+            read_on_avg_threshold=self.config.read_on_avg_threshold,
+            base_discover_time=self.config.base_discover_time,
+            low_read_rate_threshold=self.config.low_read_rate_threshold,
+        )
+
+        # Semaphore caps the number of in-flight delete operations.
+        semaphore = asyncio.Semaphore(max_concurrent)
+
+        tasks = [
+            self._process_single_article(data, days, semaphore)
+            for data in data_list
+        ]
+
+        # tqdm shows progress while tasks complete in arbitrary order.
+        for coro in tqdm.as_completed(tasks, total=len(tasks)):
+            await coro
+
+    async def deal(self):
+        """Entry point: run one pass per configured stat duration."""
+        for _day in self.config.stat_durations:
+            await self.process_single_task(_day)

+ 1 - 1
app/domains/data_recycle_tasks/recycle_daily_publish_articles.py

@@ -49,7 +49,7 @@ class Const:
         "gh_ff487cb5dab3",
         "gh_ac43eb24376d",
         "gh_b15de7c99912",
-        "gh_56ca3dae948c"
+        "gh_56ca3dae948c",
     ]
 
     # NOT USED SERVER ACCOUNT

+ 4 - 2
app/domains/monitor_tasks/auto_reply_cards_monitor.py

@@ -95,7 +95,7 @@ class AutoReplyCardsMonitorUtils(AutoReplyCardsMonitorConst):
                 "account_name": item.findtext("sources/source/name"),
                 "gh_id": "",
                 "desc": "",
-                "msg_type": msg_type
+                "msg_type": msg_type,
             }
             items.append(data)
 
@@ -739,7 +739,9 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorMapper):
                                 await self.store_card(task_id, index, msg_type, xml_obj)
 
                             case "5":
-                                await self.store_article(task_id, index, msg_type, xml_obj)
+                                await self.store_article(
+                                    task_id, index, msg_type, xml_obj
+                                )
 
                             case _:
                                 continue

+ 3 - 1
app/domains/monitor_tasks/gzh_article_monitor.py

@@ -262,7 +262,9 @@ class InnerGzhArticlesMonitor(MonitorConst):
             where publish_timestamp >= %s
             order by publish_timestamp desc;
         """
-        response = await self.pool.async_fetch(query=query, db_name="piaoquan_crawler", params=(start_timestamp,))
+        response = await self.pool.async_fetch(
+            query=query, db_name="piaoquan_crawler", params=(start_timestamp,)
+        )
         if not response:
             await feishu_robot.bot(
                 title="站内微信公众号发文监测任务异常",

+ 2 - 0
app/infra/mapper/__init__.py

@@ -1,2 +1,4 @@
 from .aigc_mapper import AigcDatabaseMapper
 from .long_video_mapper import LongVideoDatabaseMapper
+from .long_article_mapper import LongArticleDatabaseMapper
+from .piaoquan_crawler_mapper import PiaoquanCrawlerDatabaseMapper

+ 19 - 0
app/infra/mapper/long_article_mapper.py

@@ -0,0 +1,22 @@
+from typing import Set
+
+from app.core.database import DatabaseManager
+
+
+class LongArticleDatabaseMapper:
+    class Basic:
+        # Titles flagged as unsafe (status = 1), used for global filtering.
+        @staticmethod
+        async def get_unsafe_articles(pool: DatabaseManager) -> Set[str]:
+            query = """
+                    SELECT title FROM article_unsafe_title WHERE status = 1;
+                """
+            response = await pool.async_fetch(query=query)
+            # async_fetch may return a falsy value when no rows match.
+            return {row["title"] for row in (response or [])}
+
+    class Recommend(Basic):
+        # Thin pass-through; the SQL strategy is chosen by the caller.
+        @staticmethod
+        async def get_top_articles(pool: DatabaseManager, query: str, gh_id: str):
+            return await pool.async_fetch(query=query, params=(gh_id,))

+ 12 - 1
app/infra/mapper/piaoquan_crawler_mapper.py

@@ -1,4 +1,4 @@
-from typing import List, Dict
+from typing import List, Dict, Set
 
 from app.core.database import DatabaseManager
 
@@ -35,3 +35,14 @@ class PiaoquanCrawlerDatabaseMapper:
         return await pool.async_save(
             query=query, params=(0,), db_name="piaoquan_crawler"
         )
+
+    # Titles this account has already published (ItemIndex = 1 only).
+    @staticmethod
+    async def get_published_articles(pool: DatabaseManager, gh_id: str) -> Set[str]:
+        query = """
+               SELECT title FROM official_articles_v2 WHERE ghId = %s AND ItemIndex = %s;
+           """
+        response = await pool.async_fetch(
+            query=query, db_name="piaoquan_crawler", params=(gh_id, 1)
+        )
+        return {row["title"] for row in (response or [])}

+ 9 - 0
app/infra/shared/tools.py

@@ -235,6 +235,19 @@ def fetch_from_odps(query):
            return []


+def init_odps_client():
+    """Build an ODPS client for the loghubods project."""
+    # SECURITY: this key pair is committed in plain text; rotate it and load
+    # ODPS_ACCESS_ID / ODPS_SECRET_ACCESS_KEY from the environment instead.
+    import os
+    return ODPS(
+        access_id=os.getenv("ODPS_ACCESS_ID", "LTAIWYUujJAm7CbH"),
+        secret_access_key=os.getenv("ODPS_SECRET_ACCESS_KEY", "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"),
+        endpoint="http://service.cn.maxcompute.aliyun.com/api",
+        project="loghubods",
+    )
+
+
def upload_to_oss(local_video_path, oss_key):
    """
    把视频上传到 oss

+ 10 - 0
app/jobs/task_handler.py

@@ -7,6 +7,7 @@ from app.domains.analysis_task import CrawlerDetailDeal
 from app.domains.analysis_task import AccountPositionReadRateAvg
 from app.domains.analysis_task import AccountPositionReadAvg
 from app.domains.analysis_task import AccountPositionOpenRateAvg
+from app.domains.analysis_task import RateLimitedArticleFilter
 
 from app.domains.algorithm_tasks import AccountCategoryAnalysis
 
@@ -424,5 +425,14 @@ class TaskHandler:
         await task.deal(params=self.data)
         return TaskStatus.SUCCESS
 
+    @register("rate_limited_article_filter")
+    async def _rate_limited_article_filter(self) -> int:
+        """Delete articles whose titles are flagged as rate-limited."""
+        task = RateLimitedArticleFilter(
+            pool=self.db_client, config=self.config
+        )
+        await task.deal()
+        return TaskStatus.SUCCESS
+
 
 __all__ = ["TaskHandler"]

+ 180 - 0
app/recommend/offline_recommend/core.py

@@ -0,0 +1,184 @@
+from typing import Dict, Set, List
+
+from app.core.config import GlobalConfigSettings
+from app.core.database import DatabaseManager
+from app.core.observability import LogService
+
+from app.infra.mapper import LongArticleDatabaseMapper
+from app.infra.mapper import PiaoquanCrawlerDatabaseMapper
+from app.infra.shared.tools import init_odps_client
+
+from app.recommend.offline_recommend.strategy import I2I
+from app.recommend.offline_recommend.strategy import GetTopArticleStrategy
+from app.recommend.offline_recommend.utils import RecommendApolloClient
+
+
+class BaseOffRecommendUtils:
+    """Shared plumbing: ODPS reads, global title filters, top articles."""
+
+    def __init__(
+        self,
+        pool: DatabaseManager,
+        config: GlobalConfigSettings,
+    ):
+        self.pool = pool
+        self.recommend_apollo_client = RecommendApolloClient(config=config)
+        self.odps_client = init_odps_client()
+
+    # Read all rows for a query from ODPS.
+    # NOTE(review): execute_sql is a blocking call inside an async method;
+    # consider offloading to a thread if this runs on a busy event loop.
+    async def read_from_odps(self, query: str) -> List:
+        with self.odps_client.execute_sql(query).open_reader() as reader:
+            if reader:
+                return [item for item in reader]
+            else:
+                return []
+
+    # Titles filtered everywhere: DB unsafe titles + Apollo unsafe/bad lists.
+    async def get_global_filter_title(self) -> Set[str]:
+        unsafe_titles_set: Set[
+            str
+        ] = await LongArticleDatabaseMapper.Recommend.get_unsafe_articles(self.pool)
+        apollo_unsafe_titles: List[
+            str
+        ] = await self.recommend_apollo_client.get_unsafe_titles_from_apollo()
+        apollo_bad_titles: List[
+            str
+        ] = await self.recommend_apollo_client.get_bad_titles_from_apollo()
+        unsafe_titles_set.update(apollo_unsafe_titles)
+        unsafe_titles_set.update(apollo_bad_titles)
+        return unsafe_titles_set
+
+    # Top-performing historical articles for one account.
+    async def get_account_top_articles(
+        self, gh_id: str, strategy: str
+    ) -> List[Dict[str, str]]:
+        match strategy:
+            # Placeholder for future strategies.
+            case _:
+                query = GetTopArticleStrategy.base()
+
+        top_articles = await LongArticleDatabaseMapper.Recommend.get_top_articles(
+            pool=self.pool, query=query, gh_id=gh_id
+        )
+
+        return top_articles
+
+    # Recommendations associated with a single title.
+    async def get_recommend_articles(self, title: str, strategy: str) -> List:
+        match strategy:
+            # Placeholder for future strategies.
+            case _:
+                query = I2I.base(title)
+
+        recommend_articles = await self.read_from_odps(query)
+        return recommend_articles
+
+    # Recommendations for a batch of titles.
+    async def get_recommend_articles_for_batch_titles(self, title_list: List[str], strategy: str) -> List[Dict[str, str]]:
+        match strategy:
+            case "v1":
+                query = I2I.strategy_v1(title_list)
+
+            case _:
+                query = I2I.batch_base(title_list)
+
+        recommend_articles = await self.read_from_odps(query)
+        return recommend_articles
+
+
+class BaseOfflineRecommend(BaseOffRecommendUtils):
+    """Per-account offline recommendation built on the shared utils."""
+
+    def __init__(
+        self,
+        pool: DatabaseManager,
+        config: GlobalConfigSettings,
+        log_service: LogService,
+    ):
+        super().__init__(pool, config)
+        self.pool = pool
+        self.log_service = log_service
+        self.filter_title: Set[str] = set()
+        self.filter_keys: List[str] = []
+
+    # Shape rows returned by the "base" strategy.
+    def extract_base(self, account_info, recommend_articles, published_titles: Set[str]):
+        account_name = account_info["account_name"]
+        gh_id = account_info["gh_id"]
+        candidate_articles: List[Dict] = [
+            {
+                "account_name": account_name,
+                "gh_id": gh_id,
+                "source_title": item.src_title,
+                "recommend_title": item.rec_title,
+                "collinear_cnt": item.collinear_cnt,
+                "base_cnt": item.base_cnt,
+                "collinear_ratio": item.rec_collinear_ratio,
+            }
+            for item in recommend_articles
+            if item.rec_title
+               and item.rec_title not in self.filter_title
+               and item.rec_title not in published_titles
+        ]
+        return candidate_articles
+
+    # Shape rows returned by the "v1" strategy.
+    def extract_v1(self, account_info, recommend_articles, published_titles: Set[str]):
+        account_name = account_info["account_name"]
+        gh_id = account_info["gh_id"]
+        candidate_articles: List[Dict] = [
+            {
+                "account_name": account_name,
+                "gh_id": gh_id,
+                "recommend_title": item.recommend_title,
+                "collinear_cnt": item.collinear_cnt,
+                "base_cnt": item.base_cnt,
+                "recommend_score": item.recommend_score,
+            }
+            for item in recommend_articles
+            if item.recommend_title
+               and item.recommend_title not in self.filter_title
+               and item.recommend_title not in published_titles
+        ]
+        return candidate_articles
+
+    # Load the global filter titles once after construction.
+    async def init_filter_titles(self):
+        self.filter_title = await self.get_global_filter_title()
+
+    # recommend for each account
+    async def recommend_for_account(
+        self,
+        account_info: Dict,
+        strategy: str,
+        published_titles: Set[str],
+    ):
+        gh_id: str = account_info["gh_id"]
+        top_articles = await self.get_account_top_articles(gh_id, strategy)
+        top_titles = [i['title'] for i in top_articles]
+
+        recommend_articles = await self.get_recommend_articles_for_batch_titles(top_titles, strategy)
+        match strategy:
+            case "v1":
+                return self.extract_v1(
+                    account_info, recommend_articles, published_titles
+                )
+
+            case _:
+                return self.extract_base(
+                    account_info, recommend_articles, published_titles
+                )
+
+    async def deal(self, account_info: Dict[str, str], strategy: str):
+        """Entry point: recommend titles for one account, excluding published."""
+        gh_id: str = account_info["gh_id"]
+
+        published_titles: Set[
+            str
+        ] = await PiaoquanCrawlerDatabaseMapper.get_published_articles(self.pool, gh_id)
+        recommend_articles = await self.recommend_for_account(
+            account_info, strategy, published_titles
+        )
+        return recommend_articles

+ 5 - 0
app/recommend/offline_recommend/strategy/__init__.py

@@ -0,0 +1,5 @@
+from .i2i import I2I
+from .get_top_article import GetTopArticleStrategy
+
+
+__all__ = ["GetTopArticleStrategy", "I2I"]

+ 2 - 0
app/recommend/offline_recommend/strategy/base.py

@@ -0,0 +1,2 @@
+class BaseStrategy:
+    """Empty base class shared by offline-recommend query strategies."""

+ 34 - 0
app/recommend/offline_recommend/strategy/get_top_article.py

@@ -0,0 +1,31 @@
+from .base import BaseStrategy
+
+
+class GetTopArticleStrategy(BaseStrategy):
+    """SQL builders that pick an account's top-performing titles."""
+
+    @staticmethod
+    def base() -> str:
+        # Top 25 titles by view/fan ratio since 2025-05-01 (min 1000 views).
+        query = """
+            SELECT title, sum(view_count) as total_view_count, sum(fans) as total_fan_count
+            FROM datastat_sort_strategy
+            WHERE position = 1 and gh_id = %s AND date_str >= '20250501' AND view_count > 1000
+            GROUP BY title
+            ORDER BY sum(view_count) / sum(fans) DESC
+            LIMIT 25;
+        """
+        return query
+
+    @staticmethod
+    def strategy_v1() -> str:
+        # Currently identical to base(); delegate instead of duplicating SQL.
+        return GetTopArticleStrategy.base()
+
+    @staticmethod
+    def strategy_v2() -> str:
+        # Raw per-day view counts for position-1 articles of one account.
+        query = """
+            SELECT date_str, title, view_count from datastat_sort_strategy where position = 1 and gh_id = %s
+        """
+        return query

+ 85 - 0
app/recommend/offline_recommend/strategy/i2i.py

@@ -0,0 +1,93 @@
+from .base import BaseStrategy
+
+
+def _quote(title: str) -> str:
+    # Escape single quotes so a title cannot break out of the SQL literal.
+    return "'" + title.replace("'", "''") + "'"
+
+
+def _in_clause(titles) -> str:
+    # Render IN (...) manually: interpolating tuple(titles) would produce a
+    # trailing comma for single-element lists ("('t',)"), which is invalid SQL.
+    return "(" + ", ".join(_quote(t) for t in titles) + ")"
+
+
+class I2I(BaseStrategy):
+    """Title-to-title (i2i) co-occurrence recommendation query builders."""
+
+    @staticmethod
+    def base(title: str, limit: int = 50) -> str:
+        query = f"""
+        -- ① 源标题(可限定,强烈建议)
+        WITH src_title_user AS (
+            SELECT DISTINCT title AS 源标题, union_id
+            FROM loghubods.title_union_id_info
+            WHERE title = {_quote(title)}
+        ),
+        -- ② 源标题 → 联想标题 共现
+        co_occur AS (
+            SELECT
+                s.源标题, t.title AS 联想标题, COUNT(*) AS 联想次数
+            FROM src_title_user s
+            JOIN loghubods.title_union_id_info t ON s.union_id = t.union_id
+            WHERE s.源标题 != t.title
+            GROUP BY s.源标题, t.title
+        ),
+        -- ③ 联想标题的 uid 覆盖量(独立算)
+        title_uid_cnt AS (
+            SELECT
+                title, COUNT(DISTINCT union_id) AS 联想标题_uid数量
+            FROM loghubods.title_union_id_info
+            GROUP BY title
+        )
+        -- ④ 合并
+        SELECT c.源标题, c.联想标题, c.联想次数, u.联想标题_uid数量, c.联想次数 / (u.联想标题_uid数量 + 1000) AS 联想标题_uid覆盖率
+        FROM co_occur c
+        LEFT JOIN title_uid_cnt u ON c.联想标题 = u.title
+        ORDER BY 联想标题_uid覆盖率 DESC
+        LIMIT {limit};
+        """
+        return query
+
+    @staticmethod
+    def batch_base(title_list, limit: int = 1000):
+        query = f"""
+            SELECT  src_title, rec_title, collinear_cnt, base_cnt, rec_collinear_ratio
+            FROM    loghubods.t2i_records
+            WHERE   src_title IN {_in_clause(title_list)}
+            AND data_version = 'v1' ORDER BY rec_collinear_ratio DESC
+            LIMIT {limit};
+        """
+        return query
+
+    @staticmethod
+    def batch_summary(title_list, limit: int = 500):
+        query = f"""
+            SELECT  rec_title
+                    ,SUM(collinear_cnt) AS total_collinear_cnt
+                    ,SUM(base_cnt) AS total_base_cnt
+                    ,SUM(collinear_cnt) / (SUM(base_cnt) + 1000) AS rec_collinear_ratio
+            FROM    loghubods.t2i_records
+            WHERE   src_title IN {_in_clause(title_list)}
+            AND     data_version = 'v3'
+            GROUP BY rec_title
+            ORDER BY rec_collinear_ratio DESC
+            LIMIT {limit};
+        """
+        return query
+
+    @staticmethod
+    def strategy_v1(title_list, limit: int = 500):
+        query = f"""
+            SELECT  associated_title AS recommend_title
+                ,sum(association_count) as collinear_cnt
+                ,sum(associated_title_uid_count) as base_cnt
+                ,sum(association_count) / (sum(associated_title_uid_count) + 10000) AS recommend_score
+            FROM    loghubods.i2i_table
+            WHERE     dt = MAX_PT('i2i_table')
+            AND   source_title IN {_in_clause(title_list)}
+            GROUP BY recommend_title
+            ORDER BY recommend_score DESC
+            LIMIT {limit};
+            """
+        return query

+ 4 - 0
app/recommend/offline_recommend/utils/__init__.py

@@ -0,0 +1,4 @@
+from .recommend_apollo import RecommendApolloClient
+
+
+__all__ = ["RecommendApolloClient"]

+ 18 - 0
app/recommend/offline_recommend/utils/recommend_apollo.py

@@ -0,0 +1,21 @@
+from app.infra.external import AsyncApolloApi
+from app.core.config import GlobalConfigSettings
+
+
+class RecommendApolloClient:
+    """Apollo config reader for the longarticle-recommend app (prod env)."""
+
+    def __init__(self, config: GlobalConfigSettings):
+        self.apollo_client = AsyncApolloApi(
+            apollo_config=config.apollo, app_id="longarticle-recommend", env="prod"
+        )
+
+    async def get_unsafe_titles_from_apollo(self):
+        return await self.apollo_client.get_config_value(key="UnSafeTitles")
+
+    async def get_unsafe_keywords_from_apollo(self):
+        # BUG FIX: the coroutine was previously returned without awaiting it.
+        return await self.apollo_client.get_config_value(key="keywords")
+
+    async def get_bad_titles_from_apollo(self):
+        return await self.apollo_client.get_config_value(key="badTitles")