from typing import Dict, Set, List, Tuple from app.core.config import GlobalConfigSettings from app.core.database import DatabaseManager from app.core.observability import LogService from app.infra.mapper import LongArticleDatabaseMapper from app.infra.mapper import PiaoquanCrawlerDatabaseMapper from app.infra.external import OdpsService from app.recommend.offline_recommend.strategy import I2I from app.recommend.offline_recommend.strategy import GetTopArticleStrategy from app.recommend.offline_recommend.utils import RecommendApolloClient from app.recommend.offline_recommend.utils import ProduceBaseData class BaseOffRecommendUtils: def __init__( self, pool: DatabaseManager, config: GlobalConfigSettings, ): self.pool = pool self.recommend_apollo_client = RecommendApolloClient(config=config) self.odps_client = OdpsService( access_id="LTAIWYUujJAm7CbH", secret_access_key="RfSjdiWwED1sGFlsjXv0DlfTnZTG1P", endpoint="http://service.cn.maxcompute.aliyun.com/api", project="loghubods", ) # 获取全局过滤标题 async def get_global_filter_title(self) -> Set[str]: unsafe_titles_set: Set[ str ] = await LongArticleDatabaseMapper.Recommend.get_unsafe_articles(self.pool) apollo_unsafe_titles: List[ str ] = await self.recommend_apollo_client.get_unsafe_titles_from_apollo() apollo_bad_titles: List[ str ] = await self.recommend_apollo_client.get_bad_titles_from_apollo() unsafe_titles_set.update(apollo_unsafe_titles) unsafe_titles_set.update(apollo_bad_titles) return unsafe_titles_set # 获取一批标题的推荐标题 async def get_recommend_articles_for_batch_titles( self, title_list: List[str], strategy: str ) -> List[Dict[str, str]]: match strategy: case "v1": query = I2I.strategy_v1(title_list) case _: query = I2I.batch_base(title_list) recommend_articles = await self.odps_client.read_from_odps(query) return recommend_articles class BaseOfflineDataProduce(BaseOffRecommendUtils): """ 构建实验离线数据 """ def __init__( self, pool: DatabaseManager, config: GlobalConfigSettings, log_service: LogService, ): super().__init__(pool, config) self.pool = pool self.log_service = log_service async def produce_article_data(self, account_tuple: Tuple[str]): query = ProduceBaseData.article_unionid_mapper(account_tuple) print(query) await self.odps_client.execute_odps_query(query) async def produce_title_data(self): query2 = ProduceBaseData.title_unionid_mapper() await self.odps_client.execute_odps_query(query2) async def produce_i2i_table(self, dt: str): query = ProduceBaseData.i2i_mapper(dt=dt) await self.odps_client.execute_odps_query(query) class BaseOfflineRecommend(BaseOffRecommendUtils): def __init__( self, pool: DatabaseManager, config: GlobalConfigSettings, log_service: LogService, ): super().__init__(pool, config) self.pool = pool self.log_service = log_service self.filter_title: Set[str] = set() self.filter_keys: List[str] = [] # 解析策略base 的数据结构 def extract_base( self, account_info, recommend_articles, published_titles: Set[str] ): account_name = account_info["account_name"] gh_id = account_info["gh_id"] candidate_articles: List[Dict] = [ { "account_name": account_name, "gh_id": gh_id, "source_title": item.src_title, "recommend_title": item.rec_title, "collinear_cnt": item.collinear_cnt, "base_cnt": item.base_cnt, "collinear_ratio": item.rec_collinear_ratio, } for item in recommend_articles if item.rec_title and item.rec_title not in self.filter_title and item.rec_title not in published_titles ] return candidate_articles # 解析策略v1的数据结构 def extract_v1(self, account_info, recommend_articles, published_titles: Set[str]): account_name = account_info["account_name"] gh_id = account_info["gh_id"] candidate_articles: List[Dict] = [ { "account_name": account_name, "gh_id": gh_id, "recommend_title": item.recommend_title, "collinear_cnt": item.collinear_cnt, "base_cnt": item.base_cnt, "recommend_score": item.recommend_score, } for item in recommend_articles if item.recommend_title and item.recommend_title not in self.filter_title and item.recommend_title not in published_titles ] return candidate_articles # 初始化类的时候,加载全局过滤标题 async def init_filter_titles(self): self.filter_title = await self.get_global_filter_title() # recommend for each account async def recommend_for_account( self, account_info: Dict, strategy: str, published_titles: Set[str], ): gh_id: str = account_info["gh_id"] account_name: str = account_info["account_name"] match strategy: case "v1": odps_query = GetTopArticleStrategy.strategy_v1(account_name) top_articles = await self.odps_client.read_from_odps(odps_query) top_titles = [i.title for i in top_articles] case "base": mysql_query = GetTopArticleStrategy.base() top_articles = ( await LongArticleDatabaseMapper.Recommend.get_top_articles( pool=self.pool, query=mysql_query, gh_id=gh_id ) ) top_titles = [i["title"] for i in top_articles] case _: return [] recommend_articles = await self.get_recommend_articles_for_batch_titles( top_titles, strategy ) match strategy: case "v1": return self.extract_v1( account_info, recommend_articles, published_titles ) case _: return self.extract_base( account_info, recommend_articles, published_titles ) async def deal(self, account_info: Dict[str, str], strategy: str): gh_id: str = account_info["gh_id"] published_titles: Set[ str ] = await PiaoquanCrawlerDatabaseMapper.get_published_articles(self.pool, gh_id) recommend_articles = await self.recommend_for_account( account_info, strategy, published_titles ) return recommend_articles