crawler_scheduling.py

# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2023/3/2
import os
import sys
import time

sys.path.append(os.getcwd())
from common.common import Common
from common.db import MysqlHelper


class Scheduling:
    # Task list
    task_list = []

    # Read / update the task table
    @classmethod
    def get_task(cls, log_type, crawler, env, machine):
        get_sql = """ select * from crawler_task_1 """
        all_task_list = MysqlHelper.get_values(log_type=log_type, crawler=crawler, sql=get_sql, env=env, machine=machine)
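        # NOTE: the unpacking below depends on the physical column order of
        # crawler_task_1; the layout is inferred from this file, not confirmed
        # against the real schema. `SELECT *` plus positional indexing breaks
        # silently if columns are ever reordered, so selecting columns by name
        # would be more robust.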
        pre_task_list = []
        for task in all_task_list:
            task_id = task[0]
            task_name = task[1]
            source = task[2]
            next_time = task[3]
            interval_piaoquan = task[4]
            spider_rule = task[5]
            task_type = task[6]
            spider_link = task[7]
            spider_name = task[8]
            min_publish_time = task[9]
            min_publish_day = task[10]
            media_id = task[11]
            applets_status = task[12]
            app_status = task[13]
            user_tag = task[14]
            user_content_tag = task[15]
            # Distinct name so the `machine` parameter is not shadowed.
            task_machine = task[16]
            insert_time = task[17]
            update_time = task[18]
            # Dispatch tasks that are due, i.e. whose next_time has passed.
            if next_time <= int(time.time()):
                task_dict = {
                    "task_id": task_id,
                    "task_name": task_name,
                    "source": source,
                    "next_time": next_time,
                    "interval_piaoquan": interval_piaoquan,
                    "spider_rule": spider_rule,
                    "task_type": task_type,
                    "spider_link": spider_link,
                    "spider_name": spider_name,
                    "min_publish_time": min_publish_time,
                    "min_publish_day": min_publish_day,
                    "media_id": media_id,
                    "applets_status": applets_status,
                    "app_status": app_status,
                    "user_tag": user_tag,
                    "user_content_tag": user_content_tag,
                    "machine": task_machine,
                    "insert_time": insert_time,
                    "update_time": update_time,
                }
                pre_task_list.append(task_dict)
                # Push next_time forward by one interval so the task is not
                # picked up again before it is due.
                if interval_piaoquan > 0:
                    new_next_time = next_time + interval_piaoquan
                    update_sql = f""" UPDATE crawler_task_1 SET next_time={new_next_time} WHERE task_id={task_id} """
                    MysqlHelper.update_values(log_type, crawler, update_sql, env, machine)
        return pre_task_list

    # Example of the launch command a dispatched task maps to
    # ("定向爬虫策略" = targeted crawler strategy):
    #   sh ./main/main.sh \
    #       ./xigua/xigua_main/run_xigua_follow.py \
    #       --log_type="follow" \
    #       --crawler="xigua" \
    #       --strategy="定向爬虫策略" \
    #       --oss_endpoint="inner" \
    #       --env="prod" \
    #       --machine="aliyun" \
    #       xigua/nohup.log
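
    # Minimal sketch of how a due task could be shelled out with the command
    # above. `dispatch_task` is a hypothetical helper, not part of this
    # module's original interface; it assumes main.sh accepts the spider
    # script, its CLI flags, and a nohup log path, as in the example.
    @classmethod
    def dispatch_task(cls, spider_script, log_type, crawler, strategy,
                      oss_endpoint, env, machine, nohup_log):
        import subprocess
        cmd = (
            f"sh ./main/main.sh {spider_script}"
            f' --log_type="{log_type}"'
            f' --crawler="{crawler}"'
            f' --strategy="{strategy}"'
            f' --oss_endpoint="{oss_endpoint}"'
            f' --env="{env}"'
            f' --machine="{machine}"'
            f" {nohup_log}"
        )
        # Fire and forget; main.sh is expected to daemonize via nohup.
        subprocess.Popen(cmd, shell=True)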

    # Allocate resources / assemble / dispatch tasks
    @classmethod
    def main(cls, log_type, crawler, env, machine):
        pre_task_list = cls.get_task(log_type=log_type, crawler=crawler, env=env, machine=machine)
        if len(pre_task_list) == 0:
            Common.logger(log_type, crawler).info("No new tasks\n")
        else:
            for pre_task in pre_task_list:
                task_id = pre_task["task_id"]
                task_name = pre_task["task_name"]
                next_time = pre_task["next_time"]
                interval_piaoquan = pre_task["interval_piaoquan"]
                spider_rule = pre_task["spider_rule"]
                if machine == "hk":
                    # Write to Redis (not implemented yet)
                    pass
                elif machine == "aliyun":
                    # Write to Redis (not implemented yet)
                    pass
                else:
                    # Write to Redis (not implemented yet)
                    pass
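
    # Minimal sketch of what the "write to Redis" placeholders above might do,
    # assuming the redis-py package and a reachable server. The queue name and
    # connection parameters are illustrative assumptions, not part of this file.
    @classmethod
    def push_tasks_to_redis(cls, pre_task_list, machine):
        import json
        import redis
        r = redis.Redis(host="127.0.0.1", port=6379, db=0)
        for pre_task in pre_task_list:
            # One list per machine so each worker pool pops only its own tasks.
            r.lpush(f"crawler:task:{machine}", json.dumps(pre_task))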


if __name__ == "__main__":
    Scheduling.main("scheduling", "scheduling", "dev", "local")
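    # Illustrative usage: run the scheduler periodically, e.g. via a crontab
    # entry such as (the project path is an assumption):
    #   * * * * * cd /path/to/project && python3 crawler_scheduling.py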