task_handler.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. from datetime import datetime
  2. from applications.tasks.cold_start_tasks import ArticlePoolColdStart
  3. from applications.tasks.crawler_tasks import CrawlerToutiao
  4. from applications.tasks.data_recycle_tasks import RecycleDailyPublishArticlesTask
  5. from applications.tasks.data_recycle_tasks import CheckDailyPublishArticlesTask
  6. from applications.tasks.data_recycle_tasks import UpdateRootSourceIdAndUpdateTimeTask
  7. from applications.tasks.llm_tasks import TitleRewrite
  8. from applications.tasks.llm_tasks import CandidateAccountQualityScoreRecognizer
  9. from applications.tasks.monitor_tasks import check_kimi_balance
  10. from applications.tasks.monitor_tasks import GetOffVideos
  11. from applications.tasks.monitor_tasks import CheckVideoAuditStatus
  12. from applications.tasks.monitor_tasks import InnerGzhArticlesMonitor
  13. from applications.tasks.monitor_tasks import OutsideGzhArticlesMonitor
  14. from applications.tasks.monitor_tasks import OutsideGzhArticlesCollector
  15. from applications.tasks.monitor_tasks import TaskProcessingMonitor
  16. from applications.tasks.task_mapper import TaskMapper
  17. class TaskHandler(TaskMapper):
  def __init__(self, data, log_service, db_client, trace_id) -> None:
      """Keep the task payload and the shared service handles on the instance.

      Args:
          data: Task payload; the handlers in this class read options from it
              with ``.get(...)``.
          log_service: Logging client, stored as ``self.log_client``.
          db_client: Database client passed on to every sub-task.
          trace_id: Identifier passed on to sub-tasks and written to log entries.
      """
      # The four assignments are independent of each other.
      self.trace_id = trace_id
      self.db_client = db_client
      self.log_client = log_service
      self.data = data
  # ---------- Local implementations of several composite tasks follow ----------
  async def _check_kimi_balance_handler(self) -> int:
      """Fetch the Kimi balance, write it to the log, and report success.

      Returns:
          ``self.TASK_SUCCESS_STATUS`` once the log call has been awaited.
      """
      balance_info = await check_kimi_balance()
      # Whatever check_kimi_balance() returned is logged as-is under "data".
      log_entry = {
          "trace_id": self.trace_id,
          "task": "check_kimi_balance",
          "data": balance_info,
      }
      await self.log_client.log(contents=log_entry)
      return self.TASK_SUCCESS_STATUS
  async def _get_off_videos_task_handler(self) -> int:
      """Run the ``GetOffVideos`` sub-task.

      Returns:
          The awaited result of ``GetOffVideos(...).deal()``.
      """
      return await GetOffVideos(
          self.db_client, self.log_client, self.trace_id
      ).deal()
  async def _check_video_audit_status_handler(self) -> int:
      """Run the ``CheckVideoAuditStatus`` sub-task.

      Returns:
          The awaited result of ``CheckVideoAuditStatus(...).deal()``.
      """
      audit_checker = CheckVideoAuditStatus(
          self.db_client, self.log_client, self.trace_id
      )
      audit_status = await audit_checker.deal()
      return audit_status
  async def _task_processing_monitor_handler(self) -> int:
      """Run the ``TaskProcessingMonitor`` sub-task and report success.

      The value produced by ``deal()`` is discarded.

      Returns:
          ``self.TASK_SUCCESS_STATUS`` after ``deal()`` has been awaited.
      """
      await TaskProcessingMonitor(self.db_client).deal()
      return self.TASK_SUCCESS_STATUS
  async def _inner_gzh_articles_monitor_handler(self) -> int:
      """Run the ``InnerGzhArticlesMonitor`` sub-task.

      Returns:
          The awaited result of ``InnerGzhArticlesMonitor(...).deal()``.
      """
      monitor_status = await InnerGzhArticlesMonitor(self.db_client).deal()
      return monitor_status
  async def _title_rewrite_handler(self):
      """Run the ``TitleRewrite`` sub-task.

      Returns:
          The awaited result of ``TitleRewrite(...).deal()``.
      """
      # NOTE(review): the sibling handlers are annotated ``-> int``; confirm what
      # TitleRewrite.deal() returns before adding the same annotation here.
      rewrite_result = await TitleRewrite(self.db_client, self.log_client).deal()
      return rewrite_result
  async def _update_root_source_id_handler(self) -> int:
      """Run ``UpdateRootSourceIdAndUpdateTimeTask`` and report success.

      The value produced by ``deal()`` is discarded.

      Returns:
          ``self.TASK_SUCCESS_STATUS`` after ``deal()`` has been awaited.
      """
      await UpdateRootSourceIdAndUpdateTimeTask(
          self.db_client, self.log_client
      ).deal()
      return self.TASK_SUCCESS_STATUS
  async def _outside_monitor_handler(self) -> int:
      """Run the outside-GZH article collector, then the monitor.

      The collector's result is discarded; the monitor's result is returned.

      Returns:
          The awaited result of ``OutsideGzhArticlesMonitor(...).deal()``.
      """
      # The collector finishes before the monitor is constructed.
      await OutsideGzhArticlesCollector(self.db_client).deal()
      # Expected to be SUCCESS / FAILED.
      monitor_status = await OutsideGzhArticlesMonitor(self.db_client).deal()
      return monitor_status
  async def _recycle_article_data_handler(self) -> int:
      """Recycle one day's published articles, then run the follow-up check.

      The day is taken from ``self.data["date_string"]``; when that key is
      missing or falsy, the current local date formatted as ``YYYY-MM-DD``
      is used instead.

      Returns:
          ``self.TASK_SUCCESS_STATUS`` after both sub-tasks have been awaited.
      """
      requested_date = self.data.get("date_string")
      if requested_date:
          target_date = requested_date
      else:
          # Any falsy value (None, "") falls back to today's date.
          target_date = datetime.now().strftime("%Y-%m-%d")

      # The check task is only created once the recycle task has finished.
      await RecycleDailyPublishArticlesTask(
          self.db_client, self.log_client, target_date
      ).deal()
      await CheckDailyPublishArticlesTask(
          self.db_client, self.log_client, target_date
      ).deal()
      return self.TASK_SUCCESS_STATUS
  async def _crawler_toutiao_handler(self) -> int:
      """Run one Toutiao crawl, selected by ``self.data["method"]``.

      Options read from ``self.data``:
          method: ``"account"`` (default), ``"recommend"`` or ``"search"``.
          media_type: Passed to the account crawl; defaults to ``"article"``.
          category_list: Passed to the recommend crawl; defaults to ``[]``.

      Returns:
          ``self.TASK_SUCCESS_STATUS`` after the selected crawl has been awaited.

      Raises:
          ValueError: If ``method`` is not one of the three supported values.
      """
      crawler = CrawlerToutiao(self.db_client, self.log_client, self.trace_id)
      method = self.data.get("method", "account")
      media_type = self.data.get("media_type", "article")
      category_list = self.data.get("category_list", [])

      if method == "account":
          await crawler.crawler_task(media_type=media_type)
      elif method == "recommend":
          await crawler.crawl_toutiao_recommend_task(category_list)
      elif method == "search":
          await crawler.search_candidate_accounts()
      else:
          raise ValueError(f"Unsupported method {method}")

      return self.TASK_SUCCESS_STATUS
  async def _article_pool_cold_start_handler(self) -> int:
      """Run the ``ArticlePoolColdStart`` sub-task and report success.

      Options read from ``self.data``:
          platform: Defaults to ``"weixin"``.
          crawler_methods: Defaults to ``[]``; forwarded to ``deal()`` under the
              keyword ``crawl_methods``.

      Returns:
          ``self.TASK_SUCCESS_STATUS`` after ``deal()`` has been awaited.
      """
      pool_task = ArticlePoolColdStart(
          self.db_client, self.log_client, self.trace_id
      )
      # The payload key is "crawler_methods" but the keyword is "crawl_methods".
      await pool_task.deal(
          platform=self.data.get("platform", "weixin"),
          crawl_methods=self.data.get("crawler_methods", []),
      )
      return self.TASK_SUCCESS_STATUS
  async def _candidate_account_quality_score_handler(self) -> int:
      """Run ``CandidateAccountQualityScoreRecognizer`` and report success.

      The value produced by ``deal()`` is discarded.

      Returns:
          ``self.TASK_SUCCESS_STATUS`` after ``deal()`` has been awaited.
      """
      await CandidateAccountQualityScoreRecognizer(
          self.db_client, self.log_client, self.trace_id
      ).deal()
      return self.TASK_SUCCESS_STATUS
  96. return self.TASK_SUCCESS_STATUS