# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2023/3/2
import ast
import os
import sys
import time
sys.path.append(os.getcwd())
from common.common import Common
from common.scheduling_db import MysqlHelper, RedisHelper


class SchedulingV3:
    # Read the task table and return every task whose start_time has arrived.
    @classmethod
    def get_task(cls, log_type, crawler, env):
        get_sql = """ select * from crawler_task_v3; """
        all_task_list = MysqlHelper.get_values(log_type=log_type, crawler=crawler, sql=get_sql, env=env, action='')
        pre_task_list = []
        for task in all_task_list:
            # start_time is stored in milliseconds, so compare against ms-epoch now
            if int(time.time() * 1000) >= task["start_time"]:
                pre_task_list.append(task)
        return pre_task_list

    # Update the next start time; called when the task with this task_id is scheduled.
    @classmethod
    def update_task(cls, log_type, crawler, task_id, start_time, interval_piaoquan, env):
        if interval_piaoquan > 0:
            # interval_piaoquan is in minutes; start_time is in milliseconds
            new_start_time = start_time + int(interval_piaoquan) * 60 * 1000
            update_sql = f""" UPDATE crawler_task_v3 SET start_time={new_start_time} WHERE id={task_id} """
            MysqlHelper.update_values(log_type, crawler, update_sql, env)
            Common.logger(log_type, crawler).info(f"Updated the task's next start time: {new_start_time}\n")

    # Resource allocation / assembly: push due tasks onto the per-machine Redis queues.
    @classmethod
    def write_redis(cls, log_type, crawler, env):
        pre_task_list = cls.get_task(log_type=log_type, crawler=crawler, env=env)
        if len(pre_task_list) == 0:
            Common.logger(log_type, crawler).info("No new tasks\n")
            return
        for pre_task in pre_task_list:
            # The machine field only distinguishes overseas (hk) crawlers from
            # domestic (aliyun) ones; it carries no other meaning.
            machine = pre_task.get('machine', 'dev')
            next_time = pre_task['start_time']
            interval_piaoquan = pre_task['interval']
            task_id = pre_task['id']
            # Route the task to the Redis queue that matches its machine
            if machine == "hk":
                task_key = 'crawler_config_task_queue:hk'
            elif machine == "aliyun":
                task_key = 'crawler_config_task_queue:aliyun'
            else:
                task_key = 'crawler_config_task_queue:dev'
            RedisHelper.redis_push(env, task_key, str(pre_task))
            Common.logger(log_type, crawler).info(f"Pushed to Redis: {str(pre_task)}")
            if int(time.time() * 1000) >= next_time:
                cls.update_task(log_type, crawler, task_id, next_time, interval_piaoquan, env)

    # Pop one task from the Redis queue that matches the current environment.
    @classmethod
    def get_redis(cls, log_type, crawler, env):
        if env == 'hk':
            task_key = 'crawler_config_task_queue:hk'
        elif env == 'prod':
            task_key = 'crawler_config_task_queue:aliyun'
        else:
            task_key = 'crawler_config_task_queue:dev'
        redis_data = RedisHelper.redis_pop(env, task_key)
        if redis_data is None or len(redis_data) == 0:
            Common.logger(log_type, crawler).info("Redis is empty, exiting")
            return
        # The queue stores the repr of a dict; parse it back with
        # ast.literal_eval instead of the unsafe eval()
        task = ast.literal_eval(str(redis_data, encoding="utf8"))
        return task

    @classmethod
    def scheduling_task(cls, log_type, crawler, env):
        task = cls.get_redis(log_type, crawler, env)
        if not task:
            Common.logger(log_type, crawler).info("Redis is empty, exiting")
            return
        Common.logger(log_type, crawler).info(f"Fetched scheduling task: {type(task)}, {task}")
        mode = task['mode']
        source = task['source']
        spider_name = f"run_{source}_{mode}_scheduling"
        if env == "aliyun":
            oss_endpoint = "inner"
        elif env == "hk":
            oss_endpoint = "hk"
        else:
            oss_endpoint = "out"
        # Production environment: dispatch the task
        Common.logger(log_type, crawler).info("Start dispatching the task")
        # (Legacy format: the rule JSON used to be flattened into individual
        # min/max fields such as duration / playCnt / period / fans / videos /
        # like / videoWidth / videoHeight; the rule is now passed through as-is.)
        task_str = [
            ('task_id', str(task['id'])),
            ('task_name', str(task['task_name'])),
            ('source', str(task['source'])),
            ('start_time', str(task['start_time'])),
            ('interval', str(task['interval'])),
            ('mode', str(task['mode'])),
            ('rule', task['rule']),
            ('spider_name', str(task['spider_name'])),
            ('machine', str(task['machine'])),
            ('status', str(task['status'])),
            ('create_time', str(task['create_time'])),
            ('update_time', str(task['update_time'])),
            ('operator', str(task['operator']))
        ]
        # Serialize for the shell command line: strip spaces, double quotes and slashes
        task_str = str(task_str).replace(' ', '').replace('"', "'").replace("\\/", "").replace("/", "")
        cmd = f"""sh scheduling/scheduling_v3/scheduling_v3.sh {source}/{source}_main/{spider_name}.py --log_type="{mode}" --crawler="{source}" --task="{task_str}" --oss_endpoint="{oss_endpoint}" --env="{env}" {source}/logs/{source}-{mode}-scheduling.log """
        Common.logger(log_type, crawler).info(f"cmd: {cmd}")
        os.system(cmd)
        Common.logger(log_type, crawler).info("Task dispatch finished")


if __name__ == "__main__":
    # print(SchedulingV3.get_task("scheduling", "scheduling", "dev"))
    # SchedulingV3.update_task("scheduling", "scheduling", 8, 1681833600000, 1, "dev")
    SchedulingV3.write_redis("scheduling", "scheduling", "dev")
    # print(SchedulingV3.get_redis("scheduling", "scheduling", "dev"))
    # SchedulingV3.scheduling_task("scheduling", "scheduling", "dev")
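
# A minimal driver sketch (an assumption for illustration, not part of the
# original deployment): one loop pushes due tasks to Redis while a worker pops
# and dispatches them. The 60-second cadence is illustrative only.
#
# while True:
#     SchedulingV3.write_redis("scheduling", "scheduling", "dev")      # producer
#     SchedulingV3.scheduling_task("scheduling", "scheduling", "dev")  # consumer
#     time.sleep(60)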