# task_scheduler.py
import asyncio
import time
import traceback
from datetime import datetime

from applications.api import feishu_robot
from applications.utils import task_schedule_response
from applications.tasks.monitor_tasks import check_kimi_balance
from applications.tasks.monitor_tasks import GetOffVideos
from applications.tasks.monitor_tasks import CheckVideoAuditStatus
from applications.tasks.monitor_tasks import OutsideGzhArticlesMonitor
from applications.tasks.monitor_tasks import OutsideGzhArticlesCollector
  11. class TaskScheduler:
  12. def __init__(self, data, log_service, db_client):
  13. self.data = data
  14. self.log_client = log_service
  15. self.db_client = db_client
  16. self.table = "long_articles_task_manager"
  17. async def whether_task_processing(self, task_name: str) -> bool:
  18. """whether task is processing"""
  19. query = f"""
  20. select start_timestamp from {self.table} where task_name = %s and task_status = %s;
  21. """
  22. response, error = await self.db_client.async_fetch(
  23. query=query, params=(task_name, 1)
  24. )
  25. if not response:
  26. # no task is processing
  27. return False
  28. else:
  29. start_timestamp = response[0]["start_timestamp"]
  30. # todo: every task should has a unique expire timestamp, remember to write that in a task config file
  31. if int(time.time()) - start_timestamp >= 86400:
  32. await feishu_robot.bot(
  33. title=f"{task_name} has been processing for more than one day",
  34. detail={"timestamp": start_timestamp},
  35. env="long_articles_task",
  36. )
  37. return True
  38. async def record_task(self, task_name, date_string):
  39. """record task"""
  40. query = f"""insert into {self.table} (date_string, task_name, start_timestamp) values (%s, %s, %s);"""
  41. await self.db_client.async_save(
  42. query=query, params=(date_string, task_name, int(time.time()))
  43. )
  44. async def lock_task(self, task_name, date_string):
  45. query = f"""update {self.table} set task_status = %s where task_name = %s and date_string = %s and task_status = %s;"""
  46. return await self.db_client.async_save(
  47. query=query, params=(1, task_name, date_string, 0)
  48. )
  49. async def release_task(self, task_name, date_string, final_status):
  50. """
  51. 任务执行完成之后,将任务状态设置为完成状态/失败状态
  52. """
  53. query = f"""
  54. update {self.table}
  55. set task_status = %s, finish_timestamp = %s
  56. where task_name = %s and date_string = %s and task_status = %s;
  57. """
  58. return await self.db_client.async_save(
  59. query=query,
  60. params=(final_status, int(time.time()), task_name, date_string, 1),
  61. )
  62. async def deal(self):
  63. task_name = self.data.get("task_name")
  64. date_string = self.data.get("date_string")
  65. if not task_name:
  66. await self.log_client.log(
  67. contents={
  68. "task": task_name,
  69. "function": "task_scheduler_deal",
  70. "message": "not task name in params",
  71. "status": "fail",
  72. "data": self.data,
  73. }
  74. )
  75. return await task_schedule_response.fail_response(
  76. error_code="4002", error_message="task_name must be input"
  77. )
  78. if not date_string:
  79. date_string = datetime.today().strftime("%Y-%m-%d")
  80. # prepare for task
  81. if await self.whether_task_processing(task_name):
  82. return await task_schedule_response.fail_response(
  83. error_code="5001", error_message="task is processing"
  84. )
  85. await self.record_task(task_name=task_name, date_string=date_string)
  86. await self.lock_task(task_name, date_string)
  87. match task_name:
  88. case "check_kimi_balance":
  89. response = await check_kimi_balance()
  90. await self.log_client.log(
  91. contents={
  92. "task": task_name,
  93. "function": "task_scheduler_deal",
  94. "message": "check_kimi_balance task execute successfully",
  95. "status": "success",
  96. "data": response,
  97. }
  98. )
  99. await self.release_task(task_name=task_name, date_string=date_string, final_status=response['code'])
  100. return await task_schedule_response.success_response(
  101. task_name=task_name, data=response
  102. )
  103. case "get_off_videos":
  104. async def background_get_off_videos():
  105. sub_task = GetOffVideos(self.db_client, self.log_client)
  106. await sub_task.get_off_job()
  107. task_status = await sub_task.check()
  108. await self.release_task(
  109. task_name=task_name,
  110. date_string=date_string,
  111. final_status=task_status,
  112. )
  113. asyncio.create_task(background_get_off_videos())
  114. return await task_schedule_response.success_response(
  115. task_name=task_name,
  116. data={"code": 0, "message": "get off_videos started background"},
  117. )
  118. case "check_publish_video_audit_status":
  119. async def background_check_publish_video_audit_status():
  120. sub_task = CheckVideoAuditStatus(self.db_client, self.log_client)
  121. print("start processing task status: ")
  122. task_status = await sub_task.deal()
  123. await self.release_task(
  124. task_name=task_name,
  125. date_string=date_string,
  126. final_status=task_status,
  127. )
  128. print("finish task status: ", task_status)
  129. asyncio.create_task(background_check_publish_video_audit_status())
  130. return await task_schedule_response.success_response(
  131. task_name=task_name,
  132. data={
  133. "code": 0,
  134. "message": "check publish video audit status started",
  135. },
  136. )
  137. case "outside_article_monitor":
  138. async def background_outside_article_monitor():
  139. collect_task = OutsideGzhArticlesCollector(self.db_client)
  140. await collect_task.deal()
  141. monitor_task = OutsideGzhArticlesMonitor(self.db_client)
  142. final_status = await monitor_task.deal()
  143. await self.release_task(task_name, date_string, final_status=final_status)
  144. asyncio.create_task(background_outside_article_monitor())
  145. return await task_schedule_response.success_response(
  146. task_name=task_name,
  147. data={"code": 0, "message": "outside_article_monitor started background"}
  148. )
  149. case _:
  150. await self.log_client.log(
  151. contents={
  152. "task": task_name,
  153. "function": "task_scheduler_deal",
  154. "message": "wrong task input",
  155. "status": "success",
  156. "data": self.data,
  157. }
  158. )
  159. await self.release_task(task_name, date_string, 99)
  160. return await task_schedule_response.fail_response(
  161. error_code="4001", error_message="wrong task name input"
  162. )