run_spider_online.py

import json
import os
import sys
import asyncio
import argparse
import inspect
import traceback

sys.path.append(os.getcwd())

from application.common import MysqlHelper, AliyunLogger
from spider.spider_map import spider_map


class OnlineManager(object):
    """
    Template for running a spider in production.
    """

    def __init__(self, task_id, mode, platform):
        self.env = "prod"
        self.task_id = task_id
        self.mode = mode
        self.platform = platform
        self.MySQL = MysqlHelper(mode=self.mode, platform=self.platform, env=self.env)
        self.logger = AliyunLogger(platform=self.platform, mode=mode)

    def get_task_rule(self):
        """
        :return: the task's crawl rules, task_rule
        """
        rule_dict = {}
        # NOTE: task_id is interpolated directly into the SQL, so it must come
        # from a trusted source (here, the scheduler's CLI arguments).
        task_rule_sql = f"SELECT rule FROM crawler_task_v3 WHERE id = {self.task_id};"
        data = self.MySQL.select(task_rule_sql)
        if data:
            # The rule column stores a JSON list of single-key dicts;
            # flatten it into one dict.
            rule_list = json.loads(data[0][0])
            for item in rule_list:
                for key in item:
                    rule_dict[key] = item[key]
        self.logger.logging(
            code=1000,
            message="crawl rules",
            data=rule_dict
        )
        return rule_dict
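    # A minimal sketch of the rule format this method assumes: the `rule`
    # column holds a JSON list of single-key objects (the keys below are
    # hypothetical examples, not taken from the real table), e.g.
    #
    #     [{"videos_cnt": {"min": 100, "max": 0}},
    #      {"duration": {"min": 30, "max": 1200}}]
    #
    # which get_task_rule() flattens into a single dict:
    #
    #     {"videos_cnt": {"min": 100, "max": 0},
    #      "duration": {"min": 30, "max": 1200}}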
    def get_task_user_list(self):
        """
        :return: the task's user list
        """
        task_user_list_sql = f"SELECT uid, link, nick_name FROM crawler_user_v3 WHERE task_id = {self.task_id};"
        uid_list = self.MySQL.select(task_user_list_sql)
        user_list = [{"uid": i[0], "link": i[1], "nick_name": i[2]} for i in uid_list] if uid_list else []
        self.logger.logging(
            code=1000,
            message="user list",
            data=user_list
        )
        return user_list
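    # Each row from crawler_user_v3 becomes a dict shaped like the following
    # (the values are hypothetical placeholders):
    #
    #     {"uid": 10001, "link": "https://example.com/profile/10001", "nick_name": "demo"}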
    def start_crawl(self):
        """
        Launch the spider for this task.
        """
        rule_dict = self.get_task_rule()
        user_list = self.get_task_user_list()
        if rule_dict and user_list:
            try:
                spider_class = spider_map[self.platform][self.mode]
                self.logger.logging(code=1003, message="starting a crawl round")
                main_process = spider_class(
                    platform=self.platform,
                    mode=self.mode,
                    rule_dict=rule_dict,
                    user_list=user_list,
                    env=self.env
                )
                if inspect.iscoroutinefunction(main_process.run):
                    # run() is a coroutine: drive it to completion on a fresh
                    # event loop (asyncio.run replaces the deprecated
                    # get_event_loop()/run_until_complete pattern).
                    asyncio.run(main_process.run())
                else:
                    main_process.run()
                self.logger.logging(code=1004, message="finished the crawl round")
            except Exception as e:
                self.logger.logging(
                    code=1006,
                    message=f"error while starting the spider: {e}\n{traceback.format_exc()}"
                )
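# A minimal sketch of the contract start_crawl() relies on: spider_map is
# assumed to be a nested dict mapping platform -> mode -> spider class, and
# each spider class to accept the keyword arguments used above and expose a
# run() method, sync or async. The names below are hypothetical placeholders,
# not the real spiders:
#
#     class ExampleRecommendSpider:
#         def __init__(self, platform, mode, rule_dict, user_list, env):
#             self.rule_dict = rule_dict
#             self.user_list = user_list
#
#         async def run(self):
#             ...  # fetch and process items for each user in user_list
#
#     spider_map = {"example_platform": {"recommend": ExampleRecommendSpider}}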
if __name__ == "__main__":
    parser = argparse.ArgumentParser()  # build the CLI argument parser
    parser.add_argument("--task_id")
    parser.add_argument("--mode")
    parser.add_argument("--platform")
    args = parser.parse_args()
    M = OnlineManager(task_id=args.task_id, mode=args.mode, platform=args.platform)
    M.start_crawl()
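# Example invocation (argument values are hypothetical):
#
#     python run_spider_online.py --task_id 21 --mode recommend --platform example_platform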