from typing import Dict, Set, List from app.core.config import GlobalConfigSettings from app.core.database import DatabaseManager from app.core.observability import LogService from app.infra.mapper import LongArticleDatabaseMapper from app.infra.mapper import PiaoquanCrawlerDatabaseMapper from app.infra.shared.tools import init_odps_client from app.recommend.offline_recommend.strategy import I2I from app.recommend.offline_recommend.strategy import GetTopArticleStrategy from app.recommend.offline_recommend.utils import RecommendApolloClient class BaseOffRecommendUtils: def __init__( self, pool: DatabaseManager, config: GlobalConfigSettings, ): self.pool = pool self.recommend_apollo_client = RecommendApolloClient(config=config) self.odps_client = init_odps_client() # read from odps async def read_from_odps(self, query: str) -> List: with self.odps_client.execute_sql(query).open_reader() as reader: if reader: return [item for item in reader] else: return [] # 获取全局过滤标题 async def get_global_filter_title(self) -> Set[str]: unsafe_titles_set: Set[ str ] = await LongArticleDatabaseMapper.Recommend.get_unsafe_articles(self.pool) apollo_unsafe_titles: List[ str ] = await self.recommend_apollo_client.get_unsafe_titles_from_apollo() apollo_bad_titles: List[ str ] = await self.recommend_apollo_client.get_bad_titles_from_apollo() unsafe_titles_set.update(apollo_unsafe_titles) unsafe_titles_set.update(apollo_bad_titles) return unsafe_titles_set # 获取一批标题的推荐标题 async def get_recommend_articles_for_batch_titles( self, title_list: List[str], strategy: str ) -> List[Dict[str, str]]: match strategy: case "v1": query = I2I.strategy_v1(title_list) case _: query = I2I.batch_base(title_list) recommend_articles = await self.read_from_odps(query) return recommend_articles class BaseOfflineRecommend(BaseOffRecommendUtils): def __init__( self, pool: DatabaseManager, config: GlobalConfigSettings, log_service: LogService, ): super().__init__(pool, config) self.pool = pool self.log_service = log_service self.filter_title: Set[str] = set() self.filter_keys: List[str] = [] # 解析策略base 的数据结构 def extract_base( self, account_info, recommend_articles, published_titles: Set[str] ): account_name = account_info["account_name"] gh_id = account_info["gh_id"] candidate_articles: List[Dict] = [ { "account_name": account_name, "gh_id": gh_id, "source_title": item.src_title, "recommend_title": item.rec_title, "collinear_cnt": item.collinear_cnt, "base_cnt": item.base_cnt, "collinear_ratio": item.rec_collinear_ratio, } for item in recommend_articles if item.rec_title and item.rec_title not in self.filter_title and item.rec_title not in published_titles ] return candidate_articles # 解析策略v1的数据结构 def extract_v1(self, account_info, recommend_articles, published_titles: Set[str]): account_name = account_info["account_name"] gh_id = account_info["gh_id"] candidate_articles: List[Dict] = [ { "account_name": account_name, "gh_id": gh_id, "recommend_title": item.recommend_title, "collinear_cnt": item.collinear_cnt, "base_cnt": item.base_cnt, "recommend_score": item.recommend_score, } for item in recommend_articles if item.recommend_title and item.recommend_title not in self.filter_title and item.recommend_title not in published_titles ] return candidate_articles # 初始化类的时候,加载全局过滤标题 async def init_filter_titles(self): self.filter_title = await self.get_global_filter_title() # recommend for each account async def recommend_for_account( self, account_info: Dict, strategy: str, published_titles: Set[str], ): gh_id: str = account_info["gh_id"] account_name: str = account_info["account_name"] match strategy: case "v1": odps_query = GetTopArticleStrategy.strategy_v1(account_name) top_articles = await self.read_from_odps(odps_query) top_titles = [i.title for i in top_articles] case "base": mysql_query = GetTopArticleStrategy.base() top_articles = await LongArticleDatabaseMapper.Recommend.get_top_articles( pool=self.pool, query=mysql_query, gh_id=gh_id ) top_titles = [i["title"] for i in top_articles] case _: return [] recommend_articles = await self.get_recommend_articles_for_batch_titles( top_titles, strategy ) match strategy: case "v1": return self.extract_v1( account_info, recommend_articles, published_titles ) case _: return self.extract_base( account_info, recommend_articles, published_titles ) async def deal(self, account_info: Dict[str, str], strategy: str): gh_id: str = account_info["gh_id"] published_titles: Set[ str ] = await PiaoquanCrawlerDatabaseMapper.get_published_articles(self.pool, gh_id) recommend_articles = await self.recommend_for_account( account_info, strategy, published_titles ) return recommend_articles