# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2023/3/2
import os
import sys
import time

sys.path.append(os.getcwd())
from common.common import Common
from common.scheduling_db import MysqlHelper, RedisHelper


class SchedulingV3:
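    """Scheduler v3: read due tasks from the crawler_task_v3 MySQL table, queue them in Redis,
    then pop them one by one and dispatch each to its spider script via scheduling_v3.sh."""
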
    # Read the task table and return the tasks whose start_time has been reached
    @classmethod
    def get_task(cls, log_type, crawler, env):
        get_sql = """ select * from crawler_task_v3; """
        all_task_list = MysqlHelper.get_values(log_type=log_type, crawler=crawler, sql=get_sql, env=env, action='')
        pre_task_list = []
        for task in all_task_list:
            # start_time is a millisecond epoch timestamp
            if int(time.time() * 1000) >= task["start_time"]:
                pre_task_list.append(task)
        return pre_task_list
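
    # Illustrative (assumed) shape of a crawler_task_v3 row, inferred from the fields this module
    # reads elsewhere; the authoritative schema lives in the database:
    #   {'id': 8, 'task_name': '...', 'source': 'gongzhonghao', 'start_time': 1681833600000,
    #    'interval': 3600, 'mode': 'author', 'rule': '...', 'spider_name': 'run_gongzhonghao1_author_scheduling',
    #    'machine': 'aliyun', 'status': 0, 'create_time': '...', 'update_time': '...', 'operator': '...'}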
    # Update the next start time; called when the task with this task_id is dispatched
    @classmethod
    def update_task(cls, log_type, crawler, task_id, start_time, interval_piaoquan, env):
        if interval_piaoquan > 0:
            # start_time is in milliseconds; interval_piaoquan appears to be in seconds
            # (it is multiplied by 1000 before being added to the ms timestamp)
            new_start_time = start_time + int(interval_piaoquan) * 1000
            update_sql = f""" UPDATE crawler_task_v3 SET start_time={new_start_time} WHERE id={task_id} """
            MysqlHelper.update_values(log_type, crawler, update_sql, env)
            Common.logger(log_type, crawler).info(f"Updated the task's next start time: {new_start_time}\n")
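
    # Worked example (illustrative values; assumes interval_piaoquan is given in seconds):
    #   start_time = 1681833600000 ms, interval_piaoquan = 3600
    #   new_start_time = 1681833600000 + 3600 * 1000 = 1681837200000, i.e. exactly one hour later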
    # Resource allocation / assembly: push due tasks into the Redis task queue
    @classmethod
    def write_redis(cls, log_type, crawler, env):
        pre_task_list = cls.get_task(log_type=log_type, crawler=crawler, env=env)
        if len(pre_task_list) == 0:
            Common.logger(log_type, crawler).info("No new tasks for now\n")
        else:
            for pre_task in pre_task_list:
                # The machine field only distinguishes overseas crawlers from domestic ones and
                # carries no other meaning; "hk", "aliyun" and the default "dev" currently all
                # push to the same Redis queue key, so the push is identical for each.
                machine = pre_task.get('machine', 'dev')
                next_time = pre_task['start_time']
                interval_piaoquan = pre_task['interval']
                task_id = pre_task['id']
                # Push the task into Redis
                task_key = 'crawler_config_task_queue'
                RedisHelper.redis_push(env, task_key, str(pre_task))
                Common.logger(log_type, crawler).info(f"Pushed task to Redis: {str(pre_task)}")
                if int(time.time() * 1000) >= next_time:
                    cls.update_task(log_type, crawler, task_id, next_time, interval_piaoquan, env)
    @classmethod
    def get_redis(cls, log_type, crawler, env):
        # "hk", "prod" and the default environment currently all read from the same queue key
        task_key = 'crawler_config_task_queue'
        redis_data = RedisHelper.redis_pop(env, task_key)
        if redis_data is None or len(redis_data) == 0:
            Common.logger(log_type, crawler).info("Redis is empty, exiting")
            return
        else:
            # The queue holds str(dict) values written by write_redis, so eval restores the dict
            task = eval(str(redis_data, encoding="utf8"))
            return task
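
    # Round-trip sketch (assuming RedisHelper.redis_push appends the string to the queue and
    # RedisHelper.redis_pop returns one entry back as bytes):
    #   RedisHelper.redis_push("dev", "crawler_config_task_queue", str({'id': 8, 'mode': 'author'}))
    #   raw = RedisHelper.redis_pop("dev", "crawler_config_task_queue")   # b"{'id': 8, 'mode': 'author'}"
    #   eval(str(raw, encoding="utf8"))                                   # {'id': 8, 'mode': 'author'}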
    @classmethod
    def scheduling_task(cls, log_type, crawler, env):
        task = cls.get_redis(log_type, crawler, env)
        if not task:
            Common.logger(log_type, crawler).info("Redis is empty, exiting")
            return
        Common.logger(log_type, crawler).info(f"Fetched scheduling task: {type(task)}, {task}")
        mode = task['mode']
        source = task['source']
        spider_name = task['spider_name']
        # Override mode for spiders whose mode is encoded in the spider name
        if "gongzhonghao2" in spider_name:
            mode = "author2"
        elif "gongzhonghao3" in spider_name:
            mode = "author3"
        elif "gongzhonghao4" in spider_name:
            mode = "author4"
        elif "gongzhonghao5" in spider_name:
            mode = "author5"
        elif "xiaoniangao_play" in spider_name:
            mode = "play"
        elif "xiaoniangao_hour" in spider_name:
            mode = "hour"
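        # Examples of the override above (illustrative spider names):
        #   "run_gongzhonghao3_author_scheduling" -> mode = "author3"
        #   "run_xiaoniangao_hour_scheduling"     -> mode = "hour"
        #   anything else                         -> mode stays task['mode']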
- if spider_name == "run_gongzhonghao1_author_scheduling":
- cmd = 'ps -ef | grep "run_gongzhonghao1_author_scheduling" | grep -v "grep"'
- result1 = os.popen(cmd).read()
- if len(result1) != 0:
- return
- if spider_name == "run_gongzhonghao2_author_scheduling":
- cmd = 'ps -ef | grep "run_gongzhonghao2_author_scheduling" | grep -v "grep"'
- result2 = os.popen(cmd).read()
- if len(result2) != 0:
- return
- if spider_name == "run_gongzhonghao3_author_scheduling":
- cmd = 'ps -ef | grep "run_gongzhonghao3_author_scheduling" | grep -v "grep"'
- result3 = os.popen(cmd).read()
- if len(result3) != 0:
- return
- if spider_name == "run_gongzhonghao4_author_scheduling":
- cmd = 'ps -ef | grep "run_gongzhonghao4_author_scheduling" | grep -v "grep"'
- result4 = os.popen(cmd).read()
- if len(result4) != 0:
- return
- if spider_name == "run_gongzhonghao5_author_scheduling":
- cmd = 'ps -ef | grep "run_gongzhonghao5_author_scheduling" | grep -v "grep"'
- result5 = os.popen(cmd).read()
- if len(result5) != 0:
- return
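        # Illustrative output of the duplicate-process check above (made-up values):
        #   $ ps -ef | grep "run_gongzhonghao1_author_scheduling" | grep -v "grep"
        #   root  12345  1  0 10:00 ?  00:00:05 python3 gongzhonghao/gongzhonghao_main/run_gongzhonghao1_author_scheduling.py ...
        # A non-empty result means that spider is already running, so the dispatch returns early.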
        # Production environment: dispatch the task
        Common.logger(log_type, crawler).info("Start dispatching the task")
        task_str = [
            ('task_id', str(task['id'])),
            ('task_name', str(task['task_name'])),
            ('source', str(task['source'])),
            ('start_time', str(task['start_time'])),
            ('interval', str(task['interval'])),
            ('mode', str(task['mode'])),
            ('rule', task['rule']),
            ('spider_name', str(task['spider_name'])),
            ('machine', str(task['machine'])),
            ('status', str(task['status'])),
            ('create_time', str(task['create_time'])),
            ('update_time', str(task['update_time'])),
            ('operator', str(task['operator']))
        ]
        # Normalize the task string for shell quoting: drop spaces, turn double quotes into
        # single quotes, and strip slashes
        task_str = str(task_str).replace(' ', '').replace('"', "'").replace("\\/", "").replace("/", "")
        cmd = f"""sh scheduling/scheduling_v3/scheduling_v3.sh {source}/{source}_main/{spider_name}.py --log_type="{mode}" --crawler="{source}" --task="{task_str}" --env="{env}" {source}/logs/{source}-{mode}-scheduling.log """
        Common.logger(log_type, crawler).info(f"cmd:{cmd}")
        os.system(cmd)
        Common.logger(log_type, crawler).info("Task dispatch finished")
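
    # Example of the command scheduling_task builds (illustrative values, wrapped here for readability):
    #   sh scheduling/scheduling_v3/scheduling_v3.sh gongzhonghao/gongzhonghao_main/run_gongzhonghao1_author_scheduling.py
    #       --log_type="author" --crawler="gongzhonghao" --task="[('task_id','8'),('task_name','...'),...]"
    #       --env="dev" gongzhonghao/logs/gongzhonghao-author-scheduling.log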

if __name__ == "__main__":
    # print(SchedulingV3.get_task("scheduling", "scheduling", "dev"))
    # SchedulingV3.update_task("scheduling", "scheduling", 8, 1681833600000, 1, "dev")
    SchedulingV3.write_redis("scheduling", "scheduling", "dev")
    # print(SchedulingV3.get_redis("scheduling", "scheduling", "dev"))
    # SchedulingV3.scheduling_task("scheduling", "scheduling", "dev")
    pass