# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2023/7/3
from common.common import Common
from common.scheduling_db import MysqlHelper
from common.public import task_fun_mq
from gongzhonghao.gongzhonghao_author.gongzhonghao_author import GongzhonghaoAuthor


def get_author_videos(log_type, crawler, token_index, task_dict, rule_dict, user_list, env):
    """Crawl all videos for one chunk of 公众号 (official-account) users."""
    Common.logger(log_type, crawler).info(f'开始抓取:{task_dict["taskName"]}\n')  # start crawling this task
    Common.logging(log_type, crawler, env, f'开始抓取:{task_dict["taskName"]}\n')
    Common.logger(log_type, crawler).info(f"user_list:{user_list}")
    Common.logging(log_type, crawler, env, f"user_list:{user_list}")
    GongzhonghaoAuthor.get_all_videos(log_type=log_type,
                                      crawler=crawler,
                                      token_index=token_index,
                                      rule_dict=rule_dict,
                                      user_list=user_list,
                                      env=env)
    Common.del_logs(log_type, crawler)
    Common.logger(log_type, crawler).info('抓取一轮结束\n')  # one crawl round finished
    Common.logging(log_type, crawler, env, '抓取一轮结束\n')


def main(log_type, crawler, topic_name, group_id, env):
    # Currently a stub. The commented-out code below is the original draft: it parsed a
    # scheduling message (task_dict / rule_dict), loaded the user list from MySQL, and
    # crawled in parallel with one process per 100 users. References to `msg`, `err`,
    # `time.sleep` and `continue` are leftovers from the MQ consume loop that wrapped it.
    #
    # # Parse task_dict
    # task_dict = task_fun_mq(msg.message_body)['task_dict']
    # Common.logger(log_type, crawler).info(f"调度任务:{task_dict}")
    # Common.logging(log_type, crawler, env, f"调度任务:{task_dict}")
    #
    # # Parse rule_dict
    # rule_dict = task_fun_mq(msg.message_body)['rule_dict']
    # Common.logger(log_type, crawler).info(f"抓取规则:{rule_dict}\n")
    # Common.logging(log_type, crawler, env, f"抓取规则:{rule_dict}\n")
    #
    # # Resolve user_list
    # task_id = task_dict['id']
    # select_user_sql = f"""select * from crawler_user_v3 where task_id={task_id}"""
    # user_list = MysqlHelper.get_values(log_type, crawler, select_user_sql, env, action="")
    #
    # # Compute the number of crawler processes to start (crawler_num)
    # user_num = len(user_list)
    # chunk_size = 100  # number of users handled per process
    # crawler_num = int(user_num // chunk_size)  # floor division
    # if user_num % chunk_size != 0:
    #     crawler_num += 1
    # Common.logger(log_type, crawler).info(f"共{user_num}个公众号,需要启动{crawler_num}个脚本任务")
    # Common.logging(log_type, crawler, env, f"共{user_num}个公众号,需要启动{crawler_num}个脚本任务")
    #
    # # Crawl in parallel, one process per chunk
    # processes = []
    # for i in range(crawler_num):
    #     start = i * chunk_size
    #     end = min((i + 1) * chunk_size, user_num)
    #     process = Process(target=get_author_videos,
    #                       args=(f"{log_type}{i+1}", crawler, i+1, task_dict, rule_dict, user_list[start:end], env))
    #     process.start()
    #     processes.append(process)
    #
    # for process in processes:
    #     process.join()
    #
    # # Leftover from the original MQ consume loop's exception handler:
    # Common.logger(log_type, crawler).info(f"Consume Message Fail! Exception:{err}\n")
    # Common.logging(log_type, crawler, env, f"Consume Message Fail! Exception:{err}\n")
    # time.sleep(2)
    # continue
    pass
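
# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original module): how the commented-out
# logic in main() could dispatch get_author_videos() across processes, with
# 100 users per process. It assumes task_dict, rule_dict and user_list have
# already been resolved; the MQ consume loop, error handling and sleep/retry
# behaviour are deliberately omitted. The helper name dispatch_author_videos
# and the chunk_size default are assumptions, not names from the original code.
# ---------------------------------------------------------------------------
def dispatch_author_videos(log_type, crawler, task_dict, rule_dict, user_list, env, chunk_size=100):
    """Split user_list into chunks and crawl each chunk in its own process."""
    from multiprocessing import Process

    user_num = len(user_list)
    crawler_num = (user_num + chunk_size - 1) // chunk_size  # ceiling division
    processes = []
    for i in range(crawler_num):
        start = i * chunk_size
        end = min((i + 1) * chunk_size, user_num)
        # Each process gets its own log suffix and token index, mirroring the draft in main()
        process = Process(
            target=get_author_videos,
            args=(f"{log_type}{i + 1}", crawler, i + 1, task_dict, rule_dict, user_list[start:end], env),
        )
        process.start()
        processes.append(process)
    for process in processes:
        process.join()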