# run_spider_online.py

import json
import os
import sys
import asyncio
import argparse

# Make the project-internal packages importable when the script is run from the repo root.
sys.path.append(os.getcwd())

from application.common import MysqlHelper, AliyunLogger
from spider.spider_map import spider_map


class OnlineManager(object):
    """
    Online spider template: loads a task's rules and user list from MySQL,
    then launches the matching spider.
    """

    def __init__(self, task_id, mode, platform):
        self.env = "prod"
        self.task_id = task_id
        self.mode = mode
        self.platform = platform
        self.MySQL = MysqlHelper(mode=self.mode, platform=self.platform, env=self.env)
        self.logger = AliyunLogger(platform=self.platform, mode=self.mode)

    def get_task_rule(self):
        """
        :return: the task's crawl rules as a dict (task_rule)
        """
        rule_dict = {}
        # task_id is interpolated into the SQL string, so it is parsed as an
        # int at the CLI boundary below to keep the query safe.
        task_rule_sql = f"SELECT rule FROM crawler_task_v3 WHERE id = {self.task_id};"
        data = self.MySQL.select(task_rule_sql)
        if data:
            # The rule column stores a JSON list of single-key dicts; flatten it
            # into one dict.
            rule_list = json.loads(data[0][0])
            for item in rule_list:
                for key in item:
                    rule_dict[key] = item[key]
        self.logger.logging(
            code=1000,
            message="crawl rules",
            data=rule_dict
        )
        return rule_dict
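
    # Sketch of the rule payload this method expects, inferred from the
    # flattening loop above; the keys and values are hypothetical, not taken
    # from the real crawler_task_v3 table:
    #   rule column: '[{"videos_cnt": {"min": 100}}, {"duration": {"min": 30}}]'
    #   flattens to: {"videos_cnt": {"min": 100}, "duration": {"min": 30}}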

    def get_task_user_list(self):
        """
        :return: the task's user list
        """
        task_user_list_sql = f"SELECT uid, link, nick_name FROM crawler_user_v3 WHERE task_id = {self.task_id};"
        uid_list = self.MySQL.select(task_user_list_sql)
        user_list = [{"uid": i[0], "link": i[1], "nick_name": i[2]} for i in uid_list] if uid_list else []
        self.logger.logging(
            code=1000,
            message="user list",
            data=user_list
        )
        return user_list
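
    # For reference, each user_list entry built above has this shape (the
    # values are illustrative only):
    #   {"uid": 10001, "link": "https://example.com/user/10001", "nick_name": "demo_user"}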

    def start_crawl(self):
        """
        Launch one crawl round for this task.
        """
        rule_dict = self.get_task_rule()
        user_list = self.get_task_user_list()
        if rule_dict and user_list:
            try:
                spider_class = spider_map[self.platform][self.mode]
                self.logger.logging(code=1003, message="starting a crawl round")
                main_process = spider_class(
                    platform=self.platform,
                    mode=self.mode,
                    rule_dict=rule_dict,
                    user_list=user_list,
                    env=self.env
                )
                # asyncio.run() replaces the deprecated
                # get_event_loop()/run_until_complete() pair.
                asyncio.run(main_process.run())
                self.logger.logging(code=1004, message="finished a crawl round")
            except Exception as e:
                self.logger.logging(code=1006, message="error while starting the spider: {}".format(e))
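
# Assumed contracts, inferred from the calls above rather than documented here:
#   - spider_map is a nested mapping {platform: {mode: SpiderClass}}.
#   - Each spider class accepts platform/mode/rule_dict/user_list/env keyword
#     arguments and exposes an async run() coroutine.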

if __name__ == "__main__":
    parser = argparse.ArgumentParser()  # create the CLI argument parser
    parser.add_argument("--task_id", type=int)  # parsed as int: the id is interpolated into SQL
    parser.add_argument("--mode")
    parser.add_argument("--platform")
    args = parser.parse_args()
    M = OnlineManager(task_id=args.task_id, mode=args.mode, platform=args.platform)
    M.start_crawl()
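
# Hypothetical invocation (the argument values are made up for illustration;
# real task ids, modes, and platforms come from the crawler_task_v3 table and
# the keys of spider_map):
#   python run_spider_online.py --task_id 21 --mode recommend --platform some_platform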