# core.py — base utilities and base class for offline article recommendation.
import asyncio
from typing import Dict, List, Set

from app.core.config import GlobalConfigSettings
from app.core.database import DatabaseManager
from app.core.observability import LogService
from app.infra.mapper import LongArticleDatabaseMapper
from app.infra.mapper import PiaoquanCrawlerDatabaseMapper
from app.infra.shared.tools import init_odps_client
from app.recommend.offline_recommend.strategy import I2I
from app.recommend.offline_recommend.strategy import GetTopArticleStrategy
from app.recommend.offline_recommend.utils import RecommendApolloClient
  11. class BaseOffRecommendUtils:
  12. def __init__(
  13. self,
  14. pool: DatabaseManager,
  15. config: GlobalConfigSettings,
  16. ):
  17. self.pool = pool
  18. self.recommend_apollo_client = RecommendApolloClient(config=config)
  19. self.odps_client = init_odps_client()
  20. # read from odps
  21. async def read_from_odps(self, query: str) -> List:
  22. with self.odps_client.execute_sql(query).open_reader() as reader:
  23. if reader:
  24. return [item for item in reader]
  25. else:
  26. return []
  27. # 获取全局过滤标题
  28. async def get_global_filter_title(self) -> Set[str]:
  29. unsafe_titles_set: Set[
  30. str
  31. ] = await LongArticleDatabaseMapper.Recommend.get_unsafe_articles(self.pool)
  32. apollo_unsafe_titles: List[
  33. str
  34. ] = await self.recommend_apollo_client.get_unsafe_titles_from_apollo()
  35. apollo_bad_titles: List[
  36. str
  37. ] = await self.recommend_apollo_client.get_bad_titles_from_apollo()
  38. unsafe_titles_set.update(apollo_unsafe_titles)
  39. unsafe_titles_set.update(apollo_bad_titles)
  40. return unsafe_titles_set
  41. # 获取账号的 Top 文章
  42. async def get_account_top_articles(
  43. self, gh_id: str, strategy: str
  44. ) -> List[Dict[str, str]]:
  45. match strategy:
  46. # 后续拓展策略
  47. case _:
  48. query = GetTopArticleStrategy.base()
  49. top_articles = await LongArticleDatabaseMapper.Recommend.get_top_articles(
  50. pool=self.pool, query=query, gh_id=gh_id
  51. )
  52. return top_articles
  53. # 获取标题的推荐文章
  54. async def get_recommend_articles(self, title: str, strategy: str) -> List:
  55. match strategy:
  56. # 后续拓展策略
  57. case _:
  58. query = I2I.base(title)
  59. recommend_articles = await self.read_from_odps(query)
  60. return recommend_articles
  61. # 获取一批标题的推荐标题
  62. async def get_recommend_articles_for_batch_titles(self, title_list: List[str], strategy: str) -> List[Dict[str, str]]:
  63. match strategy:
  64. case "v1":
  65. query = I2I.strategy_v1(title_list)
  66. case _:
  67. query = I2I.batch_base(title_list)
  68. print(query)
  69. recommend_articles = await self.read_from_odps(query)
  70. return recommend_articles
  71. class BaseOfflineRecommend(BaseOffRecommendUtils):
  72. def __init__(
  73. self,
  74. pool: DatabaseManager,
  75. config: GlobalConfigSettings,
  76. log_service: LogService,
  77. ):
  78. super().__init__(pool, config)
  79. self.pool = pool
  80. self.log_service = log_service
  81. self.filter_title: Set[str] = set()
  82. self.filter_keys: List[str] = []
  83. # 解析策略base 的数据结构
  84. def extract_base(self, account_info, recommend_articles, published_titles: Set[str]):
  85. account_name = account_info["account_name"]
  86. gh_id = account_info["gh_id"]
  87. candidate_articles: List[Dict] = [
  88. {
  89. "account_name": account_name,
  90. "gh_id": gh_id,
  91. "source_title": item.src_title,
  92. "recommend_title": item.rec_title,
  93. "collinear_cnt": item.collinear_cnt,
  94. "base_cnt": item.base_cnt,
  95. "collinear_ratio": item.rec_collinear_ratio,
  96. }
  97. for item in recommend_articles
  98. if item.rec_title
  99. and item.rec_title not in self.filter_title
  100. and item.rec_title not in published_titles
  101. ]
  102. return candidate_articles
  103. # 解析策略v1的数据结构
  104. def extract_v1(self, account_info, recommend_articles, published_titles: Set[str]):
  105. account_name = account_info["account_name"]
  106. gh_id = account_info["gh_id"]
  107. candidate_articles: List[Dict] = [
  108. {
  109. "account_name": account_name,
  110. "gh_id": gh_id,
  111. "recommend_title": item.recommend_title,
  112. "collinear_cnt": item.collinear_cnt,
  113. "base_cnt": item.base_cnt,
  114. "recommend_score": item.recommend_score,
  115. }
  116. for item in recommend_articles
  117. if item.recommend_title
  118. and item.recommend_title not in self.filter_title
  119. and item.recommend_title not in published_titles
  120. ]
  121. return candidate_articles
  122. # 初始化类的时候,加载全局过滤标题
  123. async def init_filter_titles(self):
  124. self.filter_title = await self.get_global_filter_title()
  125. # recommend for each account
  126. async def recommend_for_account(
  127. self,
  128. account_info: Dict,
  129. strategy: str,
  130. published_titles: Set[str],
  131. ):
  132. gh_id: str = account_info["gh_id"]
  133. top_articles = await self.get_account_top_articles(gh_id, strategy)
  134. top_titles = [i['title'] for i in top_articles]
  135. recommend_articles = await self.get_recommend_articles_for_batch_titles(top_titles, strategy)
  136. match strategy:
  137. case "v1":
  138. return self.extract_v1(
  139. account_info, recommend_articles, published_titles
  140. )
  141. case _:
  142. return self.extract_base(
  143. account_info, recommend_articles, published_titles
  144. )
  145. async def deal(self, account_info: Dict[str, str], strategy: str):
  146. gh_id: str = account_info["gh_id"]
  147. published_titles: Set[
  148. str
  149. ] = await PiaoquanCrawlerDatabaseMapper.get_published_articles(self.pool, gh_id)
  150. recommend_articles = await self.recommend_for_account(
  151. account_info, strategy, published_titles
  152. )
  153. return recommend_articles