| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202 |
- from typing import Dict, Set, List, Tuple
- from app.core.config import GlobalConfigSettings
- from app.core.database import DatabaseManager
- from app.core.observability import LogService
- from app.infra.mapper import LongArticleDatabaseMapper
- from app.infra.mapper import PiaoquanCrawlerDatabaseMapper
- from app.infra.external import OdpsService
- from app.recommend.offline_recommend.strategy import I2I
- from app.recommend.offline_recommend.strategy import GetTopArticleStrategy
- from app.recommend.offline_recommend.utils import RecommendApolloClient
- from app.recommend.offline_recommend.utils import ProduceBaseData
class BaseOffRecommendUtils:
    """Shared helpers for the offline recommendation jobs.

    Holds the database pool, a ``RecommendApolloClient`` and an
    ``OdpsService`` client, and exposes the global title blacklist and the
    I2I lookup for a batch of titles.
    """

    def __init__(
        self,
        pool: DatabaseManager,
        config: GlobalConfigSettings,
    ):
        """Store the pool and build the Apollo and ODPS clients.

        Args:
            pool: Database manager handed to the mapper calls.
            config: Global settings passed to ``RecommendApolloClient``.
        """
        self.pool = pool
        self.recommend_apollo_client = RecommendApolloClient(config=config)
        # SECURITY(review): these ODPS credentials are hard-coded in source.
        # They are kept unchanged so existing deployments keep working, but
        # they should be rotated and loaded from configuration or a secret
        # store instead of living in the repository.
        self.odps_client = OdpsService(
            access_id="LTAIWYUujJAm7CbH",
            secret_access_key="RfSjdiWwED1sGFlsjXv0DlfTnZTG1P",
            endpoint="http://service.cn.maxcompute.aliyun.com/api",
            project="loghubods",
        )

    async def get_global_filter_title(self) -> Set[str]:
        """Return the global set of titles to filter out.

        The result is the union of the unsafe articles reported by
        ``LongArticleDatabaseMapper.Recommend`` and the unsafe and bad title
        lists fetched through the Apollo client.

        Returns:
            A new set of titles; the mapper's return value is not mutated.
        """
        # Copy into a fresh set so the object returned by the mapper is never
        # modified in place (and so a list/tuple return value also works).
        blocked_titles: Set[str] = set(
            await LongArticleDatabaseMapper.Recommend.get_unsafe_articles(self.pool)
        )
        blocked_titles.update(
            await self.recommend_apollo_client.get_unsafe_titles_from_apollo()
        )
        blocked_titles.update(
            await self.recommend_apollo_client.get_bad_titles_from_apollo()
        )
        return blocked_titles

    async def get_recommend_articles_for_batch_titles(
        self, title_list: List[str], strategy: str
    ) -> List[Dict[str, str]]:
        """Fetch the recommended articles for a batch of source titles.

        Args:
            title_list: Source titles to look up.
            strategy: ``"v1"`` selects ``I2I.strategy_v1``; any other value
                falls back to ``I2I.batch_base``.

        Returns:
            The rows returned by ``read_from_odps`` for the generated query,
            or an empty list when ``title_list`` is empty.
        """
        # Nothing to look up: skip building a query over an empty title list
        # and the ODPS round-trip that would follow.
        if not title_list:
            return []

        build_query = I2I.strategy_v1 if strategy == "v1" else I2I.batch_base
        return await self.odps_client.read_from_odps(build_query(title_list))
class BaseOfflineDataProduce(BaseOffRecommendUtils):
    """Build the offline data used by the experiments.

    Each ``produce_*`` method asks ``ProduceBaseData`` for a query and
    executes it through the inherited ODPS client.
    """

    def __init__(
        self,
        pool: DatabaseManager,
        config: GlobalConfigSettings,
        log_service: LogService,
    ):
        """Initialise the shared clients and keep the log service.

        Args:
            pool: Database manager, stored by the base class as ``self.pool``.
            config: Global settings forwarded to the base class.
            log_service: Log service kept on the instance as ``log_service``.
        """
        super().__init__(pool, config)
        self.log_service = log_service

    async def produce_article_data(self, account_tuple: Tuple[str, ...]):
        """Run the ``article_unionid_mapper`` query for the given accounts.

        Args:
            account_tuple: Accounts passed to
                ``ProduceBaseData.article_unionid_mapper``
                (presumably variable-length; verify against callers).
        """
        # Function-scope import keeps this block self-contained.
        import logging

        query = ProduceBaseData.article_unionid_mapper(account_tuple)
        # Log at debug level instead of printing the generated query to stdout.
        logging.getLogger(__name__).debug("article_unionid_mapper query: %s", query)
        await self.odps_client.execute_odps_query(query)

    async def produce_title_data(self):
        """Run the ``title_unionid_mapper`` query."""
        query = ProduceBaseData.title_unionid_mapper()
        await self.odps_client.execute_odps_query(query)

    async def produce_i2i_table(self, dt: str):
        """Run the ``i2i_mapper`` query for one partition.

        Args:
            dt: Value forwarded as ``dt`` to ``ProduceBaseData.i2i_mapper``.
        """
        query = ProduceBaseData.i2i_mapper(dt=dt)
        await self.odps_client.execute_odps_query(query)
- class BaseOfflineRecommend(BaseOffRecommendUtils):
- def __init__(
- self,
- pool: DatabaseManager,
- config: GlobalConfigSettings,
- log_service: LogService,
- ):
- super().__init__(pool, config)
- self.pool = pool
- self.log_service = log_service
- self.filter_title: Set[str] = set()
- self.filter_keys: List[str] = []
- # 解析策略base 的数据结构
- def extract_base(
- self, account_info, recommend_articles, published_titles: Set[str]
- ):
- account_name = account_info["account_name"]
- gh_id = account_info["gh_id"]
- candidate_articles: List[Dict] = [
- {
- "account_name": account_name,
- "gh_id": gh_id,
- "source_title": item.src_title,
- "recommend_title": item.rec_title,
- "collinear_cnt": item.collinear_cnt,
- "base_cnt": item.base_cnt,
- "collinear_ratio": item.rec_collinear_ratio,
- }
- for item in recommend_articles
- if item.rec_title
- and item.rec_title not in self.filter_title
- and item.rec_title not in published_titles
- ]
- return candidate_articles
- # 解析策略v1的数据结构
- def extract_v1(self, account_info, recommend_articles, published_titles: Set[str]):
- account_name = account_info["account_name"]
- gh_id = account_info["gh_id"]
- candidate_articles: List[Dict] = [
- {
- "account_name": account_name,
- "gh_id": gh_id,
- "recommend_title": item.recommend_title,
- "collinear_cnt": item.collinear_cnt,
- "base_cnt": item.base_cnt,
- "recommend_score": item.recommend_score,
- }
- for item in recommend_articles
- if item.recommend_title
- and item.recommend_title not in self.filter_title
- and item.recommend_title not in published_titles
- ]
- return candidate_articles
- # 初始化类的时候,加载全局过滤标题
- async def init_filter_titles(self):
- self.filter_title = await self.get_global_filter_title()
- # recommend for each account
- async def recommend_for_account(
- self,
- account_info: Dict,
- strategy: str,
- published_titles: Set[str],
- ):
- gh_id: str = account_info["gh_id"]
- account_name: str = account_info["account_name"]
- match strategy:
- case "v1":
- odps_query = GetTopArticleStrategy.strategy_v1(account_name)
- top_articles = await self.odps_client.read_from_odps(odps_query)
- top_titles = [i.title for i in top_articles]
- case "base":
- mysql_query = GetTopArticleStrategy.base()
- top_articles = (
- await LongArticleDatabaseMapper.Recommend.get_top_articles(
- pool=self.pool, query=mysql_query, gh_id=gh_id
- )
- )
- top_titles = [i["title"] for i in top_articles]
- case _:
- return []
- recommend_articles = await self.get_recommend_articles_for_batch_titles(
- top_titles, strategy
- )
- match strategy:
- case "v1":
- return self.extract_v1(
- account_info, recommend_articles, published_titles
- )
- case _:
- return self.extract_base(
- account_info, recommend_articles, published_titles
- )
- async def deal(self, account_info: Dict[str, str], strategy: str):
- gh_id: str = account_info["gh_id"]
- published_titles: Set[
- str
- ] = await PiaoquanCrawlerDatabaseMapper.get_published_articles(self.pool, gh_id)
- recommend_articles = await self.recommend_for_account(
- account_info, strategy, published_titles
- )
- return recommend_articles
|