core.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. from typing import Dict, Set, List
  2. from app.core.config import GlobalConfigSettings
  3. from app.core.database import DatabaseManager
  4. from app.core.observability import LogService
  5. from app.infra.mapper import LongArticleDatabaseMapper
  6. from app.infra.mapper import PiaoquanCrawlerDatabaseMapper
  7. from app.infra.shared.tools import init_odps_client
  8. from app.recommend.offline_recommend.strategy import I2I
  9. from app.recommend.offline_recommend.strategy import GetTopArticleStrategy
  10. from app.recommend.offline_recommend.utils import RecommendApolloClient
  11. class BaseOffRecommendUtils:
  12. def __init__(
  13. self,
  14. pool: DatabaseManager,
  15. config: GlobalConfigSettings,
  16. ):
  17. self.pool = pool
  18. self.recommend_apollo_client = RecommendApolloClient(config=config)
  19. self.odps_client = init_odps_client()
  20. # read from odps
  21. async def read_from_odps(self, query: str) -> List:
  22. with self.odps_client.execute_sql(query).open_reader() as reader:
  23. if reader:
  24. return [item for item in reader]
  25. else:
  26. return []
  27. # 获取全局过滤标题
  28. async def get_global_filter_title(self) -> Set[str]:
  29. unsafe_titles_set: Set[
  30. str
  31. ] = await LongArticleDatabaseMapper.Recommend.get_unsafe_articles(self.pool)
  32. apollo_unsafe_titles: List[
  33. str
  34. ] = await self.recommend_apollo_client.get_unsafe_titles_from_apollo()
  35. apollo_bad_titles: List[
  36. str
  37. ] = await self.recommend_apollo_client.get_bad_titles_from_apollo()
  38. unsafe_titles_set.update(apollo_unsafe_titles)
  39. unsafe_titles_set.update(apollo_bad_titles)
  40. return unsafe_titles_set
  41. # 获取一批标题的推荐标题
  42. async def get_recommend_articles_for_batch_titles(
  43. self, title_list: List[str], strategy: str
  44. ) -> List[Dict[str, str]]:
  45. match strategy:
  46. case "v1":
  47. query = I2I.strategy_v1(title_list)
  48. case _:
  49. query = I2I.batch_base(title_list)
  50. recommend_articles = await self.read_from_odps(query)
  51. return recommend_articles
  52. class BaseOfflineRecommend(BaseOffRecommendUtils):
  53. def __init__(
  54. self,
  55. pool: DatabaseManager,
  56. config: GlobalConfigSettings,
  57. log_service: LogService,
  58. ):
  59. super().__init__(pool, config)
  60. self.pool = pool
  61. self.log_service = log_service
  62. self.filter_title: Set[str] = set()
  63. self.filter_keys: List[str] = []
  64. # 解析策略base 的数据结构
  65. def extract_base(
  66. self, account_info, recommend_articles, published_titles: Set[str]
  67. ):
  68. account_name = account_info["account_name"]
  69. gh_id = account_info["gh_id"]
  70. candidate_articles: List[Dict] = [
  71. {
  72. "account_name": account_name,
  73. "gh_id": gh_id,
  74. "source_title": item.src_title,
  75. "recommend_title": item.rec_title,
  76. "collinear_cnt": item.collinear_cnt,
  77. "base_cnt": item.base_cnt,
  78. "collinear_ratio": item.rec_collinear_ratio,
  79. }
  80. for item in recommend_articles
  81. if item.rec_title
  82. and item.rec_title not in self.filter_title
  83. and item.rec_title not in published_titles
  84. ]
  85. return candidate_articles
  86. # 解析策略v1的数据结构
  87. def extract_v1(self, account_info, recommend_articles, published_titles: Set[str]):
  88. account_name = account_info["account_name"]
  89. gh_id = account_info["gh_id"]
  90. candidate_articles: List[Dict] = [
  91. {
  92. "account_name": account_name,
  93. "gh_id": gh_id,
  94. "recommend_title": item.recommend_title,
  95. "collinear_cnt": item.collinear_cnt,
  96. "base_cnt": item.base_cnt,
  97. "recommend_score": item.recommend_score,
  98. }
  99. for item in recommend_articles
  100. if item.recommend_title
  101. and item.recommend_title not in self.filter_title
  102. and item.recommend_title not in published_titles
  103. ]
  104. return candidate_articles
  105. # 初始化类的时候,加载全局过滤标题
  106. async def init_filter_titles(self):
  107. self.filter_title = await self.get_global_filter_title()
  108. # recommend for each account
  109. async def recommend_for_account(
  110. self,
  111. account_info: Dict,
  112. strategy: str,
  113. published_titles: Set[str],
  114. ):
  115. gh_id: str = account_info["gh_id"]
  116. account_name: str = account_info["account_name"]
  117. match strategy:
  118. case "v1":
  119. odps_query = GetTopArticleStrategy.strategy_v1(account_name)
  120. top_articles = await self.read_from_odps(odps_query)
  121. top_titles = [i.title for i in top_articles]
  122. case "base":
  123. mysql_query = GetTopArticleStrategy.base()
  124. top_articles = await LongArticleDatabaseMapper.Recommend.get_top_articles(
  125. pool=self.pool, query=mysql_query, gh_id=gh_id
  126. )
  127. top_titles = [i["title"] for i in top_articles]
  128. case _:
  129. return []
  130. recommend_articles = await self.get_recommend_articles_for_batch_titles(
  131. top_titles, strategy
  132. )
  133. match strategy:
  134. case "v1":
  135. return self.extract_v1(
  136. account_info, recommend_articles, published_titles
  137. )
  138. case _:
  139. return self.extract_base(
  140. account_info, recommend_articles, published_titles
  141. )
  142. async def deal(self, account_info: Dict[str, str], strategy: str):
  143. gh_id: str = account_info["gh_id"]
  144. published_titles: Set[
  145. str
  146. ] = await PiaoquanCrawlerDatabaseMapper.get_published_articles(self.pool, gh_id)
  147. recommend_articles = await self.recommend_for_account(
  148. account_info, strategy, published_titles
  149. )
  150. return recommend_articles