run_gzh_author_dev.py

# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2023/7/3
from common.common import Common
from common.scheduling_db import MysqlHelper
from common.public import task_fun_mq
from gongzhonghao.gongzhonghao_author.gongzhonghao_author import GongzhonghaoAuthor


def get_author_videos(log_type, crawler, token_index, task_dict, rule_dict, user_list, env):
    Common.logger(log_type, crawler).info(f'开始抓取:{task_dict["taskName"]}\n')
    Common.logging(log_type, crawler, env, f'开始抓取:{task_dict["taskName"]}\n')
    Common.logger(log_type, crawler).info(f"user_list:{user_list}")
    Common.logging(log_type, crawler, env, f"user_list:{user_list}")
    GongzhonghaoAuthor.get_all_videos(log_type=log_type,
                                      crawler=crawler,
                                      token_index=token_index,
                                      rule_dict=rule_dict,
                                      user_list=user_list,
                                      env=env)
    Common.del_logs(log_type, crawler)
    Common.logger(log_type, crawler).info('抓取一轮结束\n')
    Common.logging(log_type, crawler, env, '抓取一轮结束\n')
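

# Hypothetical usage sketch, kept commented out so the module's behavior is unchanged.
# It only illustrates how get_author_videos is called: the argument values below are
# assumptions for illustration, and the only key the function reads directly is
# task_dict["taskName"]; everything else is passed through to
# GongzhonghaoAuthor.get_all_videos.
#
# if __name__ == "__main__":
#     get_author_videos(log_type="author",            # hypothetical log type
#                       crawler="gongzhonghao",       # hypothetical crawler name
#                       token_index=1,
#                       task_dict={"taskName": "example task"},  # hypothetical task
#                       rule_dict={},                 # hypothetical empty rule set
#                       user_list=[],                 # hypothetical empty user list
#                       env="dev")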
def main(log_type, crawler, topic_name, group_id, env):
    # # Parse task_dict
    # task_dict = task_fun_mq(msg.message_body)['task_dict']
    # Common.logger(log_type, crawler).info(f"调度任务:{task_dict}")
    # Common.logging(log_type, crawler, env, f"调度任务:{task_dict}")
    #
    # # Parse rule_dict
    # rule_dict = task_fun_mq(msg.message_body)['rule_dict']
    # Common.logger(log_type, crawler).info(f"抓取规则:{rule_dict}\n")
    # Common.logging(log_type, crawler, env, f"抓取规则:{rule_dict}\n")
    #
    # # Parse user_list
    # task_id = task_dict['id']
    # select_user_sql = f"""select * from crawler_user_v3 where task_id={task_id}"""
    # user_list = MysqlHelper.get_values(log_type, crawler, select_user_sql, env, action="")
    #
    # # Compute the number of worker scripts to launch (crawler_num)
    # user_num = len(user_list)
    # chunk_size = 100  # number of users handled by each process
    # crawler_num = user_num // chunk_size  # floor division
    # if user_num % chunk_size != 0:
    #     crawler_num += 1
    # Common.logger(log_type, crawler).info(f"共{user_num}个公众号,需要启动{crawler_num}个脚本任务")
    # Common.logging(log_type, crawler, env, f"共{user_num}个公众号,需要启动{crawler_num}个脚本任务")
    #
    # # Crawl in parallel with multiple processes (requires `from multiprocessing import Process`)
    # processes = []
    # for i in range(crawler_num):
    #     start = i * chunk_size
    #     end = min((i + 1) * chunk_size, user_num)
    #     process = Process(target=get_author_videos, args=(f"{log_type}{i+1}", crawler, i+1, task_dict, rule_dict, user_list[start:end], env))
    #     process.start()
    #     processes.append(process)
    #
    # for process in processes:
    #     process.join()
    #
    # # Leftover from the exception handler of the MQ consume loop that wrapped the code above;
    # # `msg`, `err`, `time`, and the enclosing while-loop are not defined in this file.
    # Common.logger(log_type, crawler).info(f"Consume Message Fail! Exception:{err}\n")
    # Common.logging(log_type, crawler, env, f"Consume Message Fail! Exception:{err}\n")
    # time.sleep(2)
    # continue
    pass
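

# A minimal sketch of the fan-out logic described by the commented-out body of main():
# split user_list into chunks of chunk_size and start one Process per chunk. The helper
# names _chunk_user_list and _spawn_crawlers are hypothetical (not part of the original
# module); they only illustrate the ceiling division and slicing the comments above use.
from multiprocessing import Process


def _chunk_user_list(user_list, chunk_size=100):
    """Yield consecutive slices of user_list, each at most chunk_size long."""
    user_num = len(user_list)
    crawler_num = (user_num + chunk_size - 1) // chunk_size  # ceiling division
    for i in range(crawler_num):
        yield user_list[i * chunk_size:(i + 1) * chunk_size]


def _spawn_crawlers(log_type, crawler, task_dict, rule_dict, user_list, env, chunk_size=100):
    """Start one get_author_videos process per chunk and wait for all of them to finish."""
    processes = []
    for i, chunk in enumerate(_chunk_user_list(user_list, chunk_size), start=1):
        # token_index follows the same 1-based numbering as the commented-out code above
        p = Process(target=get_author_videos,
                    args=(f"{log_type}{i}", crawler, i, task_dict, rule_dict, chunk, env))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()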