# task_handler.py
  1. from datetime import datetime
  2. from applications.tasks.analysis_task import CrawlerDetailDeal
  3. from applications.tasks.algorithm_tasks import AccountCategoryAnalysis
  4. from applications.tasks.cold_start_tasks import ArticlePoolColdStart
  5. from applications.tasks.crawler_tasks import CrawlerToutiao
  6. from applications.tasks.crawler_tasks import WeixinAccountManager
  7. from applications.tasks.crawler_tasks import CrawlerGzhAccountArticles
  8. from applications.tasks.crawler_tasks import CrawlerGzhSearchArticles
  9. from applications.tasks.data_recycle_tasks import RecycleDailyPublishArticlesTask
  10. from applications.tasks.data_recycle_tasks import CheckDailyPublishArticlesTask
  11. from applications.tasks.data_recycle_tasks import UpdateRootSourceIdAndUpdateTimeTask
  12. from applications.tasks.data_recycle_tasks import RecycleFwhDailyPublishArticlesTask
  13. from applications.tasks.data_recycle_tasks import RecycleMiniProgramDetailTask
  14. from applications.tasks.llm_tasks import TitleRewrite
  15. from applications.tasks.llm_tasks import ArticlePoolCategoryGeneration
  16. from applications.tasks.llm_tasks import CandidateAccountQualityScoreRecognizer
  17. from applications.tasks.monitor_tasks import check_kimi_balance
  18. from applications.tasks.monitor_tasks import GetOffVideos
  19. from applications.tasks.monitor_tasks import CheckVideoAuditStatus
  20. from applications.tasks.monitor_tasks import InnerGzhArticlesMonitor
  21. from applications.tasks.monitor_tasks import OutsideGzhArticlesMonitor
  22. from applications.tasks.monitor_tasks import OutsideGzhArticlesCollector
  23. from applications.tasks.monitor_tasks import TaskProcessingMonitor
  24. from applications.tasks.dev import DataAnalysis
  25. from applications.tasks.dev import GetAccountCategory
  26. from applications.tasks.dev import ReadScoreCalculator
  27. from applications.tasks.task_mapper import TaskMapper
  28. class TaskHandler(TaskMapper):
  29. def __init__(self, data, log_service, db_client, trace_id):
  30. self.data = data
  31. self.log_client = log_service
  32. self.db_client = db_client
  33. self.trace_id = trace_id
  34. # ---------- 下面是若干复合任务的局部实现 ----------
  35. async def _check_kimi_balance_handler(self) -> int:
  36. response = await check_kimi_balance()
  37. await self.log_client.log(
  38. contents={
  39. "trace_id": self.trace_id,
  40. "task": "check_kimi_balance",
  41. "data": response,
  42. }
  43. )
  44. return self.TASK_SUCCESS_STATUS
  45. async def _get_off_videos_task_handler(self) -> int:
  46. sub_task = GetOffVideos(self.db_client, self.log_client, self.trace_id)
  47. return await sub_task.deal()
  48. async def _check_video_audit_status_handler(self) -> int:
  49. sub_task = CheckVideoAuditStatus(self.db_client, self.log_client, self.trace_id)
  50. return await sub_task.deal()
  51. async def _task_processing_monitor_handler(self) -> int:
  52. sub_task = TaskProcessingMonitor(self.db_client)
  53. await sub_task.deal()
  54. return self.TASK_SUCCESS_STATUS
  55. async def _inner_gzh_articles_monitor_handler(self) -> int:
  56. sub_task = InnerGzhArticlesMonitor(self.db_client)
  57. return await sub_task.deal()
  58. async def _title_rewrite_handler(self):
  59. sub_task = TitleRewrite(self.db_client, self.log_client, self.trace_id)
  60. return await sub_task.deal()
  61. async def _update_root_source_id_handler(self) -> int:
  62. sub_task = UpdateRootSourceIdAndUpdateTimeTask(self.db_client, self.log_client)
  63. await sub_task.deal()
  64. return self.TASK_SUCCESS_STATUS
  65. async def _outside_monitor_handler(self) -> int:
  66. collector = OutsideGzhArticlesCollector(self.db_client)
  67. await collector.deal()
  68. monitor = OutsideGzhArticlesMonitor(self.db_client)
  69. return await monitor.deal() # 应返回 SUCCESS / FAILED
  70. async def _recycle_article_data_handler(self) -> int:
  71. date_str = self.data.get("date_string") or datetime.now().strftime("%Y-%m-%d")
  72. recycle = RecycleDailyPublishArticlesTask(
  73. self.db_client, self.log_client, date_str
  74. )
  75. await recycle.deal()
  76. # check = CheckDailyPublishArticlesTask(self.db_client, self.log_client, date_str)
  77. # await check.deal()
  78. return self.TASK_SUCCESS_STATUS
  79. async def _crawler_toutiao_handler(self) -> int:
  80. sub_task = CrawlerToutiao(self.db_client, self.log_client, self.trace_id)
  81. method = self.data.get("method", "account")
  82. media_type = self.data.get("media_type", "article")
  83. category_list = self.data.get("category_list", [])
  84. match method:
  85. case "account":
  86. await sub_task.crawler_task(media_type=media_type)
  87. case "recommend":
  88. await sub_task.crawl_toutiao_recommend_task(category_list)
  89. case "search":
  90. await sub_task.search_candidate_accounts()
  91. case _:
  92. raise ValueError(f"Unsupported method {method}")
  93. return self.TASK_SUCCESS_STATUS
  94. async def _article_pool_cold_start_handler(self) -> int:
  95. cold_start = ArticlePoolColdStart(
  96. self.db_client, self.log_client, self.trace_id
  97. )
  98. platform = self.data.get("platform", "weixin")
  99. crawler_methods = self.data.get("crawler_methods", [])
  100. category_list = self.data.get("category_list", [])
  101. strategy = self.data.get("strategy", "strategy_v1")
  102. await cold_start.deal(
  103. platform=platform,
  104. crawl_methods=crawler_methods,
  105. category_list=category_list,
  106. strategy=strategy,
  107. )
  108. return self.TASK_SUCCESS_STATUS
  109. async def _candidate_account_quality_score_handler(self) -> int:
  110. task = CandidateAccountQualityScoreRecognizer(
  111. self.db_client, self.log_client, self.trace_id
  112. )
  113. await task.deal()
  114. return self.TASK_SUCCESS_STATUS
  115. async def _article_pool_category_generation_handler(self) -> int:
  116. task = ArticlePoolCategoryGeneration(
  117. self.db_client, self.log_client, self.trace_id
  118. )
  119. limit_num = self.data.get("limit")
  120. await task.deal(limit=limit_num)
  121. return self.TASK_SUCCESS_STATUS
  122. async def _crawler_account_manager_handler(self) -> int:
  123. platform = self.data.get("platform", "weixin")
  124. account_id_list = self.data.get("account_id_list")
  125. match platform:
  126. case "weixin":
  127. task = WeixinAccountManager(
  128. self.db_client, self.log_client, self.trace_id
  129. )
  130. case _:
  131. raise ValueError(f"Unsupported platform {platform}")
  132. await task.deal(platform=platform, account_id_list=account_id_list)
  133. return self.TASK_SUCCESS_STATUS
  134. # 抓取公众号文章
  135. async def _crawler_gzh_article_handler(self) -> int:
  136. account_method = self.data.get("account_method")
  137. crawl_mode = self.data.get("crawl_mode")
  138. strategy = self.data.get("strategy")
  139. match crawl_mode:
  140. case "account":
  141. task = CrawlerGzhAccountArticles(
  142. self.db_client, self.log_client, self.trace_id
  143. )
  144. await task.deal(account_method, strategy)
  145. case "search":
  146. task = CrawlerGzhSearchArticles(
  147. self.db_client, self.log_client, self.trace_id
  148. )
  149. await task.deal(strategy)
  150. case _:
  151. raise ValueError(f"Unsupported crawl mode {crawl_mode}")
  152. return self.TASK_SUCCESS_STATUS
  153. # 回收服务号文章
  154. async def _recycle_fwh_article_handler(self) -> int:
  155. task = RecycleFwhDailyPublishArticlesTask(self.db_client, self.log_client)
  156. await task.deal()
  157. return self.TASK_SUCCESS_STATUS
  158. # 账号品类处理任务
  159. async def _account_category_analysis_handler(self) -> int:
  160. task = AccountCategoryAnalysis(
  161. pool=self.db_client,
  162. log_client=self.log_client,
  163. trace_id=self.trace_id,
  164. data=self.data,
  165. date_string=None,
  166. )
  167. await task.deal()
  168. return self.TASK_SUCCESS_STATUS
  169. # 抓取视频/文章详情分析统计
  170. async def _crawler_article_analysis_handler(self) -> int:
  171. task = CrawlerDetailDeal(pool=self.db_client, trace_id=self.trace_id)
  172. await task.deal(params=self.data)
  173. return self.TASK_SUCCESS_STATUS
  174. # 更新小程序裂变信息
  175. async def _mini_program_detail_handler(self) -> int:
  176. task = RecycleMiniProgramDetailTask(pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id)
  177. await task.deal(params=self.data)
  178. return self.TASK_SUCCESS_STATUS
  179. async def _data_analysis_handler(self) -> int:
  180. task = DataAnalysis(self.db_client)
  181. await task.deal()
  182. return self.TASK_SUCCESS_STATUS
  183. async def _get_account_category_handler(self) -> int:
  184. task = GetAccountCategory(self.db_client)
  185. await task.deal()
  186. return self.TASK_SUCCESS_STATUS
  187. async def _cal_read_score_handler(self) -> int:
  188. task = ReadScoreCalculator(self.db_client)
  189. await task.deal(self.data)
  190. return self.TASK_SUCCESS_STATUS
# Public interface for ``from ... import *``: only the handler class is exported.
__all__ = ["TaskHandler"]