# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2023/7/3
import os
import sys
from multiprocessing import Process

sys.path.append(os.getcwd())
from common.common import Common
from common.scheduling_db import MysqlHelper
from gongzhonghao.gongzhonghao_author.gongzhonghao_author import GongzhonghaoAuthor


def get_author_videos(log_type, crawler, task_dict, token_index, rule_dict, user_list, env):
    """Crawl one chunk of 公众号 accounts in a child process."""
    Common.logger(log_type, crawler).info('开始抓取:公众号账号\n')
    Common.logging(log_type, crawler, env, '开始抓取:公众号账号\n')
    Common.logger(log_type, crawler).info(f"user_list:{user_list}")
    Common.logging(log_type, crawler, env, f"user_list:{user_list}")
    GongzhonghaoAuthor.get_all_videos(log_type=log_type,
                                      crawler=crawler,
                                      task_dict=task_dict,
                                      token_index=token_index,
                                      rule_dict=rule_dict,
                                      user_list=user_list,
                                      env=env)
    Common.del_logs(log_type, crawler)
    Common.logger(log_type, crawler).info('抓取一轮结束\n')
    Common.logging(log_type, crawler, env, '抓取一轮结束\n')


def main(log_type, crawler, env):
    """Split the task's user list into chunks and spawn one crawler process per chunk."""
    task_dict = {'createTime': 1688382816512, 'id': 54, 'interval': 200, 'machine': 'aliyun',
                 'mode': 'author', 'operator': '王坤',
                 'rule': {'period': {'min': 1, 'max': 1}, 'duration': {'min': 20, 'max': 2700}},
                 'source': 'gongzhonghao', 'spiderName': 'run_gzh_author',
                 'startTime': 1688456874000, 'status': 0, 'taskName': '公众号账号',
                 'updateTime': 1688456876643}

    # Crawl rule dict (mirrors task_dict['rule'])
    rule_dict = {"period": {"min": 1, "max": 1}, "duration": {"min": 20, "max": 2700}}
    Common.logger(log_type, crawler).info(f"抓取规则:{rule_dict}\n")
    Common.logging(log_type, crawler, env, f"抓取规则:{rule_dict}\n")

    # Load the user list for this task from MySQL
    task_id = 54
    select_user_sql = f"""select * from crawler_user_v3 where task_id={task_id}"""
    user_list = MysqlHelper.get_values(log_type, crawler, select_user_sql, env, action="")

    # Work out how many crawler processes are needed (round up so every user is covered)
    user_num = len(user_list)
    chunk_size = 2  # number of users handled by each process
    crawler_num = user_num // chunk_size
    if user_num % chunk_size != 0:
        crawler_num += 1
    Common.logger(log_type, crawler).info(f"共{user_num}个公众号,需要启动{crawler_num}个脚本任务")
    Common.logging(log_type, crawler, env, f"共{user_num}个公众号,需要启动{crawler_num}个脚本任务")

    # Crawl the chunks in parallel, one process per chunk
    processes = []
    for i in range(crawler_num):
        start = i * chunk_size
        end = min((i + 1) * chunk_size, user_num)
        process = Process(target=get_author_videos,
                          args=(f"{log_type}{i + 1}", crawler, task_dict, i + 1,
                                rule_dict, user_list[start:end], env))
        process.start()
        processes.append(process)

    for process in processes:
        process.join()


if __name__ == "__main__":
    main(log_type="author", crawler="gongzhonghao", env="dev")