task_handler.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. from datetime import datetime
  2. from applications.tasks.cold_start_tasks import ArticlePoolColdStart
  3. from applications.tasks.crawler_tasks import CrawlerToutiao
  4. from applications.tasks.crawler_tasks import WeixinAccountManager
  5. from applications.tasks.crawler_tasks import CrawlerGzhAccountArticles
  6. from applications.tasks.crawler_tasks import CrawlerGzhSearchArticles
  7. from applications.tasks.data_recycle_tasks import RecycleDailyPublishArticlesTask
  8. from applications.tasks.data_recycle_tasks import CheckDailyPublishArticlesTask
  9. from applications.tasks.data_recycle_tasks import UpdateRootSourceIdAndUpdateTimeTask
  10. from applications.tasks.data_recycle_tasks import RecycleFwhDailyPublishArticlesTask
  11. from applications.tasks.llm_tasks import TitleRewrite
  12. from applications.tasks.llm_tasks import ArticlePoolCategoryGeneration
  13. from applications.tasks.llm_tasks import CandidateAccountQualityScoreRecognizer
  14. from applications.tasks.monitor_tasks import check_kimi_balance
  15. from applications.tasks.monitor_tasks import GetOffVideos
  16. from applications.tasks.monitor_tasks import CheckVideoAuditStatus
  17. from applications.tasks.monitor_tasks import InnerGzhArticlesMonitor
  18. from applications.tasks.monitor_tasks import OutsideGzhArticlesMonitor
  19. from applications.tasks.monitor_tasks import OutsideGzhArticlesCollector
  20. from applications.tasks.monitor_tasks import TaskProcessingMonitor
  21. from applications.tasks.task_mapper import TaskMapper
  22. class TaskHandler(TaskMapper):
  23. def __init__(self, data, log_service, db_client, trace_id):
  24. self.data = data
  25. self.log_client = log_service
  26. self.db_client = db_client
  27. self.trace_id = trace_id
  28. # ---------- 下面是若干复合任务的局部实现 ----------
  29. async def _check_kimi_balance_handler(self) -> int:
  30. response = await check_kimi_balance()
  31. await self.log_client.log(
  32. contents={
  33. "trace_id": self.trace_id,
  34. "task": "check_kimi_balance",
  35. "data": response,
  36. }
  37. )
  38. return self.TASK_SUCCESS_STATUS
  39. async def _get_off_videos_task_handler(self) -> int:
  40. sub_task = GetOffVideos(self.db_client, self.log_client, self.trace_id)
  41. return await sub_task.deal()
  42. async def _check_video_audit_status_handler(self) -> int:
  43. sub_task = CheckVideoAuditStatus(self.db_client, self.log_client, self.trace_id)
  44. return await sub_task.deal()
  45. async def _task_processing_monitor_handler(self) -> int:
  46. sub_task = TaskProcessingMonitor(self.db_client)
  47. await sub_task.deal()
  48. return self.TASK_SUCCESS_STATUS
  49. async def _inner_gzh_articles_monitor_handler(self) -> int:
  50. sub_task = InnerGzhArticlesMonitor(self.db_client)
  51. return await sub_task.deal()
  52. async def _title_rewrite_handler(self):
  53. sub_task = TitleRewrite(self.db_client, self.log_client, self.trace_id)
  54. return await sub_task.deal()
  55. async def _update_root_source_id_handler(self) -> int:
  56. sub_task = UpdateRootSourceIdAndUpdateTimeTask(self.db_client, self.log_client)
  57. await sub_task.deal()
  58. return self.TASK_SUCCESS_STATUS
  59. async def _outside_monitor_handler(self) -> int:
  60. collector = OutsideGzhArticlesCollector(self.db_client)
  61. await collector.deal()
  62. monitor = OutsideGzhArticlesMonitor(self.db_client)
  63. return await monitor.deal() # 应返回 SUCCESS / FAILED
  64. async def _recycle_article_data_handler(self) -> int:
  65. date_str = self.data.get("date_string") or datetime.now().strftime("%Y-%m-%d")
  66. recycle = RecycleDailyPublishArticlesTask(
  67. self.db_client, self.log_client, date_str
  68. )
  69. await recycle.deal()
  70. check = CheckDailyPublishArticlesTask(self.db_client, self.log_client, date_str)
  71. await check.deal()
  72. return self.TASK_SUCCESS_STATUS
  73. async def _crawler_toutiao_handler(self) -> int:
  74. sub_task = CrawlerToutiao(self.db_client, self.log_client, self.trace_id)
  75. method = self.data.get("method", "account")
  76. media_type = self.data.get("media_type", "article")
  77. category_list = self.data.get("category_list", [])
  78. match method:
  79. case "account":
  80. await sub_task.crawler_task(media_type=media_type)
  81. case "recommend":
  82. await sub_task.crawl_toutiao_recommend_task(category_list)
  83. case "search":
  84. await sub_task.search_candidate_accounts()
  85. case _:
  86. raise ValueError(f"Unsupported method {method}")
  87. return self.TASK_SUCCESS_STATUS
  88. async def _article_pool_cold_start_handler(self) -> int:
  89. cold_start = ArticlePoolColdStart(
  90. self.db_client, self.log_client, self.trace_id
  91. )
  92. platform = self.data.get("platform", "weixin")
  93. crawler_methods = self.data.get("crawler_methods", [])
  94. await cold_start.deal(platform=platform, crawl_methods=crawler_methods)
  95. return self.TASK_SUCCESS_STATUS
  96. async def _candidate_account_quality_score_handler(self) -> int:
  97. task = CandidateAccountQualityScoreRecognizer(
  98. self.db_client, self.log_client, self.trace_id
  99. )
  100. await task.deal()
  101. return self.TASK_SUCCESS_STATUS
  102. async def _article_pool_category_generation_handler(self) -> int:
  103. task = ArticlePoolCategoryGeneration(
  104. self.db_client, self.log_client, self.trace_id
  105. )
  106. limit_num = self.data.get("limit")
  107. await task.deal(limit=limit_num)
  108. return self.TASK_SUCCESS_STATUS
  109. async def _crawler_account_manager_handler(self) -> int:
  110. platform = self.data.get("platform", "weixin")
  111. account_id_list = self.data.get("account_id_list")
  112. match platform:
  113. case "weixin":
  114. task = WeixinAccountManager(
  115. self.db_client, self.log_client, self.trace_id
  116. )
  117. case _:
  118. raise ValueError(f"Unsupported platform {platform}")
  119. await task.deal(platform=platform, account_id_list=account_id_list)
  120. return self.TASK_SUCCESS_STATUS
  121. async def _crawler_gzh_article_handler(self) -> int:
  122. account_method = self.data.get("account_method")
  123. crawl_mode = self.data.get("crawl_mode")
  124. strategy = self.data.get("strategy")
  125. match crawl_mode:
  126. case "account":
  127. task = CrawlerGzhAccountArticles(
  128. self.db_client, self.log_client, self.trace_id
  129. )
  130. await task.deal(account_method, strategy)
  131. case "search":
  132. task = CrawlerGzhSearchArticles(
  133. self.db_client, self.log_client, self.trace_id
  134. )
  135. await task.deal(strategy)
  136. case _:
  137. raise ValueError(f"Unsupported crawl mode {crawl_mode}")
  138. return self.TASK_SUCCESS_STATUS
  139. async def _recycle_fwh_article_handler(self) -> int:
  140. task = RecycleFwhDailyPublishArticlesTask(self.db_client, self.log_client)
  141. await task.deal()
  142. return self.TASK_SUCCESS_STATUS