new_history_task.py 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import time
  6. import asyncio
  7. from applications.log import logging
  8. from .history_task import historyContentIdTask
  class NewHistoryTask(historyContentIdTask):
      """
      New history task.

      Selects articles in ``self.article_match_video_table`` that are still in
      ``self.const.TASK_INIT_STATUS``, have a ``publish_flag`` equal to
      ``self.const.DO_NOT_NEED_PUBLISH`` and already have enough successfully
      downloaded videos in ``self.article_crawler_video_table``.  It then runs
      the inherited ``process_task`` coroutine on all of them concurrently.
      """

      async def get_tasks_new(self, min_video_count=3):
          """
          Fetch one batch of pending tasks.

          :param min_video_count: minimum number of rows with
              ``download_status = VIDEO_DOWNLOAD_SUCCESS_STATUS`` a content_id
              must have before its article is selected (defaults to 3, the
              value previously hard-coded in the query).  Coerced with
              ``int()`` before being interpolated into the SQL, so a
              non-numeric value raises ValueError/TypeError.
          :return: list of dicts with keys trace_id, content_id,
              flow_pool_level, gh_id, process_times and publish_flag, ordered
              by flow_pool_level then request_timestamp, at most
              ``self.history_coroutines`` entries.
          """
          # Only internal config values are interpolated below; int() keeps the
          # caller-supplied threshold from injecting arbitrary SQL text.
          min_video_count = int(min_video_count)
          # NOTE(review): `process_times <=` is inclusive — confirm that allowing
          # a task whose process_times equals TASK_MAX_PROCESS_TIMES is intended.
          select_sql = f"""
              SELECT
                  ART.trace_id,
                  ART.content_id,
                  ART.flow_pool_level,
                  ART.gh_id,
                  ART.process_times,
                  ART.publish_flag
              FROM {self.article_match_video_table} ART
              JOIN (
                  select content_id, count(1) as cnt
                  from {self.article_crawler_video_table}
                  where download_status = {self.const.VIDEO_DOWNLOAD_SUCCESS_STATUS}
                  group by content_id
              ) VID on ART.content_id = VID.content_id and VID.cnt >= {min_video_count}
              WHERE ART.content_status = {self.const.TASK_INIT_STATUS}
                  and ART.process_times <= {self.const.TASK_MAX_PROCESS_TIMES}
                  AND ART.publish_flag = {self.const.DO_NOT_NEED_PUBLISH}
              ORDER BY ART.flow_pool_level, ART.request_timestamp
              LIMIT {self.history_coroutines};
          """
          rows = await self.mysql_client.async_select(sql=select_sql)
          # Key names, in the same order as the SELECT column list above.
          fields = (
              "trace_id",
              "content_id",
              "flow_pool_level",
              "gh_id",
              "process_times",
              "publish_flag",
          )
          # NOTE(review): assumes async_select returns a sequence of row tuples
          # (possibly None when nothing matched) — confirm against the client.
          return [dict(zip(fields, row)) for row in rows or ()]

      async def deal_new(self):
          """
          Process one batch of tasks returned by ``get_tasks_new``.

          The batch is limited to ``publish_flag = self.const.DO_NOT_NEED_PUBLISH``
          (the original docstring said publish_flag=2 — NOTE(review): confirm
          that is the constant's value).  All tasks are awaited together with
          ``asyncio.gather``; the first exception raised by a ``process_task``
          propagates to the caller.
          """
          task_list = await self.get_tasks_new()
          logging(
              code="newHistory1001",
              info="History content_task Task Got {} this time".format(len(task_list)),
              function="History Contents Task"
          )
          if not task_list:
              print("暂时未获得历史已存在文章")
              return

          # perf_counter is monotonic: unlike a difference of time.time() values,
          # the elapsed time cannot be skewed by wall-clock adjustments.
          start = time.perf_counter()
          await asyncio.gather(*(self.process_task(params) for params in task_list))
          elapsed = time.perf_counter() - start
          print("{} s 内处理了{}个任务".format(elapsed, len(task_list)))