# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2023/3/2
import os
import sys
import time

sys.path.append(os.getcwd())
from common.common import Common
from common.db import MysqlHelper


class Scheduling:
    # Task list
    task_list = []

    # Read the task table and return the tasks that are due to run
    @classmethod
    def get_task(cls, log_type, crawler, env, machine):
        # Relies on the column order of crawler_task_1; see the row unpacking below
        get_sql = """ select * from crawler_task_1 """
        all_task_list = MysqlHelper.get_values(log_type=log_type, crawler=crawler, sql=get_sql, env=env, machine=machine)
        pre_task_list = []
        for task in all_task_list:
            # Unpack one row; task_machine avoids shadowing the machine parameter
            (task_id, task_name, source, next_time, interval_piaoquan,
             spider_rule, task_type, spider_link, spider_name,
             min_publish_time, min_publish_day, media_id, applets_status,
             app_status, user_tag, user_content_tag, task_machine,
             insert_time, update_time) = task
            # Only collect tasks whose next launch time has arrived
            if int(time.time()) >= next_time:
                task_dict = {
                    "task_id": task_id,
                    "task_name": task_name,
                    "source": source,
                    "next_time": next_time,
                    "interval_piaoquan": interval_piaoquan,
                    "spider_rule": spider_rule,
                    "task_type": task_type,
                    "spider_link": spider_link,
                    "spider_name": spider_name,
                    "min_publish_time": min_publish_time,
                    "min_publish_day": min_publish_day,
                    "media_id": media_id,
                    "applets_status": applets_status,
                    "app_status": app_status,
                    "user_tag": user_tag,
                    "user_content_tag": user_content_tag,
                    "machine": task_machine,
                    "insert_time": insert_time,
                    "update_time": update_time,
                }
                pre_task_list.append(task_dict)
        return pre_task_list

    # Update the next launch time; called when the task with this task_id is scheduled
    @classmethod
    def update_task(cls, log_type, crawler, task_id, next_time, interval_piaoquan, env, machine):
        if interval_piaoquan > 0:
            new_next_time = next_time + interval_piaoquan
            update_sql = f""" UPDATE crawler_task_1 SET next_time={new_next_time} WHERE task_id={task_id} """
            MysqlHelper.update_values(log_type, crawler, update_sql, env, machine)

    # Example of the launch command a scheduled task maps to, e.g.:
    #   sh ./main/main.sh ./xigua/xigua_main/run_xigua_follow.py \
    #     --log_type="follow" --crawler="xigua" --strategy="定向爬虫策略" \
    #     --oss_endpoint="inner" --env="prod" --machine="aliyun" xigua/nohup.log

    # Resource allocation / assembly
    @classmethod
    def write_redis(cls, log_type, crawler, env, machine):
        pre_task_list = cls.get_task(log_type=log_type, crawler=crawler, env=env, machine=machine)
        if len(pre_task_list) == 0:
            Common.logger(log_type, crawler).info("No new tasks\n")
        else:
            for pre_task in pre_task_list:
                if machine == "hk":
                    # TODO: write the task to Redis (hk)
                    pass
                elif machine == "aliyun":
                    # TODO: write the task to Redis (aliyun)
                    pass
                else:
                    # TODO: write the task to Redis (local / other machines)
                    pass

    @classmethod
    def main(cls):
        # When the current time >= next_time: update next_time (via update_task),
        # then launch the corresponding crawler
        pass


if __name__ == "__main__":
    Scheduling.write_redis("scheduling", "scheduling", "dev", "local")
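

# ---------------------------------------------------------------------------
# Hedged sketch (not part of the original code): one plausible way the empty
# write_redis branches could push a task onto Redis with redis-py. The key
# layout ("crawler:tasks:<machine>") and the helper name push_task are
# illustrative assumptions, not the project's actual schema.
import json


def push_task(task_dict, machine, redis_client):
    """Serialize one task dict and append it to a per-machine Redis list
    (hypothetical key layout: crawler:tasks:<machine>)."""
    redis_client.rpush(f"crawler:tasks:{machine}", json.dumps(task_dict, ensure_ascii=False))

# Example usage (assumes a reachable Redis instance):
#   import redis
#   r = redis.Redis(host="127.0.0.1", port=6379, db=0)
#   push_task(pre_task, "aliyun", r)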