@@ -1,46 +1,18 @@
# -*- coding: utf-8 -*-
# @Author: wangkun
-<<<<<<<< HEAD:gongzhonghao/gongzhonghao_main/run_gzh_author_old.py
# @Time: 2023/6/30
import argparse
-from multiprocessing import Process
-========
-# @Time: 2023/7/27
-import argparse
import random

->>>>>>>> master:xigua/xigua_main/run_xgms_recommend.py
from mq_http_sdk.mq_client import *
from mq_http_sdk.mq_consumer import *
from mq_http_sdk.mq_exception import MQExceptionBase

sys.path.append(os.getcwd())
from common.common import Common
-<<<<<<<< HEAD:gongzhonghao/gongzhonghao_main/run_gzh_author_old.py
-from common.scheduling_db import MysqlHelper
-from gongzhonghao.gongzhonghao_author.gongzhonghao_author import GongzhonghaoAuthor
-
-
-def get_author_videos(log_type, crawler, token_index, task_dict, rule_dict, user_list, env):
-    Common.logger(log_type, crawler).info(f'Start crawling: {task_dict["taskName"]}\n')
-    Common.logging(log_type, crawler, env, f'Start crawling: {task_dict["taskName"]}\n')
-    Common.logger(log_type, crawler).info(f"user_list:{user_list}")
-    Common.logging(log_type, crawler, env, f"user_list:{user_list}")
-    GongzhonghaoAuthor.get_all_videos(log_type=log_type,
-                                      crawler=crawler,
-                                      task_dict=task_dict,
-                                      token_index=token_index,
-                                      rule_dict=rule_dict,
-                                      user_list=user_list,
-                                      env=env)
-    Common.del_logs(log_type, crawler)
-    Common.logger(log_type, crawler).info('Crawl round finished\n')
-    Common.logging(log_type, crawler, env, 'Crawl round finished\n')
-========
from common.public import get_consumer, ack_message, task_fun_mq
from common.scheduling_db import MysqlHelper
from xigua.xigua_recommend.xgms_recommend import XiguaRecommend
->>>>>>>> master:xigua/xigua_main/run_xgms_recommend.py


def main(log_type, crawler, topic_name, group_id, env):
@@ -87,45 +59,8 @@ def main(log_type, crawler, topic_name, group_id, env):
                # ack_mq_message
                ack_message(log_type=log_type, crawler=crawler, recv_msgs=recv_msgs, consumer=consumer)

-                # Parse task_dict
+                # Handle the crawler task
                task_dict = task_fun_mq(msg.message_body)['task_dict']
-<<<<<<<< HEAD:gongzhonghao/gongzhonghao_main/run_gzh_author_old.py
-                Common.logger(log_type, crawler).info(f"Scheduled task: {task_dict}")
-                Common.logging(log_type, crawler, env, f"Scheduled task: {task_dict}")
-
-                # Parse rule_dict
-                rule_dict = task_fun_mq(msg.message_body)['rule_dict']
-                Common.logger(log_type, crawler).info(f"Crawl rules: {rule_dict}\n")
-                Common.logging(log_type, crawler, env, f"Crawl rules: {rule_dict}\n")
-
-                # Parse user_list
-                task_id = task_dict['id']
-                select_user_sql = f"""select * from crawler_user_v3 where task_id={task_id}"""
-                user_list = MysqlHelper.get_values(log_type, crawler, select_user_sql, env, action="")
-
-                # Compute the number of crawler scripts to launch (crawler_num)
-                user_num = len(user_list)
-                chunk_size = 100  # number of users handled per process
-                crawler_num = int(user_num // chunk_size)  # floor division
-                if user_num % chunk_size != 0:
-                    crawler_num += 1
-                Common.logger(log_type, crawler).info(f"{user_num} official accounts in total; {crawler_num} script tasks need to be launched")
-                Common.logging(log_type, crawler, env, f"{user_num} official accounts in total; {crawler_num} script tasks need to be launched")
-
-                # Crawl in parallel with multiple processes
-                processes = []
-                for i in range(crawler_num):
-                    start = i * chunk_size
-                    end = min((i + 1) * chunk_size, user_num + 1)
-                    process = Process(target=get_author_videos, args=(
-                        f"{log_type}{i + 1}", crawler, i + 1, task_dict, rule_dict, user_list[start:end], env))
-                    process.start()
-                    processes.append(process)
-
-                for process in processes:
-                    process.join()
-
-========
                rule_dict = task_fun_mq(msg.message_body)['rule_dict']
                task_id = task_dict['id']
                select_user_sql = f"""select * from crawler_user_v3 where task_id={task_id}"""
@@ -154,7 +89,6 @@ def main(log_type, crawler, topic_name, group_id, env):
                xg_recommend_duration = xg_recommend_start_time - xg_recommend_end_time
                Common.logger(log_type, crawler).info(f"duration {xg_recommend_duration}")
                Common.logging(log_type, crawler, env, f"duration {xg_recommend_duration}")
->>>>>>>> master:xigua/xigua_main/run_xgms_recommend.py
        except MQExceptionBase as err:
            # No messages available to consume in the topic.
            if err.type == "MessageNotExist":
@@ -180,4 +114,4 @@ if __name__ == "__main__":
         crawler=args.crawler,
         topic_name=args.topic_name,
         group_id=args.group_id,
-         env=args.env)
+         env=args.env)
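
For reference, the HEAD-side fan-out deleted in the second hunk splits `user_list` into chunks of 100 and starts one `Process` per chunk, with `crawler_num` computed as the ceiling of `user_num / chunk_size`. Below is a minimal, self-contained sketch of that pattern; the sample data and the `crawl_chunk` stand-in (in place of `get_author_videos`) are illustrative, not repo code.

```python
# Sketch of the removed chunked fan-out. `crawl_chunk` is a hypothetical
# stand-in for get_author_videos(); the sample sizes are made up.
import math
from multiprocessing import Process


def crawl_chunk(worker_id: int, users: list) -> None:
    # The real code passes task_dict/rule_dict through and calls
    # GongzhonghaoAuthor.get_all_videos() for this slice of users.
    print(f"worker {worker_id}: crawling {len(users)} users")


if __name__ == "__main__":
    user_list = [f"user_{n}" for n in range(250)]  # e.g. 250 accounts
    chunk_size = 100                               # users per process
    crawler_num = math.ceil(len(user_list) / chunk_size)  # 250 -> 3 processes

    processes = []
    for i in range(crawler_num):
        start = i * chunk_size
        end = min((i + 1) * chunk_size, len(user_list))
        p = Process(target=crawl_chunk, args=(i + 1, user_list[start:end]))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()
```

The removed code computes `end = min((i + 1) * chunk_size, user_num + 1)`; Python slicing clamps out-of-range ends, so the loose `user_num + 1` bound is harmless, and `math.ceil` here is equivalent to the floor-divide-then-increment it uses to derive `crawler_num`.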