# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2023/7/3
import os
import sys
from multiprocessing import Process

# Make the project root importable before loading project-local modules.
sys.path.append(os.getcwd())
from common.common import Common
from common.scheduling_db import MysqlHelper
from gongzhonghao.gongzhonghao_author.gongzhonghao_author import GongzhonghaoAuthor


def get_author_videos(log_type, crawler, task_dict, token_index, rule_dict, user_list, env):
    Common.logger(log_type, crawler).info("Start crawling: WeChat official accounts\n")
    Common.logging(log_type, crawler, env, "Start crawling: WeChat official accounts\n")
    Common.logger(log_type, crawler).info(f"user_list: {user_list}")
    Common.logging(log_type, crawler, env, f"user_list: {user_list}")
    GongzhonghaoAuthor.get_all_videos(log_type=log_type,
                                      crawler=crawler,
                                      task_dict=task_dict,
                                      token_index=token_index,
                                      rule_dict=rule_dict,
                                      user_list=user_list,
                                      env=env)
    # Clean up old log files after this round of crawling.
    Common.del_logs(log_type, crawler)
    Common.logger(log_type, crawler).info("Finished one crawl round\n")
    Common.logging(log_type, crawler, env, "Finished one crawl round\n")


def main(log_type, crawler, env):
    task_dict = {'createTime': 1688382816512, 'id': 54, 'interval': 200, 'machine': 'aliyun', 'mode': 'author', 'operator': '王坤', 'rule': {'period': {'min': 1, 'max': 1}, 'duration': {'min': 20, 'max': 2700}}, 'source': 'gongzhonghao', 'spiderName': 'run_gzh_author', 'startTime': 1688456874000, 'status': 0, 'taskName': '公众号账号', 'updateTime': 1688456876643}
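    # NOTE: task_dict is hard-coded here; in a scheduled deployment it would
    # presumably be supplied by the task scheduler rather than inlined.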
    # Parse rule_dict
    rule_dict = {"period": {"min": 1, "max": 1}, "duration": {"min": 20, "max": 2700}}
    Common.logger(log_type, crawler).info(f"Crawl rules: {rule_dict}\n")
    Common.logging(log_type, crawler, env, f"Crawl rules: {rule_dict}\n")
    # Parse user_list: fetch the accounts bound to this task from MySQL.
    task_id = 54
    select_user_sql = f"""select * from crawler_user_v3 where task_id={task_id}"""
    user_list = MysqlHelper.get_values(log_type, crawler, select_user_sql, env, action="")
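    # NOTE: get_values is assumed to return the matching rows as a list, so the
    # len() and slicing below work on it; adjust if it returns something else.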
    # Compute the number of crawler processes to launch (crawler_num).
    user_num = len(user_list)
    chunk_size = 2  # number of users handled per process
    crawler_num = user_num // chunk_size  # floor division
    if user_num % chunk_size != 0:
        crawler_num += 1  # one extra process for the remainder
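    # Equivalent ceiling division in one step: math.ceil(user_num / chunk_size)
    # (would require "import math" at the top of the file).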
    Common.logger(log_type, crawler).info(f"{user_num} official accounts in total, launching {crawler_num} crawler processes")
    Common.logging(log_type, crawler, env, f"{user_num} official accounts in total, launching {crawler_num} crawler processes")
    # Crawl in parallel, one process per chunk of users.
    processes = []
    for i in range(crawler_num):
        start = i * chunk_size
        end = min((i + 1) * chunk_size, user_num)  # clamp the last chunk to the list length
        process = Process(target=get_author_videos,
                          args=(f"{log_type}{i+1}", crawler, task_dict, i + 1, rule_dict,
                                user_list[start:end], env))
        process.start()
        processes.append(process)
    for process in processes:
        process.join()


if __name__ == "__main__":
    main(log_type="author", crawler="gongzhonghao", env="dev")
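# To launch a dev crawl directly (assuming this module is saved as
# run_gzh_author.py, matching task_dict['spiderName']):
#   python run_gzh_author.py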