core.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. from typing import Dict, Set, List, Tuple
  2. from app.core.config import GlobalConfigSettings
  3. from app.core.database import DatabaseManager
  4. from app.core.observability import LogService
  5. from app.infra.mapper import LongArticleDatabaseMapper
  6. from app.infra.mapper import PiaoquanCrawlerDatabaseMapper
  7. from app.infra.external import OdpsService
  8. from app.recommend.offline_recommend.strategy import I2I
  9. from app.recommend.offline_recommend.strategy import GetTopArticleStrategy
  10. from app.recommend.offline_recommend.utils import RecommendApolloClient
  11. from app.recommend.offline_recommend.utils import ProduceBaseData
  12. class BaseOffRecommendUtils:
  13. def __init__(
  14. self,
  15. pool: DatabaseManager,
  16. config: GlobalConfigSettings,
  17. ):
  18. self.pool = pool
  19. self.recommend_apollo_client = RecommendApolloClient(config=config)
  20. self.odps_client = OdpsService(
  21. access_id="LTAIWYUujJAm7CbH",
  22. secret_access_key="RfSjdiWwED1sGFlsjXv0DlfTnZTG1P",
  23. endpoint="http://service.cn.maxcompute.aliyun.com/api",
  24. project="loghubods",
  25. )
  26. # 获取全局过滤标题
  27. async def get_global_filter_title(self) -> Set[str]:
  28. unsafe_titles_set: Set[
  29. str
  30. ] = await LongArticleDatabaseMapper.Recommend.get_unsafe_articles(self.pool)
  31. apollo_unsafe_titles: List[
  32. str
  33. ] = await self.recommend_apollo_client.get_unsafe_titles_from_apollo()
  34. apollo_bad_titles: List[
  35. str
  36. ] = await self.recommend_apollo_client.get_bad_titles_from_apollo()
  37. unsafe_titles_set.update(apollo_unsafe_titles)
  38. unsafe_titles_set.update(apollo_bad_titles)
  39. return unsafe_titles_set
  40. # 获取一批标题的推荐标题
  41. async def get_recommend_articles_for_batch_titles(
  42. self, title_list: List[str], strategy: str
  43. ) -> List[Dict[str, str]]:
  44. match strategy:
  45. case "v1":
  46. query = I2I.strategy_v1(title_list)
  47. case _:
  48. query = I2I.batch_base(title_list)
  49. recommend_articles = await self.odps_client.read_from_odps(query)
  50. return recommend_articles
  51. class BaseOfflineDataProduce(BaseOffRecommendUtils):
  52. """
  53. 构建实验离线数据
  54. """
  55. def __init__(
  56. self,
  57. pool: DatabaseManager,
  58. config: GlobalConfigSettings,
  59. log_service: LogService,
  60. ):
  61. super().__init__(pool, config)
  62. self.pool = pool
  63. self.log_service = log_service
  64. async def produce_article_data(self, account_tuple: Tuple[str]):
  65. query = ProduceBaseData.article_unionid_mapper(account_tuple)
  66. print(query)
  67. await self.odps_client.execute_odps_query(query)
  68. async def produce_title_data(self):
  69. query2 = ProduceBaseData.title_unionid_mapper()
  70. await self.odps_client.execute_odps_query(query2)
  71. async def produce_i2i_table(self, dt: str):
  72. query = ProduceBaseData.i2i_mapper(dt=dt)
  73. await self.odps_client.execute_odps_query(query)
  74. class BaseOfflineRecommend(BaseOffRecommendUtils):
  75. def __init__(
  76. self,
  77. pool: DatabaseManager,
  78. config: GlobalConfigSettings,
  79. log_service: LogService,
  80. ):
  81. super().__init__(pool, config)
  82. self.pool = pool
  83. self.log_service = log_service
  84. self.filter_title: Set[str] = set()
  85. self.filter_keys: List[str] = []
  86. # 解析策略base 的数据结构
  87. def extract_base(
  88. self, account_info, recommend_articles, published_titles: Set[str]
  89. ):
  90. account_name = account_info["account_name"]
  91. gh_id = account_info["gh_id"]
  92. candidate_articles: List[Dict] = [
  93. {
  94. "account_name": account_name,
  95. "gh_id": gh_id,
  96. "source_title": item.src_title,
  97. "recommend_title": item.rec_title,
  98. "collinear_cnt": item.collinear_cnt,
  99. "base_cnt": item.base_cnt,
  100. "collinear_ratio": item.rec_collinear_ratio,
  101. }
  102. for item in recommend_articles
  103. if item.rec_title
  104. and item.rec_title not in self.filter_title
  105. and item.rec_title not in published_titles
  106. ]
  107. return candidate_articles
  108. # 解析策略v1的数据结构
  109. def extract_v1(self, account_info, recommend_articles, published_titles: Set[str]):
  110. account_name = account_info["account_name"]
  111. gh_id = account_info["gh_id"]
  112. candidate_articles: List[Dict] = [
  113. {
  114. "account_name": account_name,
  115. "gh_id": gh_id,
  116. "recommend_title": item.recommend_title,
  117. "collinear_cnt": item.collinear_cnt,
  118. "base_cnt": item.base_cnt,
  119. "recommend_score": item.recommend_score,
  120. }
  121. for item in recommend_articles
  122. if item.recommend_title
  123. and item.recommend_title not in self.filter_title
  124. and item.recommend_title not in published_titles
  125. ]
  126. return candidate_articles
  127. # 初始化类的时候,加载全局过滤标题
  128. async def init_filter_titles(self):
  129. self.filter_title = await self.get_global_filter_title()
  130. # recommend for each account
  131. async def recommend_for_account(
  132. self,
  133. account_info: Dict,
  134. strategy: str,
  135. published_titles: Set[str],
  136. ):
  137. gh_id: str = account_info["gh_id"]
  138. account_name: str = account_info["account_name"]
  139. match strategy:
  140. case "v1":
  141. odps_query = GetTopArticleStrategy.strategy_v1(account_name)
  142. top_articles = await self.odps_client.read_from_odps(odps_query)
  143. top_titles = [i.title for i in top_articles]
  144. case "base":
  145. mysql_query = GetTopArticleStrategy.base()
  146. top_articles = (
  147. await LongArticleDatabaseMapper.Recommend.get_top_articles(
  148. pool=self.pool, query=mysql_query, gh_id=gh_id
  149. )
  150. )
  151. top_titles = [i["title"] for i in top_articles]
  152. case _:
  153. return []
  154. recommend_articles = await self.get_recommend_articles_for_batch_titles(
  155. top_titles, strategy
  156. )
  157. match strategy:
  158. case "v1":
  159. return self.extract_v1(
  160. account_info, recommend_articles, published_titles
  161. )
  162. case _:
  163. return self.extract_base(
  164. account_info, recommend_articles, published_titles
  165. )
  166. async def deal(self, account_info: Dict[str, str], strategy: str):
  167. gh_id: str = account_info["gh_id"]
  168. published_titles: Set[
  169. str
  170. ] = await PiaoquanCrawlerDatabaseMapper.get_published_articles(self.pool, gh_id)
  171. recommend_articles = await self.recommend_for_account(
  172. account_info, strategy, published_titles
  173. )
  174. return recommend_articles