task_handler.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. from datetime import datetime
  2. from applications.tasks.algorithm_tasks import AccountCategoryAnalysis
  3. from applications.tasks.cold_start_tasks import ArticlePoolColdStart
  4. from applications.tasks.crawler_tasks import CrawlerToutiao
  5. from applications.tasks.crawler_tasks import WeixinAccountManager
  6. from applications.tasks.crawler_tasks import CrawlerGzhAccountArticles
  7. from applications.tasks.crawler_tasks import CrawlerGzhSearchArticles
  8. from applications.tasks.data_recycle_tasks import RecycleDailyPublishArticlesTask
  9. from applications.tasks.data_recycle_tasks import CheckDailyPublishArticlesTask
  10. from applications.tasks.data_recycle_tasks import UpdateRootSourceIdAndUpdateTimeTask
  11. from applications.tasks.data_recycle_tasks import RecycleFwhDailyPublishArticlesTask
  12. from applications.tasks.llm_tasks import TitleRewrite
  13. from applications.tasks.llm_tasks import ArticlePoolCategoryGeneration
  14. from applications.tasks.llm_tasks import CandidateAccountQualityScoreRecognizer
  15. from applications.tasks.monitor_tasks import check_kimi_balance
  16. from applications.tasks.monitor_tasks import GetOffVideos
  17. from applications.tasks.monitor_tasks import CheckVideoAuditStatus
  18. from applications.tasks.monitor_tasks import InnerGzhArticlesMonitor
  19. from applications.tasks.monitor_tasks import OutsideGzhArticlesMonitor
  20. from applications.tasks.monitor_tasks import OutsideGzhArticlesCollector
  21. from applications.tasks.monitor_tasks import TaskProcessingMonitor
  22. from applications.tasks.task_mapper import TaskMapper
  23. class TaskHandler(TaskMapper):
  24. def __init__(self, data, log_service, db_client, trace_id):
  25. self.data = data
  26. self.log_client = log_service
  27. self.db_client = db_client
  28. self.trace_id = trace_id
    # ---------- Local implementations of several composite tasks follow ----------
  30. async def _check_kimi_balance_handler(self) -> int:
  31. response = await check_kimi_balance()
  32. await self.log_client.log(
  33. contents={
  34. "trace_id": self.trace_id,
  35. "task": "check_kimi_balance",
  36. "data": response,
  37. }
  38. )
  39. return self.TASK_SUCCESS_STATUS
  40. async def _get_off_videos_task_handler(self) -> int:
  41. sub_task = GetOffVideos(self.db_client, self.log_client, self.trace_id)
  42. return await sub_task.deal()
  43. async def _check_video_audit_status_handler(self) -> int:
  44. sub_task = CheckVideoAuditStatus(self.db_client, self.log_client, self.trace_id)
  45. return await sub_task.deal()
  46. async def _task_processing_monitor_handler(self) -> int:
  47. sub_task = TaskProcessingMonitor(self.db_client)
  48. await sub_task.deal()
  49. return self.TASK_SUCCESS_STATUS
  50. async def _inner_gzh_articles_monitor_handler(self) -> int:
  51. sub_task = InnerGzhArticlesMonitor(self.db_client)
  52. return await sub_task.deal()
  53. async def _title_rewrite_handler(self):
  54. sub_task = TitleRewrite(self.db_client, self.log_client, self.trace_id)
  55. return await sub_task.deal()
  56. async def _update_root_source_id_handler(self) -> int:
  57. sub_task = UpdateRootSourceIdAndUpdateTimeTask(self.db_client, self.log_client)
  58. await sub_task.deal()
  59. return self.TASK_SUCCESS_STATUS
  60. async def _outside_monitor_handler(self) -> int:
  61. collector = OutsideGzhArticlesCollector(self.db_client)
  62. await collector.deal()
  63. monitor = OutsideGzhArticlesMonitor(self.db_client)
  64. return await monitor.deal() # 应返回 SUCCESS / FAILED
  65. async def _recycle_article_data_handler(self) -> int:
  66. date_str = self.data.get("date_string") or datetime.now().strftime("%Y-%m-%d")
  67. recycle = RecycleDailyPublishArticlesTask(
  68. self.db_client, self.log_client, date_str
  69. )
  70. await recycle.deal()
  71. check = CheckDailyPublishArticlesTask(self.db_client, self.log_client, date_str)
  72. await check.deal()
  73. return self.TASK_SUCCESS_STATUS
  74. async def _crawler_toutiao_handler(self) -> int:
  75. sub_task = CrawlerToutiao(self.db_client, self.log_client, self.trace_id)
  76. method = self.data.get("method", "account")
  77. media_type = self.data.get("media_type", "article")
  78. category_list = self.data.get("category_list", [])
  79. match method:
  80. case "account":
  81. await sub_task.crawler_task(media_type=media_type)
  82. case "recommend":
  83. await sub_task.crawl_toutiao_recommend_task(category_list)
  84. case "search":
  85. await sub_task.search_candidate_accounts()
  86. case _:
  87. raise ValueError(f"Unsupported method {method}")
  88. return self.TASK_SUCCESS_STATUS
  89. async def _article_pool_cold_start_handler(self) -> int:
  90. cold_start = ArticlePoolColdStart(
  91. self.db_client, self.log_client, self.trace_id
  92. )
  93. platform = self.data.get("platform", "weixin")
  94. crawler_methods = self.data.get("crawler_methods", [])
  95. category_list = self.data.get("category_list", [])
  96. strategy = self.data.get("strategy", "strategy_v1")
  97. await cold_start.deal(
  98. platform=platform,
  99. crawl_methods=crawler_methods,
  100. category_list=category_list,
  101. strategy=strategy,
  102. )
  103. return self.TASK_SUCCESS_STATUS
  104. async def _candidate_account_quality_score_handler(self) -> int:
  105. task = CandidateAccountQualityScoreRecognizer(
  106. self.db_client, self.log_client, self.trace_id
  107. )
  108. await task.deal()
  109. return self.TASK_SUCCESS_STATUS
  110. async def _article_pool_category_generation_handler(self) -> int:
  111. task = ArticlePoolCategoryGeneration(
  112. self.db_client, self.log_client, self.trace_id
  113. )
  114. limit_num = self.data.get("limit")
  115. await task.deal(limit=limit_num)
  116. return self.TASK_SUCCESS_STATUS
  117. async def _crawler_account_manager_handler(self) -> int:
  118. platform = self.data.get("platform", "weixin")
  119. account_id_list = self.data.get("account_id_list")
  120. match platform:
  121. case "weixin":
  122. task = WeixinAccountManager(
  123. self.db_client, self.log_client, self.trace_id
  124. )
  125. case _:
  126. raise ValueError(f"Unsupported platform {platform}")
  127. await task.deal(platform=platform, account_id_list=account_id_list)
  128. return self.TASK_SUCCESS_STATUS
  129. async def _crawler_gzh_article_handler(self) -> int:
  130. account_method = self.data.get("account_method")
  131. crawl_mode = self.data.get("crawl_mode")
  132. strategy = self.data.get("strategy")
  133. match crawl_mode:
  134. case "account":
  135. task = CrawlerGzhAccountArticles(
  136. self.db_client, self.log_client, self.trace_id
  137. )
  138. await task.deal(account_method, strategy)
  139. case "search":
  140. task = CrawlerGzhSearchArticles(
  141. self.db_client, self.log_client, self.trace_id
  142. )
  143. await task.deal(strategy)
  144. case _:
  145. raise ValueError(f"Unsupported crawl mode {crawl_mode}")
  146. return self.TASK_SUCCESS_STATUS
  147. async def _recycle_fwh_article_handler(self) -> int:
  148. task = RecycleFwhDailyPublishArticlesTask(self.db_client, self.log_client)
  149. await task.deal()
  150. return self.TASK_SUCCESS_STATUS
  151. async def _account_category_analysis_handler(self) -> int:
  152. task = AccountCategoryAnalysis(
  153. pool=self.db_client,
  154. log_client=self.log_client,
  155. trace_id=self.trace_id,
  156. data=self.data,
  157. date_string=None,
  158. )
  159. await task.deal()
  160. return self.TASK_SUCCESS_STATUS