@@ -4,17 +4,18 @@
 import os
 import sys
 import time
+
 sys.path.append(os.getcwd())
 from common.common import Common
-from common.scheduling_db import MysqlHelper, RedisHelper
+from common.scheduling_db import MysqlHelper, RedisHelper
 
 
 class Scheduling:
     # 读取任务表
     @classmethod
-    def get_task(cls, log_type, crawler, env, machine):
+    def get_task(cls, log_type, crawler, env):
         get_sql = """ select * from crawler_task """
-        all_task_list = MysqlHelper.get_values(log_type=log_type, crawler=crawler, sql=get_sql, env=env, machine=machine)
+        all_task_list = MysqlHelper.get_values(log_type=log_type, crawler=crawler, sql=get_sql, env=env)
         pre_task_list = []
         for task in all_task_list:
             if int(time.time()) >= task["next_time"]:
@@ -23,34 +24,39 @@ class Scheduling:
 
     # 更新下次启动时间,调用时机:调度该 task_id 的任务时
     @classmethod
-    def update_task(cls, log_type, crawler, task_id, next_time, interval_piaoquan, env, machine):
+    def update_task(cls, log_type, crawler, task_id, next_time, interval_piaoquan, env):
         if interval_piaoquan > 0:
             new_next_time = next_time + interval_piaoquan
             update_sql = f""" UPDATE crawler_task SET next_time={new_next_time} WHERE task_id={task_id} """
-            MysqlHelper.update_values(log_type, crawler, update_sql, env, machine)
+            MysqlHelper.update_values(log_type, crawler, update_sql, env)
 
     # 资源分配 / 组装
     @classmethod
-    def write_redis(cls, log_type, crawler, env, machine):
-        pre_task_list = cls.get_task(log_type=log_type, crawler=crawler, env=env, machine=machine)
+    def write_redis(cls, log_type, crawler, env):
+        pre_task_list = cls.get_task(log_type=log_type, crawler=crawler, env=env)
         if len(pre_task_list) == 0:
             Common.logger(log_type, crawler).info("暂无新任务\n")
         else:
             for pre_task in pre_task_list:
+                # machine字段是用来区分海外爬虫和国内爬虫使用的,不涉及任何其他含义
+                machine = pre_task.get('machine', 'dev')
                 if machine == "hk":
                     # 写入 redis
-                    RedisHelper.redis_push(env, machine,str(pre_task))
-                elif machine == "aliyun":
+                    task_key = 'crawler_config_task_queue:hk'
+                    RedisHelper.redis_push(env, task_key, str(pre_task))
+                elif machine == "prod":
                     # 写入 redis
-                    RedisHelper.redis_push(env, machine,str(pre_task))
+                    task_key = 'crawler_config_task_queue:aliyun'
+                    RedisHelper.redis_push(env, task_key, str(pre_task))
                 else:
                     # 写入 redis
-                    RedisHelper.redis_push(env, machine,str(pre_task))
+                    task_key = 'crawler_config_task_queue:dev'
+                    RedisHelper.redis_push(env, task_key, str(pre_task))
 
     @classmethod
-    def get_redis(cls, log_type, crawler, env, machine):
+    def get_redis(cls, log_type, crawler, env):
         while True:
-            redis_data = RedisHelper.redis_pop(env, machine)
+            redis_data = RedisHelper.redis_pop(env)
             if redis_data is None or len(redis_data) == 0:
                 Common.logger(log_type, crawler).info("Redis为空,等待1秒")
                 time.sleep(1)
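In the hunk above, write_redis now reads the machine field off each task row (defaulting to 'dev') instead of taking it as a parameter, and pushes the task onto a per-region Redis queue; per the added comment, machine only distinguishes overseas ("hk") from domestic ("prod"/Aliyun) crawlers. A minimal sketch of that routing in isolation (the queue-key strings and the 'dev' fallback come from the added lines; the helper name and dict below are illustrative assumptions, not part of this change):

# Illustrative sketch only — mirrors the queue-key routing added in write_redis.
TASK_QUEUE_KEYS = {
    "hk": "crawler_config_task_queue:hk",        # overseas crawlers
    "prod": "crawler_config_task_queue:aliyun",  # domestic (Aliyun) crawlers
}

def task_queue_key(pre_task: dict) -> str:
    # machine only marks overseas vs. domestic; anything else falls back to dev
    machine = pre_task.get("machine", "dev")
    return TASK_QUEUE_KEYS.get(machine, "crawler_config_task_queue:dev")

# e.g. task_queue_key({"machine": "hk"}) -> 'crawler_config_task_queue:hk'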
@@ -59,8 +65,8 @@ class Scheduling:
                 return task
 
     @classmethod
-    def scheduling_task(cls, log_type, crawler, env, machine):
-        task = cls.get_redis(log_type, crawler, env, machine)
+    def scheduling_task(cls, log_type, crawler, env):
+        task = cls.get_redis(log_type, crawler, env)
         Common.logger(log_type, crawler).info(f"task: {task}")
         Common.logger(log_type, crawler).info(f"已获取调度任务:{task}")
         task_id = task['task_id']
@@ -68,30 +74,43 @@ class Scheduling:
         next_time = task['next_time']
         interval_piaoquan = task['interval_piaoquan']
         spider_name = task['spider_name']
-        if machine == "aliyun":
+        if env == "aliyun":
             oss_endpoint = "inner"
-        elif machine == "aliyun_hk":
+        elif env == "hk":
             oss_endpoint = "hk"
         else:
             oss_endpoint = "out"
 
         if int(time.time()) >= next_time:
-            cls.update_task(log_type, crawler, task_id, next_time, interval_piaoquan, env, machine)
+            cls.update_task(log_type, crawler, task_id, next_time, interval_piaoquan, env)
             # 正式环境,调度任务
             Common.logger(log_type, crawler).info(f"开始调度任务:{task}\n")
-            task_str = [('task_id', str(task_id)), ('task_name', str(task['task_name'])), ('source', str(task['source'])), ('next_time', str(task['next_time'])), ('interval_piaoquan', str(task['interval_piaoquan'])), ('play_cnt', eval(task['spider_rule'])['play_cnt']),('video_width', eval(task['spider_rule'])['video_width']),('video_height', eval(task['spider_rule'])['video_height']),('video_like', eval(task['spider_rule'])['video_like']),('share_cnt', eval(task['spider_rule'])['share_cnt']),('duration_min', eval(task['spider_rule'])['duration']['min']),('duration_max', eval(task['spider_rule'])['duration']['max']),('task_type', task['task_type']),('spider_link', eval(task['spider_link'])),('spider_name', str(task['spider_name'])),('min_publish_time', str(task['min_publish_time'])),('min_publish_day', str(task['min_publish_day'])),('media_id', str(task['media_id'])),('applets_status', str(task['applets_status'])),('app_status', str(task['app_status'])),('user_tag', str(task['user_tag'])),('user_content_tag',str(task['user_content_tag'])),('machine', str(task['machine']))]
+            task_str = [('task_id', str(task_id)), ('task_name', str(task['task_name'])),
+                        ('source', str(task['source'])), ('next_time', str(task['next_time'])),
+                        ('interval_piaoquan', str(task['interval_piaoquan'])),
+                        ('play_cnt', eval(task['spider_rule'])['play_cnt']),
+                        ('video_width', eval(task['spider_rule'])['video_width']),
+                        ('video_height', eval(task['spider_rule'])['video_height']),
+                        ('video_like', eval(task['spider_rule'])['video_like']),
+                        ('share_cnt', eval(task['spider_rule'])['share_cnt']),
+                        ('duration_min', eval(task['spider_rule'])['duration']['min']),
+                        ('duration_max', eval(task['spider_rule'])['duration']['max']),
+                        ('task_type', task['task_type']), ('spider_link', eval(task['spider_link'])),
+                        ('spider_name', str(task['spider_name'])), ('min_publish_time', str(task['min_publish_time'])),
+                        ('min_publish_day', str(task['min_publish_day'])), ('media_id', str(task['media_id'])),
+                        ('applets_status', str(task['applets_status'])), ('app_status', str(task['app_status'])),
+                        ('user_tag', str(task['user_tag'])), ('user_content_tag', str(task['user_content_tag'])),
+                        ('machine', str(task['machine']))]
             task_str = str(task_str).replace(' ', '')
-            cmd = f"""sh scheduling/scheduling_main/scheduling.sh {source}/{source}_main/{spider_name}_scheduling.py --log_type="{spider_name}" --crawler="{source}" --task="{str(task_str)}" --oss_endpoint="{oss_endpoint}" --env="{env}" --machine="{machine}" {source}/{source}-nohup.log """
+            cmd = f"""sh scheduling/scheduling_main/scheduling.sh {source}/{source}_main/{spider_name}_scheduling.py --log_type="{spider_name}" --crawler="{source}" --task="{str(task_str)}" --oss_endpoint="{oss_endpoint}" --env="{env}" {source}/{source}-nohup.log """
             Common.logger(log_type, crawler).info(f"cmd:{cmd}\n")
             os.system(cmd)
 
 
-
-
 if __name__ == "__main__":
     # print(Scheduling.get_task("scheduling", "scheduling", "dev", "local"))
     # print(Scheduling.get_redis("scheduling", "scheduling", "dev", "local"))
     # Scheduling.write_redis("scheduling", "scheduling", "dev", "local")
-    Scheduling.scheduling_task("scheduling", "scheduling", "dev", "local")
+    Scheduling.scheduling_task("scheduling", "scheduling", "dev")
 
-    pass
+    pass
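With machine removed from the public entry points, a local run only needs log_type, crawler and env: write_redis acts as the producer (MySQL crawler_task → Redis queue) and scheduling_task as the consumer (Redis → shell out to the spider's *_scheduling.py via scheduling.sh). A usage sketch under those assumptions — the import path is inferred from the scheduling/scheduling_main/ layout referenced in cmd and is not stated in this diff:

# Usage sketch, assuming the refactored signatures above.
from scheduling.scheduling_main.scheduling import Scheduling  # assumed module path

# Producer: read crawler_task from MySQL and push due tasks onto the Redis queue.
Scheduling.write_redis(log_type="scheduling", crawler="scheduling", env="dev")

# Consumer: block until a task is available, then launch the spider via scheduling.sh.
Scheduling.scheduling_task(log_type="scheduling", crawler="scheduling", env="dev")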