# task_handler.py
from datetime import datetime

from applications.tasks.analysis_task import CrawlerDetailDeal
from applications.tasks.analysis_task import AccountPositionReadRateAvg
from applications.tasks.analysis_task import AccountPositionReadAvg
from applications.tasks.analysis_task import AccountPositionOpenRateAvg
from applications.tasks.algorithm_tasks import AccountCategoryAnalysis
from applications.tasks.cold_start_tasks import ArticlePoolColdStart
from applications.tasks.crawler_tasks import CrawlerToutiao
from applications.tasks.crawler_tasks import WeixinAccountManager
from applications.tasks.crawler_tasks import CrawlerGzhAccountArticles
from applications.tasks.crawler_tasks import CrawlerGzhSearchArticles
from applications.tasks.data_recycle_tasks import RecycleDailyPublishArticlesTask
from applications.tasks.data_recycle_tasks import RecycleOutsideAccountArticlesTask
from applications.tasks.data_recycle_tasks import CheckDailyPublishArticlesTask
from applications.tasks.data_recycle_tasks import UpdateRootSourceIdAndUpdateTimeTask
from applications.tasks.data_recycle_tasks import RecycleFwhDailyPublishArticlesTask
from applications.tasks.data_recycle_tasks import RecycleMiniProgramDetailTask
from applications.tasks.data_recycle_tasks import (
    UpdateOutsideRootSourceIdAndUpdateTimeTask,
)
from applications.tasks.llm_tasks import TitleRewrite
from applications.tasks.llm_tasks import ArticlePoolCategoryGeneration
from applications.tasks.llm_tasks import CandidateAccountQualityScoreRecognizer
from applications.tasks.llm_tasks import ExtractTitleFeatures
from applications.tasks.monitor_tasks import AutoReplyCardsMonitor
from applications.tasks.monitor_tasks import check_kimi_balance
from applications.tasks.monitor_tasks import GetOffVideos
from applications.tasks.monitor_tasks import CheckVideoAuditStatus
from applications.tasks.monitor_tasks import CooperateAccountsMonitorTask
from applications.tasks.monitor_tasks import InnerGzhArticlesMonitor
from applications.tasks.monitor_tasks import OutsideGzhArticlesMonitor
from applications.tasks.monitor_tasks import OutsideGzhArticlesCollector
from applications.tasks.monitor_tasks import TaskProcessingMonitor
from applications.tasks.monitor_tasks import LimitedAccountAnalysisTask
from applications.tasks.task_mapper import TaskMapper


  36. class TaskHandler(TaskMapper):
  37. def __init__(self, data, log_service, db_client, trace_id):
  38. self.data = data
  39. self.log_client = log_service
  40. self.db_client = db_client
  41. self.trace_id = trace_id
  42. # ---------- 下面是若干复合任务的局部实现 ----------
  43. async def _check_kimi_balance_handler(self) -> int:
  44. response = await check_kimi_balance()
  45. await self.log_client.log(
  46. contents={
  47. "trace_id": self.trace_id,
  48. "task": "check_kimi_balance",
  49. "data": response,
  50. }
  51. )
  52. return self.TASK_SUCCESS_STATUS
  53. async def _get_off_videos_task_handler(self) -> int:
  54. sub_task = GetOffVideos(self.db_client, self.log_client, self.trace_id)
  55. return await sub_task.deal()
  56. async def _check_video_audit_status_handler(self) -> int:
  57. sub_task = CheckVideoAuditStatus(self.db_client, self.log_client, self.trace_id)
  58. return await sub_task.deal()
  59. async def _task_processing_monitor_handler(self) -> int:
  60. sub_task = TaskProcessingMonitor(self.db_client)
  61. await sub_task.deal()
  62. return self.TASK_SUCCESS_STATUS
  63. async def _inner_gzh_articles_monitor_handler(self) -> int:
  64. sub_task = InnerGzhArticlesMonitor(self.db_client)
  65. return await sub_task.deal()
  66. async def _title_rewrite_handler(self):
  67. sub_task = TitleRewrite(self.db_client, self.log_client, self.trace_id)
  68. return await sub_task.deal()
  69. async def _update_root_source_id_handler(self) -> int:
  70. sub_task = UpdateRootSourceIdAndUpdateTimeTask(self.db_client, self.log_client)
  71. await sub_task.deal()
  72. return self.TASK_SUCCESS_STATUS
  73. async def _outside_monitor_handler(self) -> int:
  74. collector = OutsideGzhArticlesCollector(self.db_client)
  75. await collector.deal()
  76. monitor = OutsideGzhArticlesMonitor(self.db_client)
  77. return await monitor.deal() # 应返回 SUCCESS / FAILED
  78. async def _recycle_article_data_handler(self) -> int:
  79. date_str = self.data.get("date_string") or datetime.now().strftime("%Y-%m-%d")
  80. recycle = RecycleDailyPublishArticlesTask(
  81. self.db_client, self.log_client, date_str
  82. )
  83. await recycle.deal()
  84. check = CheckDailyPublishArticlesTask(self.db_client, self.log_client, date_str)
  85. await check.deal()
  86. return self.TASK_SUCCESS_STATUS
  87. async def _crawler_toutiao_handler(self) -> int:
  88. sub_task = CrawlerToutiao(self.db_client, self.log_client, self.trace_id)
  89. method = self.data.get("method", "account")
  90. media_type = self.data.get("media_type", "article")
  91. category_list = self.data.get("category_list", [])
  92. match method:
  93. case "account":
  94. await sub_task.crawler_task(media_type=media_type)
  95. case "recommend":
  96. await sub_task.crawl_toutiao_recommend_task(category_list)
  97. case "search":
  98. await sub_task.search_candidate_accounts()
  99. case _:
  100. raise ValueError(f"Unsupported method {method}")
  101. return self.TASK_SUCCESS_STATUS
  102. async def _article_pool_cold_start_handler(self) -> int:
  103. cold_start = ArticlePoolColdStart(
  104. self.db_client, self.log_client, self.trace_id
  105. )
  106. platform = self.data.get("platform", "weixin")
  107. crawler_methods = self.data.get("crawler_methods", [])
  108. category_list = self.data.get("category_list", [])
  109. strategy = self.data.get("strategy", "strategy_v1")
  110. await cold_start.deal(
  111. platform=platform,
  112. crawl_methods=crawler_methods,
  113. category_list=category_list,
  114. strategy=strategy,
  115. )
  116. return self.TASK_SUCCESS_STATUS
  117. async def _candidate_account_quality_score_handler(self) -> int:
  118. task = CandidateAccountQualityScoreRecognizer(
  119. self.db_client, self.log_client, self.trace_id
  120. )
  121. await task.deal()
  122. return self.TASK_SUCCESS_STATUS
  123. async def _article_pool_category_generation_handler(self) -> int:
  124. task = ArticlePoolCategoryGeneration(
  125. self.db_client, self.log_client, self.trace_id
  126. )
  127. limit_num = self.data.get("limit")
  128. await task.deal(limit=limit_num)
  129. return self.TASK_SUCCESS_STATUS
  130. async def _crawler_account_manager_handler(self) -> int:
  131. platform = self.data.get("platform", "weixin")
  132. account_id_list = self.data.get("account_id_list")
  133. match platform:
  134. case "weixin":
  135. task = WeixinAccountManager(
  136. self.db_client, self.log_client, self.trace_id
  137. )
  138. case _:
  139. raise ValueError(f"Unsupported platform {platform}")
  140. await task.deal(platform=platform, account_id_list=account_id_list)
  141. return self.TASK_SUCCESS_STATUS
  142. # 抓取公众号文章
  143. async def _crawler_gzh_article_handler(self) -> int:
  144. account_method = self.data.get("account_method")
  145. crawl_mode = self.data.get("crawl_mode")
  146. strategy = self.data.get("strategy")
  147. match crawl_mode:
  148. case "account":
  149. task = CrawlerGzhAccountArticles(
  150. self.db_client, self.log_client, self.trace_id
  151. )
  152. await task.deal(account_method, strategy)
  153. case "search":
  154. task = CrawlerGzhSearchArticles(
  155. self.db_client, self.log_client, self.trace_id
  156. )
  157. await task.deal(strategy)
  158. case _:
  159. raise ValueError(f"Unsupported crawl mode {crawl_mode}")
  160. return self.TASK_SUCCESS_STATUS
  161. # 回收服务号文章
  162. async def _recycle_fwh_article_handler(self) -> int:
  163. task = RecycleFwhDailyPublishArticlesTask(self.db_client, self.log_client)
  164. await task.deal()
  165. return self.TASK_SUCCESS_STATUS
  166. # 账号品类处理任务
  167. async def _account_category_analysis_handler(self) -> int:
  168. task = AccountCategoryAnalysis(
  169. pool=self.db_client,
  170. log_client=self.log_client,
  171. trace_id=self.trace_id,
  172. data=self.data,
  173. date_string=None,
  174. )
  175. await task.deal()
  176. return self.TASK_SUCCESS_STATUS
  177. # 抓取视频/文章详情分析统计
  178. async def _crawler_article_analysis_handler(self) -> int:
  179. task = CrawlerDetailDeal(pool=self.db_client, trace_id=self.trace_id)
  180. await task.deal(params=self.data)
  181. return self.TASK_SUCCESS_STATUS
  182. # 更新小程序裂变信息
  183. async def _mini_program_detail_handler(self) -> int:
  184. task = RecycleMiniProgramDetailTask(
  185. pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id
  186. )
  187. await task.deal(params=self.data)
  188. return self.TASK_SUCCESS_STATUS
  189. # 提取标题特征
  190. async def _extract_title_features_handler(self) -> int:
  191. task = ExtractTitleFeatures(self.db_client, self.log_client, self.trace_id)
  192. await task.deal(data=self.data)
  193. return self.TASK_SUCCESS_STATUS
  194. # 回收外部账号文章
  195. async def _recycle_outside_account_article_handler(self) -> int:
  196. date_str = self.data.get("date_string") or datetime.now().strftime("%Y-%m-%d")
  197. task = RecycleOutsideAccountArticlesTask(
  198. pool=self.db_client, log_client=self.log_client, date_string=date_str
  199. )
  200. await task.deal()
  201. return self.TASK_SUCCESS_STATUS
  202. # 更新外部账号文章的root_source_id和update_time
  203. async def _update_outside_account_article_root_source_id_and_update_time_handler(
  204. self,
  205. ) -> int:
  206. task = UpdateOutsideRootSourceIdAndUpdateTimeTask(
  207. pool=self.db_client, log_client=self.log_client
  208. )
  209. await task.deal()
  210. return self.TASK_SUCCESS_STATUS
  211. # 更新限流账号信息
  212. async def _update_limited_account_info_handler(self) -> int:
  213. task = LimitedAccountAnalysisTask(
  214. pool=self.db_client, log_client=self.log_client
  215. )
  216. await task.deal(date_string=self.data.get("date_string"))
  217. return self.TASK_SUCCESS_STATUS
  218. # 更新账号阅读率均值
  219. async def _update_account_read_rate_avg_handler(self) -> int:
  220. task = AccountPositionReadRateAvg(
  221. pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id
  222. )
  223. await task.deal(end_date=self.data.get("end_date"))
  224. return self.TASK_SUCCESS_STATUS
  225. # 更新账号阅读均值
  226. async def _update_account_read_avg_handler(self) -> int:
  227. task = AccountPositionReadAvg(
  228. pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id
  229. )
  230. await task.deal(end_date=self.data.get("end_date"))
  231. return self.TASK_SUCCESS_STATUS
  232. # 更新账号打开率均值
  233. async def _update_account_open_rate_avg_handler(self) -> int:
  234. task = AccountPositionOpenRateAvg(
  235. pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id
  236. )
  237. await task.deal(date_string=self.data.get("date_string"))
  238. return self.TASK_SUCCESS_STATUS
  239. # 自动关注公众号账号
  240. async def _auto_follow_account_handler(self) -> int:
  241. task = AutoReplyCardsMonitor(pool=self.db_client, log_client=self.log_client)
  242. await task.deal(task_name="follow_gzh_task")
  243. return self.TASK_SUCCESS_STATUS
  244. # 获取自动关注回复
  245. async def _get_follow_result_handler(self) -> int:
  246. task = AutoReplyCardsMonitor(pool=self.db_client, log_client=self.log_client)
  247. await task.deal(task_name="get_auto_reply_task")
  248. return self.TASK_SUCCESS_STATUS
  249. # 解析自动回复结果
  250. async def _extract_reply_result_handler(self) -> int:
  251. task = AutoReplyCardsMonitor(pool=self.db_client, log_client=self.log_client)
  252. await task.deal(task_name="extract_task")
  253. return self.TASK_SUCCESS_STATUS
  254. # 定时获取外部文章
  255. async def _cooperate_accounts_monitor_handler(self) -> int:
  256. task = CooperateAccountsMonitorTask(
  257. pool=self.db_client, log_client=self.log_client
  258. )
  259. await task.deal(task_name="save_articles")
  260. return self.TASK_SUCCESS_STATUS
  261. # 定时更新详情
  262. async def _cooperate_accounts_detail_handler(self) -> int:
  263. task = CooperateAccountsMonitorTask(
  264. pool=self.db_client, log_client=self.log_client
  265. )
  266. await task.deal(task_name="get_detail")
  267. return self.TASK_SUCCESS_STATUS
# Only TaskHandler is exported by ``from <this module> import *``.
__all__ = ["TaskHandler"]