task_handler.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. from datetime import datetime
  2. from applications.tasks.analysis_task import CrawlerDetailDeal
  3. from applications.tasks.algorithm_tasks import AccountCategoryAnalysis
  4. from applications.tasks.cold_start_tasks import ArticlePoolColdStart
  5. from applications.tasks.crawler_tasks import CrawlerToutiao
  6. from applications.tasks.crawler_tasks import WeixinAccountManager
  7. from applications.tasks.crawler_tasks import CrawlerGzhAccountArticles
  8. from applications.tasks.crawler_tasks import CrawlerGzhSearchArticles
  9. from applications.tasks.data_recycle_tasks import RecycleDailyPublishArticlesTask
  10. from applications.tasks.data_recycle_tasks import RecycleOutsideAccountArticlesTask
  11. from applications.tasks.data_recycle_tasks import CheckDailyPublishArticlesTask
  12. from applications.tasks.data_recycle_tasks import UpdateRootSourceIdAndUpdateTimeTask
  13. from applications.tasks.data_recycle_tasks import RecycleFwhDailyPublishArticlesTask
  14. from applications.tasks.data_recycle_tasks import RecycleMiniProgramDetailTask
  15. from applications.tasks.data_recycle_tasks import (
  16. UpdateOutsideRootSourceIdAndUpdateTimeTask,
  17. )
  18. from applications.tasks.llm_tasks import TitleRewrite
  19. from applications.tasks.llm_tasks import ArticlePoolCategoryGeneration
  20. from applications.tasks.llm_tasks import CandidateAccountQualityScoreRecognizer
  21. from applications.tasks.llm_tasks import ExtractTitleFeatures
  22. from applications.tasks.monitor_tasks import check_kimi_balance
  23. from applications.tasks.monitor_tasks import GetOffVideos
  24. from applications.tasks.monitor_tasks import CheckVideoAuditStatus
  25. from applications.tasks.monitor_tasks import InnerGzhArticlesMonitor
  26. from applications.tasks.monitor_tasks import OutsideGzhArticlesMonitor
  27. from applications.tasks.monitor_tasks import OutsideGzhArticlesCollector
  28. from applications.tasks.monitor_tasks import TaskProcessingMonitor
  29. from applications.tasks.monitor_tasks import LimitedAccountAnalysisTask
  30. from applications.tasks.task_mapper import TaskMapper
  class TaskHandler(TaskMapper):
      """Async handlers that each build and run one background task.

      Every ``_*_handler`` coroutine constructs the matching task object from
      the shared clients stored on the instance and awaits it. A handler
      either passes through whatever the task's ``deal()`` returns or returns
      ``self.TASK_SUCCESS_STATUS`` (not defined in this class; presumably
      inherited from ``TaskMapper`` -- confirm there).
      """

      def __init__(self, data, log_service, db_client, trace_id):
          """Keep the request payload and the shared clients on the instance.

          Args:
              data: Task parameters. Handlers call ``.get`` on it, so a
                  mapping-like object is expected.
              log_service: Logging client, stored as ``self.log_client``.
              db_client: Database client handed to the task objects.
              trace_id: Identifier forwarded to tasks and log records.
          """
          self.trace_id = trace_id
          self.db_client = db_client
          self.log_client = log_service
          self.data = data

      # ---------- Local implementations of several composite tasks ----------

      async def _check_kimi_balance_handler(self) -> int:
          """Await ``check_kimi_balance()``, log its response, report success."""
          balance_response = await check_kimi_balance()
          log_record = {
              "trace_id": self.trace_id,
              "task": "check_kimi_balance",
              "data": balance_response,
          }
          await self.log_client.log(contents=log_record)
          return self.TASK_SUCCESS_STATUS

      async def _get_off_videos_task_handler(self) -> int:
          """Run ``GetOffVideos`` and return the result of its ``deal()``."""
          videos_task = GetOffVideos(self.db_client, self.log_client, self.trace_id)
          outcome = await videos_task.deal()
          return outcome

      async def _check_video_audit_status_handler(self) -> int:
          """Run ``CheckVideoAuditStatus`` and return the result of ``deal()``."""
          audit_task = CheckVideoAuditStatus(
              self.db_client, self.log_client, self.trace_id
          )
          outcome = await audit_task.deal()
          return outcome

      async def _task_processing_monitor_handler(self) -> int:
          """Run ``TaskProcessingMonitor``; its result is ignored."""
          await TaskProcessingMonitor(self.db_client).deal()
          return self.TASK_SUCCESS_STATUS

      async def _inner_gzh_articles_monitor_handler(self) -> int:
          """Run ``InnerGzhArticlesMonitor`` and return the result of ``deal()``."""
          monitor = InnerGzhArticlesMonitor(self.db_client)
          outcome = await monitor.deal()
          return outcome

      async def _title_rewrite_handler(self):
          """Run ``TitleRewrite`` and return the result of its ``deal()``."""
          rewrite_task = TitleRewrite(self.db_client, self.log_client, self.trace_id)
          outcome = await rewrite_task.deal()
          return outcome

      async def _update_root_source_id_handler(self) -> int:
          """Run ``UpdateRootSourceIdAndUpdateTimeTask``; its result is ignored."""
          updater = UpdateRootSourceIdAndUpdateTimeTask(
              self.db_client, self.log_client
          )
          await updater.deal()
          return self.TASK_SUCCESS_STATUS

      async def _outside_monitor_handler(self) -> int:
          """Run the outside-gzh collector, then the monitor.

          The collector's result is discarded; the monitor's result is what
          this handler returns.
          """
          await OutsideGzhArticlesCollector(self.db_client).deal()
          outside_monitor = OutsideGzhArticlesMonitor(self.db_client)
          # Should return SUCCESS / FAILED.
          return await outside_monitor.deal()

      async def _recycle_article_data_handler(self) -> int:
          """Run the daily-publish recycle task, then the check task.

          Both tasks receive ``data["date_string"]``; when that value is
          missing or falsy, the current local date (``%Y-%m-%d``) is used.
          """
          date_str = self.data.get("date_string")
          if not date_str:
              date_str = datetime.now().strftime("%Y-%m-%d")

          recycle_task = RecycleDailyPublishArticlesTask(
              self.db_client, self.log_client, date_str
          )
          await recycle_task.deal()

          check_task = CheckDailyPublishArticlesTask(
              self.db_client, self.log_client, date_str
          )
          await check_task.deal()
          return self.TASK_SUCCESS_STATUS

      async def _crawler_toutiao_handler(self) -> int:
          """Dispatch a Toutiao crawl according to ``data["method"]``.

          ``method`` defaults to ``"account"``, ``media_type`` to
          ``"article"`` and ``category_list`` to an empty list.

          Raises:
              ValueError: If ``method`` is not ``"account"``,
                  ``"recommend"`` or ``"search"``.
          """
          toutiao = CrawlerToutiao(self.db_client, self.log_client, self.trace_id)
          method = self.data.get("method", "account")
          media_type = self.data.get("media_type", "article")
          category_list = self.data.get("category_list", [])

          if method == "account":
              await toutiao.crawler_task(media_type=media_type)
          elif method == "recommend":
              await toutiao.crawl_toutiao_recommend_task(category_list)
          elif method == "search":
              await toutiao.search_candidate_accounts()
          else:
              raise ValueError(f"Unsupported method {method}")
          return self.TASK_SUCCESS_STATUS

      async def _article_pool_cold_start_handler(self) -> int:
          """Run ``ArticlePoolColdStart`` with options read from ``data``.

          Defaults: ``platform="weixin"``, ``crawler_methods=[]``,
          ``category_list=[]``, ``strategy="strategy_v1"``.
          """
          cold_start_task = ArticlePoolColdStart(
              self.db_client, self.log_client, self.trace_id
          )
          options = {
              "platform": self.data.get("platform", "weixin"),
              "crawl_methods": self.data.get("crawler_methods", []),
              "category_list": self.data.get("category_list", []),
              "strategy": self.data.get("strategy", "strategy_v1"),
          }
          await cold_start_task.deal(**options)
          return self.TASK_SUCCESS_STATUS

      async def _candidate_account_quality_score_handler(self) -> int:
          """Run ``CandidateAccountQualityScoreRecognizer``; result is ignored."""
          recognizer = CandidateAccountQualityScoreRecognizer(
              self.db_client, self.log_client, self.trace_id
          )
          await recognizer.deal()
          return self.TASK_SUCCESS_STATUS

      async def _article_pool_category_generation_handler(self) -> int:
          """Run ``ArticlePoolCategoryGeneration`` with ``data["limit"]``.

          ``limit`` is passed as ``None`` when the key is absent.
          """
          generator = ArticlePoolCategoryGeneration(
              self.db_client, self.log_client, self.trace_id
          )
          limit_num = self.data.get("limit")
          await generator.deal(limit=limit_num)
          return self.TASK_SUCCESS_STATUS

      async def _crawler_account_manager_handler(self) -> int:
          """Run the account manager for ``data["platform"]``.

          ``platform`` defaults to ``"weixin"``, which is the only value
          handled here.

          Raises:
              ValueError: If ``platform`` is anything other than ``"weixin"``.
          """
          platform = self.data.get("platform", "weixin")
          account_id_list = self.data.get("account_id_list")

          if platform == "weixin":
              manager = WeixinAccountManager(
                  self.db_client, self.log_client, self.trace_id
              )
          else:
              raise ValueError(f"Unsupported platform {platform}")

          await manager.deal(platform=platform, account_id_list=account_id_list)
          return self.TASK_SUCCESS_STATUS

      # Crawl official-account (gzh) articles.
      async def _crawler_gzh_article_handler(self) -> int:
          """Crawl gzh articles in the mode given by ``data["crawl_mode"]``.

          ``"account"`` runs ``CrawlerGzhAccountArticles`` with
          ``account_method`` and ``strategy``; ``"search"`` runs
          ``CrawlerGzhSearchArticles`` with ``strategy`` only. None of the
          three keys has a default.

          Raises:
              ValueError: If ``crawl_mode`` is neither ``"account"`` nor
                  ``"search"`` (including when it is missing).
          """
          account_method = self.data.get("account_method")
          crawl_mode = self.data.get("crawl_mode")
          strategy = self.data.get("strategy")

          if crawl_mode == "account":
              account_crawler = CrawlerGzhAccountArticles(
                  self.db_client, self.log_client, self.trace_id
              )
              await account_crawler.deal(account_method, strategy)
          elif crawl_mode == "search":
              search_crawler = CrawlerGzhSearchArticles(
                  self.db_client, self.log_client, self.trace_id
              )
              await search_crawler.deal(strategy)
          else:
              raise ValueError(f"Unsupported crawl mode {crawl_mode}")
          return self.TASK_SUCCESS_STATUS

      # Recycle service-account (fwh) articles.
      async def _recycle_fwh_article_handler(self) -> int:
          """Run ``RecycleFwhDailyPublishArticlesTask``; result is ignored."""
          fwh_task = RecycleFwhDailyPublishArticlesTask(
              self.db_client, self.log_client
          )
          await fwh_task.deal()
          return self.TASK_SUCCESS_STATUS

      # Account category processing task.
      async def _account_category_analysis_handler(self) -> int:
          """Run ``AccountCategoryAnalysis`` with the full ``data`` payload.

          ``date_string`` is always passed as ``None``.
          """
          analysis = AccountCategoryAnalysis(
              pool=self.db_client,
              log_client=self.log_client,
              trace_id=self.trace_id,
              data=self.data,
              date_string=None,
          )
          await analysis.deal()
          return self.TASK_SUCCESS_STATUS

      # Analysis/statistics over crawled video and article details.
      async def _crawler_article_analysis_handler(self) -> int:
          """Run ``CrawlerDetailDeal`` with ``data`` as its ``params``."""
          detail_task = CrawlerDetailDeal(
              pool=self.db_client, trace_id=self.trace_id
          )
          await detail_task.deal(params=self.data)
          return self.TASK_SUCCESS_STATUS

      # Update mini-program fission information.
      async def _mini_program_detail_handler(self) -> int:
          """Run ``RecycleMiniProgramDetailTask`` with ``data`` as ``params``."""
          detail_task = RecycleMiniProgramDetailTask(
              pool=self.db_client,
              log_client=self.log_client,
              trace_id=self.trace_id,
          )
          await detail_task.deal(params=self.data)
          return self.TASK_SUCCESS_STATUS

      # Extract title features.
      async def _extract_title_features_handler(self) -> int:
          """Run ``ExtractTitleFeatures`` with ``data`` as its ``data`` argument."""
          extractor = ExtractTitleFeatures(
              self.db_client, self.log_client, self.trace_id
          )
          await extractor.deal(data=self.data)
          return self.TASK_SUCCESS_STATUS

      # Recycle articles of outside accounts.
      async def _recycle_outside_account_article_handler(self) -> int:
          """Run ``RecycleOutsideAccountArticlesTask`` for one date.

          Uses ``data["date_string"]``; when that value is missing or falsy,
          the current local date (``%Y-%m-%d``) is used.
          """
          date_str = self.data.get("date_string")
          if not date_str:
              date_str = datetime.now().strftime("%Y-%m-%d")

          outside_task = RecycleOutsideAccountArticlesTask(
              pool=self.db_client,
              log_client=self.log_client,
              date_string=date_str,
          )
          await outside_task.deal()
          return self.TASK_SUCCESS_STATUS

      # Update root_source_id and update_time of outside-account articles.
      async def _update_outside_account_article_root_source_id_and_update_time_handler(
          self,
      ) -> int:
          """Run ``UpdateOutsideRootSourceIdAndUpdateTimeTask``; result ignored."""
          updater = UpdateOutsideRootSourceIdAndUpdateTimeTask(
              pool=self.db_client,
              log_client=self.log_client,
          )
          await updater.deal()
          return self.TASK_SUCCESS_STATUS

      # Update limited-account information.
      async def _update_limited_account_info_handler(self) -> int:
          """Run ``LimitedAccountAnalysisTask`` with ``data["date_string"]``.

          Unlike the recycle handlers, no fallback date is applied here: the
          task receives ``None`` when the key is absent.
          """
          limited_task = LimitedAccountAnalysisTask(
              pool=self.db_client,
              log_client=self.log_client,
          )
          await limited_task.deal(date_string=self.data.get("date_string"))
          return self.TASK_SUCCESS_STATUS
  # Names exported by a star-import of this module.
  __all__ = [
      "TaskHandler",
  ]