run_gzh_author_dev.py 3.3 KB

# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2023/7/3
import os
import sys
from multiprocessing import Process

sys.path.append(os.getcwd())
from common.common import Common
from common.scheduling_db import MysqlHelper
from gongzhonghao.gongzhonghao_author.gongzhonghao_author import GongzhonghaoAuthor

def get_author_videos(log_type, crawler, task_dict, token_index, rule_dict, user_list, env):
    """Crawl all videos for one slice of 公众号 (Official Account) users with the given token index."""
    Common.logger(log_type, crawler).info('开始抓取:公众号账号\n')
    Common.logging(log_type, crawler, env, '开始抓取:公众号账号\n')
    Common.logger(log_type, crawler).info(f"user_list:{user_list}")
    Common.logging(log_type, crawler, env, f"user_list:{user_list}")
    GongzhonghaoAuthor.get_all_videos(log_type=log_type,
                                      crawler=crawler,
                                      task_dict=task_dict,
                                      token_index=token_index,
                                      rule_dict=rule_dict,
                                      user_list=user_list,
                                      env=env)
    Common.del_logs(log_type, crawler)
    Common.logger(log_type, crawler).info('抓取一轮结束\n')
    Common.logging(log_type, crawler, env, '抓取一轮结束\n')

def main(log_type, crawler, env):
    task_dict = {'createTime': 1688382816512, 'id': 54, 'interval': 200, 'machine': 'aliyun', 'mode': 'author',
                 'operator': '王坤', 'rule': {'period': {'min': 1, 'max': 1}, 'duration': {'min': 20, 'max': 2700}},
                 'source': 'gongzhonghao', 'spiderName': 'run_gzh_author', 'startTime': 1688456874000,
                 'status': 0, 'taskName': '公众号账号', 'updateTime': 1688456876643}
    # Parse rule_dict
    rule_dict = {"period": {"min": 1, "max": 1}, "duration": {"min": 20, "max": 2700}}
    Common.logger(log_type, crawler).info(f"抓取规则:{rule_dict}\n")
    Common.logging(log_type, crawler, env, f"抓取规则:{rule_dict}\n")
    # Fetch user_list for this task
    task_id = 54  # same id as task_dict["id"]
    select_user_sql = f"""select * from crawler_user_v3 where task_id={task_id}"""
    user_list = MysqlHelper.get_values(log_type, crawler, select_user_sql, env, action="")
    # Compute how many crawler processes to launch (crawler_num)
    user_num = len(user_list)
    chunk_size = 2  # number of users handled by each process
    crawler_num = user_num // chunk_size  # floor division
    if user_num % chunk_size != 0:
        crawler_num += 1  # round up when there is a remainder
    Common.logger(log_type, crawler).info(f"共{user_num}个公众号,需要启动{crawler_num}个脚本任务")
    Common.logging(log_type, crawler, env, f"共{user_num}个公众号,需要启动{crawler_num}个脚本任务")
    # Crawl in parallel, one process per chunk of users
    processes = []
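    # Each worker is assigned token_index = i + 1 (1-based); presumably each index maps to a
    # separate 公众号 access token so parallel crawlers do not share credentials.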
    for i in range(crawler_num):
        start = i * chunk_size
        end = min((i + 1) * chunk_size, user_num)  # clamp the last slice to the user count
        process = Process(target=get_author_videos,
                          args=(f"{log_type}{i+1}", crawler, task_dict, i + 1, rule_dict, user_list[start:end], env))
        process.start()
        processes.append(process)

    for process in processes:
        process.join()

if __name__ == "__main__":
    main(log_type="author", crawler="gongzhonghao", env="dev")
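
A quick sanity check of the batching arithmetic in main(), using hypothetical numbers rather than real task data: with 5 users and chunk_size = 2, crawler_num works out to 3, and the three slices cover every user exactly once.

# Sketch only: verifies the chunking math in main() with an assumed user count of 5.
user_num = 5
chunk_size = 2
crawler_num = user_num // chunk_size + (1 if user_num % chunk_size else 0)
slices = [(i * chunk_size, min((i + 1) * chunk_size, user_num)) for i in range(crawler_num)]
assert crawler_num == 3
assert slices == [(0, 2), (2, 4), (4, 5)]  # each user appears in exactly one slice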