task_handler.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. from datetime import datetime
  2. from applications.tasks.analysis_task import CrawlerDetailDeal
  3. from applications.tasks.analysis_task import AccountPositionReadRateAvg
  4. from applications.tasks.analysis_task import AccountPositionReadAvg
  5. from applications.tasks.algorithm_tasks import AccountCategoryAnalysis
  6. from applications.tasks.cold_start_tasks import ArticlePoolColdStart
  7. from applications.tasks.crawler_tasks import CrawlerToutiao
  8. from applications.tasks.crawler_tasks import WeixinAccountManager
  9. from applications.tasks.crawler_tasks import CrawlerGzhAccountArticles
  10. from applications.tasks.crawler_tasks import CrawlerGzhSearchArticles
  11. from applications.tasks.data_recycle_tasks import RecycleDailyPublishArticlesTask
  12. from applications.tasks.data_recycle_tasks import RecycleOutsideAccountArticlesTask
  13. from applications.tasks.data_recycle_tasks import CheckDailyPublishArticlesTask
  14. from applications.tasks.data_recycle_tasks import UpdateRootSourceIdAndUpdateTimeTask
  15. from applications.tasks.data_recycle_tasks import RecycleFwhDailyPublishArticlesTask
  16. from applications.tasks.data_recycle_tasks import RecycleMiniProgramDetailTask
  17. from applications.tasks.data_recycle_tasks import (
  18. UpdateOutsideRootSourceIdAndUpdateTimeTask,
  19. )
  20. from applications.tasks.llm_tasks import TitleRewrite
  21. from applications.tasks.llm_tasks import ArticlePoolCategoryGeneration
  22. from applications.tasks.llm_tasks import CandidateAccountQualityScoreRecognizer
  23. from applications.tasks.llm_tasks import ExtractTitleFeatures
  24. from applications.tasks.monitor_tasks import check_kimi_balance
  25. from applications.tasks.monitor_tasks import GetOffVideos
  26. from applications.tasks.monitor_tasks import CheckVideoAuditStatus
  27. from applications.tasks.monitor_tasks import InnerGzhArticlesMonitor
  28. from applications.tasks.monitor_tasks import OutsideGzhArticlesMonitor
  29. from applications.tasks.monitor_tasks import OutsideGzhArticlesCollector
  30. from applications.tasks.monitor_tasks import TaskProcessingMonitor
  31. from applications.tasks.monitor_tasks import LimitedAccountAnalysisTask
  32. from applications.tasks.task_mapper import TaskMapper
  33. class TaskHandler(TaskMapper):
  34. def __init__(self, data, log_service, db_client, trace_id):
  35. self.data = data
  36. self.log_client = log_service
  37. self.db_client = db_client
  38. self.trace_id = trace_id
  39. # ---------- 下面是若干复合任务的局部实现 ----------
  40. async def _check_kimi_balance_handler(self) -> int:
  41. response = await check_kimi_balance()
  42. await self.log_client.log(
  43. contents={
  44. "trace_id": self.trace_id,
  45. "task": "check_kimi_balance",
  46. "data": response,
  47. }
  48. )
  49. return self.TASK_SUCCESS_STATUS
  50. async def _get_off_videos_task_handler(self) -> int:
  51. sub_task = GetOffVideos(self.db_client, self.log_client, self.trace_id)
  52. return await sub_task.deal()
  53. async def _check_video_audit_status_handler(self) -> int:
  54. sub_task = CheckVideoAuditStatus(self.db_client, self.log_client, self.trace_id)
  55. return await sub_task.deal()
  56. async def _task_processing_monitor_handler(self) -> int:
  57. sub_task = TaskProcessingMonitor(self.db_client)
  58. await sub_task.deal()
  59. return self.TASK_SUCCESS_STATUS
  60. async def _inner_gzh_articles_monitor_handler(self) -> int:
  61. sub_task = InnerGzhArticlesMonitor(self.db_client)
  62. return await sub_task.deal()
  63. async def _title_rewrite_handler(self):
  64. sub_task = TitleRewrite(self.db_client, self.log_client, self.trace_id)
  65. return await sub_task.deal()
  66. async def _update_root_source_id_handler(self) -> int:
  67. sub_task = UpdateRootSourceIdAndUpdateTimeTask(self.db_client, self.log_client)
  68. await sub_task.deal()
  69. return self.TASK_SUCCESS_STATUS
  70. async def _outside_monitor_handler(self) -> int:
  71. collector = OutsideGzhArticlesCollector(self.db_client)
  72. await collector.deal()
  73. monitor = OutsideGzhArticlesMonitor(self.db_client)
  74. return await monitor.deal() # 应返回 SUCCESS / FAILED
  75. async def _recycle_article_data_handler(self) -> int:
  76. date_str = self.data.get("date_string") or datetime.now().strftime("%Y-%m-%d")
  77. recycle = RecycleDailyPublishArticlesTask(
  78. self.db_client, self.log_client, date_str
  79. )
  80. await recycle.deal()
  81. check = CheckDailyPublishArticlesTask(self.db_client, self.log_client, date_str)
  82. await check.deal()
  83. return self.TASK_SUCCESS_STATUS
  84. async def _crawler_toutiao_handler(self) -> int:
  85. sub_task = CrawlerToutiao(self.db_client, self.log_client, self.trace_id)
  86. method = self.data.get("method", "account")
  87. media_type = self.data.get("media_type", "article")
  88. category_list = self.data.get("category_list", [])
  89. match method:
  90. case "account":
  91. await sub_task.crawler_task(media_type=media_type)
  92. case "recommend":
  93. await sub_task.crawl_toutiao_recommend_task(category_list)
  94. case "search":
  95. await sub_task.search_candidate_accounts()
  96. case _:
  97. raise ValueError(f"Unsupported method {method}")
  98. return self.TASK_SUCCESS_STATUS
  99. async def _article_pool_cold_start_handler(self) -> int:
  100. cold_start = ArticlePoolColdStart(
  101. self.db_client, self.log_client, self.trace_id
  102. )
  103. platform = self.data.get("platform", "weixin")
  104. crawler_methods = self.data.get("crawler_methods", [])
  105. category_list = self.data.get("category_list", [])
  106. strategy = self.data.get("strategy", "strategy_v1")
  107. await cold_start.deal(
  108. platform=platform,
  109. crawl_methods=crawler_methods,
  110. category_list=category_list,
  111. strategy=strategy,
  112. )
  113. return self.TASK_SUCCESS_STATUS
  114. async def _candidate_account_quality_score_handler(self) -> int:
  115. task = CandidateAccountQualityScoreRecognizer(
  116. self.db_client, self.log_client, self.trace_id
  117. )
  118. await task.deal()
  119. return self.TASK_SUCCESS_STATUS
  120. async def _article_pool_category_generation_handler(self) -> int:
  121. task = ArticlePoolCategoryGeneration(
  122. self.db_client, self.log_client, self.trace_id
  123. )
  124. limit_num = self.data.get("limit")
  125. await task.deal(limit=limit_num)
  126. return self.TASK_SUCCESS_STATUS
  127. async def _crawler_account_manager_handler(self) -> int:
  128. platform = self.data.get("platform", "weixin")
  129. account_id_list = self.data.get("account_id_list")
  130. match platform:
  131. case "weixin":
  132. task = WeixinAccountManager(
  133. self.db_client, self.log_client, self.trace_id
  134. )
  135. case _:
  136. raise ValueError(f"Unsupported platform {platform}")
  137. await task.deal(platform=platform, account_id_list=account_id_list)
  138. return self.TASK_SUCCESS_STATUS
  139. # 抓取公众号文章
  140. async def _crawler_gzh_article_handler(self) -> int:
  141. account_method = self.data.get("account_method")
  142. crawl_mode = self.data.get("crawl_mode")
  143. strategy = self.data.get("strategy")
  144. match crawl_mode:
  145. case "account":
  146. task = CrawlerGzhAccountArticles(
  147. self.db_client, self.log_client, self.trace_id
  148. )
  149. await task.deal(account_method, strategy)
  150. case "search":
  151. task = CrawlerGzhSearchArticles(
  152. self.db_client, self.log_client, self.trace_id
  153. )
  154. await task.deal(strategy)
  155. case _:
  156. raise ValueError(f"Unsupported crawl mode {crawl_mode}")
  157. return self.TASK_SUCCESS_STATUS
  158. # 回收服务号文章
  159. async def _recycle_fwh_article_handler(self) -> int:
  160. task = RecycleFwhDailyPublishArticlesTask(self.db_client, self.log_client)
  161. await task.deal()
  162. return self.TASK_SUCCESS_STATUS
  163. # 账号品类处理任务
  164. async def _account_category_analysis_handler(self) -> int:
  165. task = AccountCategoryAnalysis(
  166. pool=self.db_client,
  167. log_client=self.log_client,
  168. trace_id=self.trace_id,
  169. data=self.data,
  170. date_string=None,
  171. )
  172. await task.deal()
  173. return self.TASK_SUCCESS_STATUS
  174. # 抓取视频/文章详情分析统计
  175. async def _crawler_article_analysis_handler(self) -> int:
  176. task = CrawlerDetailDeal(pool=self.db_client, trace_id=self.trace_id)
  177. await task.deal(params=self.data)
  178. return self.TASK_SUCCESS_STATUS
  179. # 更新小程序裂变信息
  180. async def _mini_program_detail_handler(self) -> int:
  181. task = RecycleMiniProgramDetailTask(
  182. pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id
  183. )
  184. await task.deal(params=self.data)
  185. return self.TASK_SUCCESS_STATUS
  186. # 提取标题特征
  187. async def _extract_title_features_handler(self) -> int:
  188. task = ExtractTitleFeatures(self.db_client, self.log_client, self.trace_id)
  189. await task.deal(data=self.data)
  190. return self.TASK_SUCCESS_STATUS
  191. # 回收外部账号文章
  192. async def _recycle_outside_account_article_handler(self) -> int:
  193. date_str = self.data.get("date_string") or datetime.now().strftime("%Y-%m-%d")
  194. task = RecycleOutsideAccountArticlesTask(
  195. pool=self.db_client, log_client=self.log_client, date_string=date_str
  196. )
  197. await task.deal()
  198. return self.TASK_SUCCESS_STATUS
  199. # 更新外部账号文章的root_source_id和update_time
  200. async def _update_outside_account_article_root_source_id_and_update_time_handler(
  201. self,
  202. ) -> int:
  203. task = UpdateOutsideRootSourceIdAndUpdateTimeTask(
  204. pool=self.db_client, log_client=self.log_client
  205. )
  206. await task.deal()
  207. return self.TASK_SUCCESS_STATUS
  208. # 更新限流账号信息
  209. async def _update_limited_account_info_handler(self) -> int:
  210. task = LimitedAccountAnalysisTask(
  211. pool=self.db_client, log_client=self.log_client
  212. )
  213. await task.deal(date_string=self.data.get("date_string"))
  214. return self.TASK_SUCCESS_STATUS
  215. # 更新账号阅读率均值
  216. async def _update_account_read_rate_avg_handler(self) -> int:
  217. task = AccountPositionReadRateAvg(
  218. pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id
  219. )
  220. await task.deal(end_date=self.data.get("end_date"))
  221. return self.TASK_SUCCESS_STATUS
  222. # 更新账号阅读均值
  223. async def _update_account_read_avg_handler(self) -> int:
  224. task = AccountPositionReadAvg(
  225. pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id
  226. )
  227. await task.deal(end_date=self.data.get("end_date"))
  228. return self.TASK_SUCCESS_STATUS
# Names exported by `from <this module> import *`: only the handler class.
__all__ = ["TaskHandler"]