# -*- coding: utf-8 -*- # @Author: wangkun # @Time: 2023/3/2 import os import sys import time sys.path.append(os.getcwd()) from common.common import Common from common.scheduling_db import MysqlHelper, RedisHelper class Scheduling: # 读取任务表 @classmethod def get_task(cls, log_type, crawler, env, machine): get_sql = """ select * from crawler_task """ all_task_list = MysqlHelper.get_values(log_type=log_type, crawler=crawler, sql=get_sql, env=env, machine=machine) pre_task_list = [] for task in all_task_list: if int(time.time()) >= task["next_time"]: pre_task_list.append(task) return pre_task_list # 更新下次启动时间,调用时机:调度该 task_id 的任务时 @classmethod def update_task(cls, log_type, crawler, task_id, next_time, interval_piaoquan, env, machine): if interval_piaoquan > 0: new_next_time = next_time + interval_piaoquan update_sql = f""" UPDATE crawler_task SET next_time={new_next_time} WHERE task_id={task_id} """ MysqlHelper.update_values(log_type, crawler, update_sql, env, machine) # 资源分配 / 组装 @classmethod def write_redis(cls, log_type, crawler, env, machine): pre_task_list = cls.get_task(log_type=log_type, crawler=crawler, env=env, machine=machine) if len(pre_task_list) == 0: Common.logger(log_type, crawler).info("暂无新任务\n") else: for pre_task in pre_task_list: if machine == "hk": # 写入 redis pass elif machine == "aliyun": # 写入 redis pass else: # 写入 redis RedisHelper.redis_push(env, machine,str(pre_task)) @classmethod def get_redis(cls, log_type, crawler, env, machine): redis_data = RedisHelper.redis_pop(env, machine) if redis_data is None: Common.logger(log_type, crawler).info("Redis为空,等待1秒") time.sleep(1) else: task = eval(str(redis_data, encoding="utf8")) return task @classmethod def scheduling_task(cls, log_type, crawler, env, machine): task = cls.get_redis(log_type, crawler, env, machine) Common.logger(log_type, crawler).info(f"已获取调度任务:{task}") task_id = task['task_id'] source = task['source'] next_time = task['next_time'] interval_piaoquan = task['interval_piaoquan'] spider_name = task['spider_name'] if machine == "aliyun": oss_endpoint = "inner" elif machine == "aliyun_hk": oss_endpoint = "hk" else: oss_endpoint = "out" if int(time.time()) >= next_time: cls.update_task(log_type, crawler, task_id, next_time, interval_piaoquan, env, machine) # 正式环境,调度任务 Common.logger(log_type, crawler).info(f"开始调度任务:{task}\n") task_str = [('task_id', str(task_id)), ('task_name', str(task['task_name'])), ('source', str(task['source'])), ('next_time', str(task['next_time'])), ('interval_piaoquan', str(task['interval_piaoquan'])), ('play_cnt', eval(task['spider_rule'])['play_cnt']),('video_width', eval(task['spider_rule'])['video_width']),('video_height', eval(task['spider_rule'])['video_height']),('video_like', eval(task['spider_rule'])['video_like']),('share_cnt', eval(task['spider_rule'])['share_cnt']),('duration_min', eval(task['spider_rule'])['duration']['min']),('duration_max', eval(task['spider_rule'])['duration']['max']),('task_type', task['task_type']),('spider_link', eval(task['spider_link'])),('spider_name', str(task['spider_name'])),('min_publish_time', str(task['min_publish_time'])),('min_publish_day', str(task['min_publish_day'])),('media_id', str(task['media_id'])),('applets_status', str(task['applets_status'])),('app_status', str(task['app_status'])),('user_tag', str(task['user_tag'])),('user_content_tag',str(task['user_content_tag'])),('machine', str(task['machine']))] task_str = str(task_str).replace(' ', '') cmd = f"""sh scheduling/scheduling_main/scheduling.sh {source}/{source}_main/{spider_name}_scheduling.py --log_type="{spider_name}" --crawler="{source}" --task="{str(task_str)}" --oss_endpoint="{oss_endpoint}" --env="{env}" --machine="{machine}" {source}/{source}-nohup.log """ Common.logger(log_type, crawler).info(f"cmd:{cmd}\n") os.system(cmd) if __name__ == "__main__": # print(Scheduling.get_task("scheduling", "scheduling", "dev", "local")) # print(Scheduling.get_redis("scheduling", "scheduling", "dev", "local")) # Scheduling.write_redis("scheduling", "scheduling", "dev", "local") Scheduling.scheduling_task("scheduling", "scheduling", "dev", "local") pass