crawler_scheduling_v3.py

# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2023/3/2
import os
import sys
import time
sys.path.append(os.getcwd())
from common.common import Common
from common.scheduling_db import MysqlHelper, RedisHelper
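
# Scheduling flow (summary of the methods below):
#   get_task()        - read crawler_task_v3 and keep the tasks whose start_time has passed
#   write_redis()     - push each due task onto the 'crawler_config_task_queue' Redis list and bump its next start_time
#   get_redis()       - pop one task back off that list
#   scheduling_task() - build a shell command from the popped task and launch the matching spider script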


class SchedulingV3:
    # Read the task table
    @classmethod
    def get_task(cls, log_type, crawler, env):
        get_sql = """ select * from crawler_task_v3; """
        all_task_list = MysqlHelper.get_values(log_type=log_type, crawler=crawler, sql=get_sql, env=env, action='')
        pre_task_list = []
        for task in all_task_list:
            # Keep only tasks whose start_time (milliseconds since epoch) has already been reached
            if int(time.time() * 1000) >= task["start_time"]:
                pre_task_list.append(task)
        return pre_task_list

    # Update the task's next start time; called when the task with this task_id is scheduled
    @classmethod
    def update_task(cls, log_type, crawler, task_id, start_time, interval_piaoquan, env):
        if interval_piaoquan > 0:
            # start_time is in milliseconds, interval_piaoquan is in seconds
            new_start_time = start_time + int(interval_piaoquan) * 1000
            update_sql = f""" UPDATE crawler_task_v3 SET start_time={new_start_time} WHERE id={task_id} """
            MysqlHelper.update_values(log_type, crawler, update_sql, env)
            Common.logger(log_type, crawler).info(f"Updated the task's next start time: {new_start_time}\n")
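
    # Worked example (values are hypothetical): with start_time = 1677715200000 ms and
    # interval_piaoquan = 3600 s, the next start_time becomes
    # 1677715200000 + 3600 * 1000 = 1677718800000, i.e. one hour later.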

    # Resource allocation / assembly
    @classmethod
    def write_redis(cls, log_type, crawler, env):
        pre_task_list = cls.get_task(log_type=log_type, crawler=crawler, env=env)
        if len(pre_task_list) == 0:
            Common.logger(log_type, crawler).info("No new tasks for now\n")
        else:
            for pre_task in pre_task_list:
                # The machine field only distinguishes overseas crawlers from domestic ones; it carries no other meaning
                machine = pre_task.get('machine', 'dev')
                next_time = pre_task['start_time']
                interval_piaoquan = pre_task['interval']
                task_id = pre_task['id']
                # hk, aliyun and the default branch currently push to the same Redis queue key
                if machine == "hk":
                    # Write to Redis
                    task_key = 'crawler_config_task_queue'
                    RedisHelper.redis_push(env, task_key, str(pre_task))
                    Common.logger(log_type, crawler).info(f"Written to Redis: {str(pre_task)}")
                elif machine == "aliyun":
                    # Write to Redis
                    task_key = 'crawler_config_task_queue'
                    RedisHelper.redis_push(env, task_key, str(pre_task))
                    Common.logger(log_type, crawler).info(f"Written to Redis: {str(pre_task)}")
                else:
                    # Write to Redis
                    task_key = 'crawler_config_task_queue'
                    RedisHelper.redis_push(env, task_key, str(pre_task))
                    Common.logger(log_type, crawler).info(f"Written to Redis: {str(pre_task)}")
                if int(time.time() * 1000) >= next_time:
                    cls.update_task(log_type, crawler, task_id, next_time, interval_piaoquan, env)
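
    # The payload pushed to Redis is str(pre_task), i.e. the repr of the MySQL row dict.
    # A hypothetical queue entry (field values are illustrative only) looks roughly like:
    #   "{'id': 8, 'task_name': 'xxx', 'source': 'xxx', 'start_time': 1677715200000,
    #     'interval': 3600, 'mode': 'xxx', 'spider_name': 'xxx', 'machine': 'aliyun', ...}"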

    @classmethod
    def get_redis(cls, log_type, crawler, env):
        # hk, prod and the default branch currently read from the same Redis queue key
        if env == 'hk':
            task_key = 'crawler_config_task_queue'
        elif env == 'prod':
            task_key = 'crawler_config_task_queue'
        else:
            task_key = 'crawler_config_task_queue'
        redis_data = RedisHelper.redis_pop(env, task_key)
        if redis_data is None or len(redis_data) == 0:
            Common.logger(log_type, crawler).info("Redis queue is empty, exiting")
            return
        else:
            # The queue holds the repr of the task dict pushed by write_redis, so eval() turns it
            # back into a dict; only trusted data must ever be pushed onto this queue
            task = eval(str(redis_data, encoding="utf8"))
            return task

    @classmethod
    def scheduling_task(cls, log_type, crawler, env):
        task = cls.get_redis(log_type, crawler, env)
        if not task:
            Common.logger(log_type, crawler).info("Redis queue is empty, exiting")
            return
        Common.logger(log_type, crawler).info(f"Fetched scheduling task: {type(task)}, {task}")
        mode = task['mode']
        source = task['source']
        spider_name = task['spider_name']
        if env == "aliyun":
            oss_endpoint = "inner"
        elif env == "hk":
            oss_endpoint = "hk"
        else:
            oss_endpoint = "out"
        # Production environment: dispatch the task
        Common.logger(log_type, crawler).info("Start dispatching the task")
        task_str = [
            ('task_id', str(task['id'])),
            ('task_name', str(task['task_name'])),
            ('source', str(task['source'])),
            ('start_time', str(task['start_time'])),
            ('interval', str(task['interval'])),
            ('mode', str(task['mode'])),
            ('rule', task['rule']),
            ('spider_name', str(task['spider_name'])),
            ('machine', str(task['machine'])),
            ('status', str(task['status'])),
            ('create_time', str(task['create_time'])),
            ('update_time', str(task['update_time'])),
            ('operator', str(task['operator']))
        ]
        # Strip spaces, double quotes and slashes so the task can be passed as a single CLI argument
        task_str = str(task_str).replace(' ', '').replace('"', "'").replace("\\/", "").replace("/", "")
        cmd = f"""sh scheduling/scheduling_v3/scheduling_v3.sh {source}/{source}_main/{spider_name}.py --log_type="{mode}" --crawler="{source}" --task="{task_str}" --oss_endpoint="{oss_endpoint}" --env="{env}" {source}/logs/{source}-{mode}-scheduling.log """
        Common.logger(log_type, crawler).info(f"cmd:{cmd}")
        os.system(cmd)
        Common.logger(log_type, crawler).info("Task dispatching finished")
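
    # For illustration only (source / spider_name / mode values are hypothetical):
    # with source="xigua", spider_name="xigua_search", mode="search" and env="prod",
    # the generated command looks roughly like:
    #   sh scheduling/scheduling_v3/scheduling_v3.sh xigua/xigua_main/xigua_search.py
    #       --log_type="search" --crawler="xigua" --task="[...]" --oss_endpoint="out"
    #       --env="prod" xigua/logs/xigua-search-scheduling.log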


if __name__ == "__main__":
    # print(Scheduling.get_task("scheduling", "scheduling", "dev"))
    # Scheduling.update_task("scheduling", "scheduling", 8, 1681833600000, 1, "dev")
    SchedulingV3.write_redis("scheduling", "scheduling", "dev")
    # print(SchedulingV3.get_redis("scheduling", "scheduling", "dev"))
    # SchedulingV3.scheduling_task("scheduling", "scheduling", "dev")
    pass
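
    # A minimal sketch (not part of the original flow; the 60-second sleep is an assumption)
    # of how producer and consumer could be wired together in one long-running process:
    #
    #     while True:
    #         SchedulingV3.write_redis("scheduling", "scheduling", "prod")
    #         SchedulingV3.scheduling_task("scheduling", "scheduling", "prod")
    #         time.sleep(60)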