run_gzh_author_dev.py

# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2023/7/3
import os
import sys
from multiprocessing import Process

sys.path.append(os.getcwd())
from common.common import Common
from common.scheduling_db import MysqlHelper
from gongzhonghao.gongzhonghao_author.gongzhonghao_author_lock import GongzhonghaoAuthor
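
# One worker process per chunk of official-account users: main() below splits
# the user list into chunks, launches get_author_videos in its own Process for
# each chunk, and waits for all of them to finish.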


def get_author_videos(log_type, crawler, task_dict, rule_dict, user_list, env):
    Common.logger(log_type, crawler).info('Start crawling: official accounts\n')
    Common.logging(log_type, crawler, env, 'Start crawling: official accounts\n')
    Common.logger(log_type, crawler).info(f"user_list:{user_list}")
    Common.logging(log_type, crawler, env, f"user_list:{user_list}")
    GongzhonghaoAuthor.get_all_videos(log_type=log_type,
                                      crawler=crawler,
                                      task_dict=task_dict,
                                      rule_dict=rule_dict,
                                      user_list=user_list,
                                      env=env)
    # Common.del_logs(log_type, crawler)
    Common.logger(log_type, crawler).info('Finished one crawl round\n')
    Common.logging(log_type, crawler, env, 'Finished one crawl round\n')


def main(log_type, crawler, env):
    task_dict = {'createTime': 1688382816512, 'id': 54, 'interval': 200, 'machine': 'aliyun',
                 'mode': 'author', 'operator': '王坤',
                 'rule': {'period': {'min': 1, 'max': 1}, 'duration': {'min': 20, 'max': 2700}},
                 'source': 'gongzhonghao', 'spiderName': 'run_gzh_author',
                 'startTime': 1688456874000, 'status': 0, 'taskName': '公众号账号',
                 'updateTime': 1688456876643}
    # Crawl rules (hard-coded; same as task_dict['rule'])
    rule_dict = {"period": {"min": 1, "max": 1}, "duration": {"min": 20, "max": 2700}}
    Common.logger(log_type, crawler).info(f"Crawl rules: {rule_dict}\n")
    Common.logging(log_type, crawler, env, f"Crawl rules: {rule_dict}\n")
    # Fetch the user_list for this task from the database
    task_id = 54
    select_user_sql = f"""select * from crawler_user_v3 where task_id={task_id}"""
    user_list = MysqlHelper.get_values(log_type, crawler, select_user_sql, env, action="")
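    # NOTE (assumption): MysqlHelper.get_values is taken here to return the
    # matching rows of crawler_user_v3 as a list, one entry per official
    # account, so that len() and slicing below apply to it directly.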
    # Work out how many worker processes (crawler_num) to launch
    user_num = len(user_list)
    chunk_size = 1  # number of users handled by each process
    crawler_num = user_num // chunk_size  # floor division
    if user_num % chunk_size != 0:
        crawler_num += 1  # one extra process for the remainder
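    # The floor-division-plus-remainder pair above is just ceiling division;
    # an equivalent one-liner (would need `import math` at the top) is:
    #   crawler_num = math.ceil(user_num / chunk_size)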
    Common.logger(log_type, crawler).info(f"{user_num} official accounts in total, launching {crawler_num} crawler processes")
    Common.logging(log_type, crawler, env, f"{user_num} official accounts in total, launching {crawler_num} crawler processes")
    # Crawl in parallel, one process per chunk
    processes = []
    for i in range(crawler_num):
        start = i * chunk_size
        end = min((i + 1) * chunk_size, user_num)  # clamp the last chunk to the list length
        process = Process(target=get_author_videos,
                          args=(f"{log_type}{i+1}", crawler, task_dict, rule_dict, user_list[start:end], env))
        process.start()
        processes.append(process)
    for process in processes:
        process.join()
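    # Start all processes first and join them afterwards: joining inside the
    # spawn loop would wait for each process to finish before starting the
    # next one and serialize the crawl.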


if __name__ == "__main__":
    main(log_type="author", crawler="gongzhonghao", env="dev")
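
# A minimal usage sketch (the script path is assumed from the import layout,
# not confirmed by the source): run from the repo root so that
# sys.path.append(os.getcwd()) lets the `common` and `gongzhonghao` packages
# resolve, e.g.
#   python gongzhonghao/gongzhonghao_author/run_gzh_author_dev.py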