crawler_scheduling.py

# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2023/3/2
import argparse
import os
import sys
import time

sys.path.append(os.getcwd())
from common.common import Common
from common.db import MysqlHelper, RedisHelper


class Scheduling:
    # Task list
    task_list = []

    # Read the task table
    @classmethod
    def get_task(cls, log_type, crawler, env, machine):
        get_sql = """ select * from crawler_task_1 """
        all_task_list = MysqlHelper.get_values(log_type=log_type, crawler=crawler, sql=get_sql, env=env, machine=machine)
        pre_task_list = []
        for task in all_task_list:
            task_id = task[0]
            task_name = task[1]
            source = task[2]
            next_time = task[3]
            interval_piaoquan = task[4]
            spider_rule = task[5]
            task_type = task[6]
            spider_link = task[7]
            spider_name = task[8]
            min_publish_time = task[9]
            min_publish_day = task[10]
            media_id = task[11]
            applets_status = task[12]
            app_status = task[13]
            user_tag = task[14]
            user_content_tag = task[15]
            machine = task[16]
            insert_time = task[17]
            update_time = task[18]
            if int(time.time()) >= next_time:
                task_dict = {
                    "task_id": task_id,
                    "task_name": task_name,
                    "source": source,
                    "next_time": next_time,
                    "interval_piaoquan": interval_piaoquan,
                    "spider_rule": spider_rule,
                    "task_type": task_type,
                    "spider_link": spider_link,
                    "spider_name": spider_name,
                    "min_publish_time": min_publish_time,
                    "min_publish_day": min_publish_day,
                    "media_id": media_id,
                    "applets_status": applets_status,
                    "app_status": app_status,
                    "user_tag": user_tag,
                    "user_content_tag": user_content_tag,
                    "machine": machine,
                    "insert_time": insert_time,
                    "update_time": update_time,
                }
                pre_task_list.append(task_dict)
        return pre_task_list

    # Update the next run time; called when the task with this task_id is scheduled
    @classmethod
    def update_task(cls, log_type, crawler, task_id, next_time, interval_piaoquan, env, machine):
        if interval_piaoquan > 0:
            new_next_time = next_time + interval_piaoquan
            update_sql = f""" UPDATE crawler_task_1 SET next_time={new_next_time} WHERE task_id={task_id} """
            MysqlHelper.update_values(log_type, crawler, update_sql, env, machine)

    # Example of the command used to launch a crawler process:
    #   sh ./main/main.sh \
    #       ./xigua/xigua_main/run_xigua_follow.py \
    #       --log_type="follow" \
    #       --crawler="xigua" \
    #       --strategy="定向爬虫策略" \
    #       --oss_endpoint="inner" \
    #       --env="prod" \
    #       --machine="aliyun" \
    #       xigua/nohup.log
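
    # Illustrative sketch, not part of the original module: one way the example
    # command above could be launched from Python. The method name `start_crawler`
    # and its parameters are assumptions for illustration only.
    @classmethod
    def start_crawler(cls, log_type, crawler, spider_name, env, machine):
        import subprocess
        cmd = (
            f"sh ./main/main.sh ./{crawler}/{crawler}_main/{spider_name}.py "
            f'--log_type="{log_type}" --crawler="{crawler}" '
            f'--env="{env}" --machine="{machine}" {crawler}/nohup.log'
        )
        # Fire and forget; main.sh is assumed to handle backgrounding and logging.
        subprocess.Popen(cmd, shell=True)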

    # Resource allocation / assembly
    @classmethod
    def write_redis(cls, log_type, crawler, env, machine):
        pre_task_list = cls.get_task(log_type=log_type, crawler=crawler, env=env, machine=machine)
        if len(pre_task_list) == 0:
            Common.logger(log_type, crawler).info("No new tasks\n")
        else:
            for pre_task in pre_task_list:
                print(type(pre_task))
                print(pre_task)
                if machine == "hk":
                    # Write to redis (placeholder)
                    pass
                elif machine == "aliyun":
                    # Write to redis (placeholder)
                    pass
                else:
                    # Write to redis
                    RedisHelper.redis_push(env, machine, pre_task['task_id'], str(pre_task))

    @classmethod
    def main(cls, log_type, crawler, env, machine):
        # When the current time >= next_time: update next_time (via update_task), then start that crawler
        pre_task_list = cls.get_task(log_type=log_type, crawler=crawler, env=env, machine=machine)
        if len(pre_task_list) == 0:
            Common.logger(log_type, crawler).info("No new tasks\n")
        else:
            for pre_task in pre_task_list:
                # Pop the queued task from redis; dispatching is not implemented here
                task_list = RedisHelper.redis_pop()
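
    # Illustrative sketch, an assumption not present in the original file: a small
    # CLI helper using the `argparse` import above (currently unused), mirroring
    # the flags of the example launch command. Defaults match the call below.
    @classmethod
    def parse_args(cls):
        parser = argparse.ArgumentParser(description="crawler scheduling")
        parser.add_argument("--log_type", default="scheduling")
        parser.add_argument("--crawler", default="scheduling")
        parser.add_argument("--env", default="dev")
        parser.add_argument("--machine", default="local")
        return parser.parse_args()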


if __name__ == "__main__":
    Scheduling.write_redis("scheduling", "scheduling", "dev", "local")