| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173 |
import asyncio
from typing import Dict, Set, List

from app.core.config import GlobalConfigSettings
from app.core.database import DatabaseManager
from app.core.observability import LogService
from app.infra.mapper import LongArticleDatabaseMapper
from app.infra.mapper import PiaoquanCrawlerDatabaseMapper
from app.infra.shared.tools import init_odps_client
from app.recommend.offline_recommend.strategy import I2I
from app.recommend.offline_recommend.strategy import GetTopArticleStrategy
from app.recommend.offline_recommend.utils import RecommendApolloClient
class BaseOffRecommendUtils:
    """Shared data-access helpers for offline recommendation.

    Holds the database pool, an Apollo config client and an ODPS client,
    and exposes helpers to read ODPS results, collect globally filtered
    titles and fetch I2I recommendations for a batch of titles.
    """

    def __init__(
        self,
        pool: DatabaseManager,
        config: GlobalConfigSettings,
    ):
        self.pool = pool
        self.recommend_apollo_client = RecommendApolloClient(config=config)
        self.odps_client = init_odps_client()

    def _read_from_odps_sync(self, query: str) -> List:
        """Execute *query* on ODPS and return every result record (blocking)."""
        with self.odps_client.execute_sql(query).open_reader() as reader:
            return list(reader) if reader else []

    # read from odps
    async def read_from_odps(self, query: str) -> List:
        """Execute *query* on ODPS and return all result records.

        ``execute_sql`` is a synchronous (non-awaitable) call that waits for
        the SQL instance to finish, so it is run in a worker thread to avoid
        stalling the event loop while the query executes.
        """
        return await asyncio.to_thread(self._read_from_odps_sync, query)

    # Fetch the global set of titles that must never be recommended
    async def get_global_filter_title(self) -> Set[str]:
        """Return the union of all globally filtered titles.

        Sources: unsafe articles from the long-article database, plus the
        unsafe titles and bad titles configured in Apollo. A new set is
        built, so the collection returned by the database mapper is never
        mutated. A source that yields ``None`` contributes nothing.
        """
        db_unsafe_titles = await LongArticleDatabaseMapper.Recommend.get_unsafe_articles(
            self.pool
        )
        apollo_unsafe_titles = (
            await self.recommend_apollo_client.get_unsafe_titles_from_apollo()
        )
        apollo_bad_titles = (
            await self.recommend_apollo_client.get_bad_titles_from_apollo()
        )
        filter_titles: Set[str] = set(db_unsafe_titles or ())
        filter_titles.update(apollo_unsafe_titles or ())
        filter_titles.update(apollo_bad_titles or ())
        return filter_titles

    # Fetch recommended titles for a batch of source titles
    async def get_recommend_articles_for_batch_titles(
        self, title_list: List[str], strategy: str
    ) -> List:
        """Run the I2I recommendation query for *title_list*.

        Args:
            title_list: source titles to find related articles for.
            strategy: ``"v1"`` uses ``I2I.strategy_v1``; any other value
                falls back to ``I2I.batch_base``.

        Returns:
            The records produced by ``read_from_odps``, or ``[]`` when
            *title_list* is empty. With no source titles there is nothing to
            match, so no ODPS query is issued.
        """
        if not title_list:
            return []
        if strategy == "v1":
            query = I2I.strategy_v1(title_list)
        else:
            query = I2I.batch_base(title_list)
        return await self.read_from_odps(query)
- class BaseOfflineRecommend(BaseOffRecommendUtils):
- def __init__(
- self,
- pool: DatabaseManager,
- config: GlobalConfigSettings,
- log_service: LogService,
- ):
- super().__init__(pool, config)
- self.pool = pool
- self.log_service = log_service
- self.filter_title: Set[str] = set()
- self.filter_keys: List[str] = []
- # 解析策略base 的数据结构
- def extract_base(
- self, account_info, recommend_articles, published_titles: Set[str]
- ):
- account_name = account_info["account_name"]
- gh_id = account_info["gh_id"]
- candidate_articles: List[Dict] = [
- {
- "account_name": account_name,
- "gh_id": gh_id,
- "source_title": item.src_title,
- "recommend_title": item.rec_title,
- "collinear_cnt": item.collinear_cnt,
- "base_cnt": item.base_cnt,
- "collinear_ratio": item.rec_collinear_ratio,
- }
- for item in recommend_articles
- if item.rec_title
- and item.rec_title not in self.filter_title
- and item.rec_title not in published_titles
- ]
- return candidate_articles
- # 解析策略v1的数据结构
- def extract_v1(self, account_info, recommend_articles, published_titles: Set[str]):
- account_name = account_info["account_name"]
- gh_id = account_info["gh_id"]
- candidate_articles: List[Dict] = [
- {
- "account_name": account_name,
- "gh_id": gh_id,
- "recommend_title": item.recommend_title,
- "collinear_cnt": item.collinear_cnt,
- "base_cnt": item.base_cnt,
- "recommend_score": item.recommend_score,
- }
- for item in recommend_articles
- if item.recommend_title
- and item.recommend_title not in self.filter_title
- and item.recommend_title not in published_titles
- ]
- return candidate_articles
- # 初始化类的时候,加载全局过滤标题
- async def init_filter_titles(self):
- self.filter_title = await self.get_global_filter_title()
- # recommend for each account
- async def recommend_for_account(
- self,
- account_info: Dict,
- strategy: str,
- published_titles: Set[str],
- ):
- gh_id: str = account_info["gh_id"]
- account_name: str = account_info["account_name"]
- match strategy:
- case "v1":
- odps_query = GetTopArticleStrategy.strategy_v1(account_name)
- top_articles = await self.read_from_odps(odps_query)
- top_titles = [i.title for i in top_articles]
- case "base":
- mysql_query = GetTopArticleStrategy.base()
- top_articles = await LongArticleDatabaseMapper.Recommend.get_top_articles(
- pool=self.pool, query=mysql_query, gh_id=gh_id
- )
- top_titles = [i["title"] for i in top_articles]
- case _:
- return []
- recommend_articles = await self.get_recommend_articles_for_batch_titles(
- top_titles, strategy
- )
- match strategy:
- case "v1":
- return self.extract_v1(
- account_info, recommend_articles, published_titles
- )
- case _:
- return self.extract_base(
- account_info, recommend_articles, published_titles
- )
- async def deal(self, account_info: Dict[str, str], strategy: str):
- gh_id: str = account_info["gh_id"]
- published_titles: Set[
- str
- ] = await PiaoquanCrawlerDatabaseMapper.get_published_articles(self.pool, gh_id)
- recommend_articles = await self.recommend_for_account(
- account_info, strategy, published_titles
- )
- return recommend_articles
|