# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2023/3/2
import argparse
import os
import sys
import time

sys.path.append(os.getcwd())
from common.common import Common
from common.db import MysqlHelper, RedisHelper


class Scheduling:
    # Task list
    task_list = []

    # Read the task table
    @classmethod
    def get_task(cls, log_type, crawler, env, machine):
        get_sql = """ select * from crawler_task_1 """
        all_task_list = MysqlHelper.get_values(log_type=log_type, crawler=crawler, sql=get_sql, env=env, machine=machine)
        pre_task_list = []
        # Column order of crawler_task_1, used to map each row tuple to a dict.
        task_keys = [
            "task_id", "task_name", "source", "next_time", "interval_piaoquan",
            "spider_rule", "task_type", "spider_link", "spider_name",
            "min_publish_time", "min_publish_day", "media_id", "applets_status",
            "app_status", "user_tag", "user_content_tag", "machine",
            "insert_time", "update_time",
        ]
        for task in all_task_list:
            task_dict = dict(zip(task_keys, task))
            # A task is due once the current time has reached its next_time.
            if int(time.time()) >= task_dict["next_time"]:
                pre_task_list.append(task_dict)
        return pre_task_list
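
    # Illustrative example (values hypothetical): get_task returns only the
    # rows that are due, as dicts, e.g.
    #   [{"task_id": 1, "task_name": "xigua_follow", "source": "xigua",
    #     "next_time": 1677744000, "interval_piaoquan": 3600, ...}]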

    # Update the next run time; called when the task with this task_id is scheduled.
    @classmethod
    def update_task(cls, log_type, crawler, task_id, next_time, interval_piaoquan, env, machine):
        if interval_piaoquan > 0:
            new_next_time = next_time + interval_piaoquan
            update_sql = f""" UPDATE crawler_task_1 SET next_time={new_next_time} WHERE task_id={task_id} """
            MysqlHelper.update_values(log_type, crawler, update_sql, env, machine)
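
    # Usage sketch (hypothetical values): a task due now with a one-hour
    # interval is pushed back to now + 3600 so it is not picked up again
    # until then:
    #   Scheduling.update_task("scheduling", "scheduling", task_id=1,
    #                          next_time=int(time.time()), interval_piaoquan=3600,
    #                          env="dev", machine="local")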

    # Example command for launching a crawler:
    #   sh ./main/main.sh ./xigua/xigua_main/run_xigua_follow.py \
    #       --log_type="follow" --crawler="xigua" --strategy="定向爬虫策略" \
    #       --oss_endpoint="inner" --env="prod" --machine="aliyun" \
    #       xigua/nohup.log
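
    # A minimal sketch, not part of the original scheduler, of how the command
    # above could be launched from Python. The helper name, its parameters,
    # and the use of subprocess are assumptions.
    @classmethod
    def start_crawler(cls, crawler_script, log_type, crawler, strategy, oss_endpoint, env, machine, log_path):
        import subprocess
        cmd = (
            f"sh ./main/main.sh {crawler_script}"
            f' --log_type="{log_type}" --crawler="{crawler}"'
            f' --strategy="{strategy}" --oss_endpoint="{oss_endpoint}"'
            f' --env="{env}" --machine="{machine}" {log_path}'
        )
        # Start the crawler in the background; logging is handled by main.sh.
        subprocess.Popen(cmd, shell=True)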

    # Resource allocation / assembly
    @classmethod
    def write_redis(cls, log_type, crawler, env, machine):
        pre_task_list = cls.get_task(log_type=log_type, crawler=crawler, env=env, machine=machine)
        if len(pre_task_list) == 0:
            Common.logger(log_type, crawler).info("No new tasks\n")
        else:
            for pre_task in pre_task_list:
                print(type(pre_task))
                print(pre_task)
                if machine == "hk":
                    # Push to redis (HK queue; not implemented yet)
                    pass
                elif machine == "aliyun":
                    # Push to redis (Aliyun queue; not implemented yet)
                    pass
                else:
                    # Push to redis (local/dev queue)
                    RedisHelper.redis_push(env, machine, pre_task['task_id'], str(pre_task))
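
    # Illustrative example (values hypothetical): for a due task this pushes
    # the stringified task dict onto the queue keyed by its task_id, e.g.
    #   RedisHelper.redis_push("dev", "local", 1, "{'task_id': 1, ...}")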

    @classmethod
    def main(cls, log_type, crawler, env, machine):
        # Once current time >= next_time: update next_time (via update_task), then start the crawler.
        pre_task_list = cls.get_task(log_type=log_type, crawler=crawler, env=env, machine=machine)
        if len(pre_task_list) == 0:
            Common.logger(log_type, crawler).info("No new tasks\n")
        else:
            for pre_task in pre_task_list:
                # Pop the next queued task; dispatch is not implemented yet.
                task_list = RedisHelper.redis_pop()
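
    # A minimal sketch, not the original implementation, of how a popped task
    # could be dispatched: reschedule it via update_task, then launch its
    # crawler. dispatch_task and its use of ast/start_crawler are assumptions;
    # ast.literal_eval reverses the str(dict) payload written by write_redis.
    @classmethod
    def dispatch_task(cls, log_type, crawler, task_str, env, machine):
        import ast
        task = ast.literal_eval(task_str)
        # Push next_time forward first so the task is not re-dispatched.
        cls.update_task(log_type, crawler, task["task_id"], task["next_time"],
                        task["interval_piaoquan"], env, machine)
        # spider_name is assumed to hold the run_*.py entry script path.
        cls.start_crawler(task["spider_name"], log_type, crawler,
                          "定向爬虫策略", "inner", env, machine, f"{crawler}/nohup.log")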


if __name__ == "__main__":
    Scheduling.write_redis("scheduling", "scheduling", "dev", "local")