# crawler_scheduling.py
# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2023/3/2
import ast
import os
import shlex
import subprocess
import sys
import time

sys.path.append(os.getcwd())
from common.common import Common
from common.scheduling_db import MysqlHelper, RedisHelper
  10. class Scheduling:
  11. # 读取任务表
  12. @classmethod
  13. def get_task(cls, log_type, crawler, env):
  14. get_sql = """ select * from crawler_task """
  15. all_task_list = MysqlHelper.get_values(log_type=log_type, crawler=crawler, sql=get_sql, env=env)
  16. pre_task_list = []
  17. for task in all_task_list:
  18. if int(time.time()) >= task["next_time"]:
  19. pre_task_list.append(task)
  20. return pre_task_list
  21. # 更新下次启动时间,调用时机:调度该 task_id 的任务时
  22. @classmethod
  23. def update_task(cls, log_type, crawler, task_id, next_time, interval_piaoquan, env):
  24. if interval_piaoquan > 0:
  25. new_next_time = next_time + interval_piaoquan
  26. update_sql = f""" UPDATE crawler_task SET next_time={new_next_time} WHERE task_id={task_id} """
  27. MysqlHelper.update_values(log_type, crawler, update_sql, env)
  28. # 资源分配 / 组装
  29. @classmethod
  30. def write_redis(cls, log_type, crawler, env):
  31. pre_task_list = cls.get_task(log_type=log_type, crawler=crawler, env=env)
  32. if len(pre_task_list) == 0:
  33. Common.logger(log_type, crawler).info("暂无新任务\n")
  34. else:
  35. for pre_task in pre_task_list:
  36. # machine字段是用来区分海外爬虫和国内爬虫使用的,不涉及任何其他含义
  37. machine = pre_task.get('machine', 'dev')
  38. next_time = pre_task['next_time']
  39. interval_piaoquan = pre_task['interval_piaoquan']
  40. task_id = pre_task['task_id']
  41. if machine == "hk":
  42. # 写入 redis
  43. task_key = 'crawler_config_task_queue:hk'
  44. RedisHelper.redis_push(env, task_key, str(pre_task))
  45. elif machine == "aliyun":
  46. # 写入 redis
  47. task_key = 'crawler_config_task_queue:aliyun'
  48. RedisHelper.redis_push(env, task_key, str(pre_task))
  49. else:
  50. # 写入 redis
  51. task_key = 'crawler_config_task_queue:dev'
  52. RedisHelper.redis_push(env, task_key, str(pre_task))
  53. if int(time.time()) >= next_time:
  54. cls.update_task(log_type, crawler, task_id, next_time, interval_piaoquan, env)
  55. @classmethod
  56. def get_redis(cls, log_type, crawler, env):
  57. if env == 'hk':
  58. task_key = 'crawler_config_task_queue:hk'
  59. elif env == 'prod':
  60. task_key = 'crawler_config_task_queue:aliyun'
  61. else:
  62. task_key = 'crawler_config_task_queue:dev'
  63. redis_data = RedisHelper.redis_pop(env, task_key)
  64. if redis_data is None or len(redis_data) == 0:
  65. # Common.logger(log_type, crawler).info("Redis为空,程序退出")
  66. # time.sleep(1)
  67. return
  68. else:
  69. task = eval(str(redis_data, encoding="utf8"))
  70. return task
  71. @classmethod
  72. def scheduling_task(cls, log_type, crawler, env):
  73. task = cls.get_redis(log_type, crawler, env)
  74. if not task:
  75. Common.logger(log_type, crawler).info("Redis为空,程序退出")
  76. return
  77. Common.logger(log_type, crawler).info(f"task: {task}")
  78. Common.logger(log_type, crawler).info(f"已获取调度任务:{task}")
  79. task_id = task['task_id']
  80. source = task['source']
  81. spider_name = task['spider_name']
  82. if env == "aliyun":
  83. oss_endpoint = "inner"
  84. elif env == "hk":
  85. oss_endpoint = "hk"
  86. else:
  87. oss_endpoint = "out"
  88. # 正式环境,调度任务
  89. Common.logger(log_type, crawler).info(f"开始调度任务:{task}\n")
  90. task_str = [('task_id', str(task_id)), ('task_name', str(task['task_name'])),
  91. ('source', str(task['source'])), ('next_time', str(task['next_time'])),
  92. ('interval_piaoquan', str(task['interval_piaoquan'])),
  93. ('play_cnt', eval(task['spider_rule'])['play_cnt']),
  94. ('video_width', eval(task['spider_rule'])['video_width']),
  95. ('video_height', eval(task['spider_rule'])['video_height']),
  96. ('video_like', eval(task['spider_rule'])['video_like']),
  97. ('share_cnt', eval(task['spider_rule'])['share_cnt']),
  98. ('duration_min', eval(task['spider_rule'])['duration']['min']),
  99. ('duration_max', eval(task['spider_rule'])['duration']['max']),
  100. ('task_type', task['task_type']), ('spider_link', eval(task['spider_link'])),
  101. ('spider_name', str(task['spider_name'])), ('min_publish_time', str(task['min_publish_time'])),
  102. ('min_publish_day', str(task['min_publish_day'])), ('media_id', str(task['media_id'])),
  103. ('applets_status', str(task['applets_status'])), ('app_status', str(task['app_status'])),
  104. ('user_tag', str(task['user_tag'])), ('user_content_tag', str(task['user_content_tag'])),
  105. ('machine', str(task['machine']))]
  106. task_str = str(task_str).replace(' ', '')
  107. cmd = f"""sh scheduling/scheduling_main/scheduling.sh {source}/{source}_main/{spider_name}.py --log_type="{spider_name}" --crawler="{source}" --task="{str(task_str)}" --oss_endpoint="{oss_endpoint}" --env="{env}" {source}/{source}-nohup.log """
  108. Common.logger(log_type, crawler).info(f"cmd:{cmd}\n")
  109. os.system(cmd)
  110. if __name__ == "__main__":
  111. # print(Scheduling.get_task("scheduling", "scheduling", "dev", "local"))
  112. # print(Scheduling.get_redis("scheduling", "scheduling", "dev", "local"))
  113. # Scheduling.write_redis("scheduling", "scheduling", "dev", "local")
  114. Scheduling.scheduling_task("scheduling", "scheduling", "dev")
  115. pass