import json
import os
import sys
import asyncio
import argparse
import inspect
import traceback

sys.path.append(os.getcwd())
from application.common import MysqlHelper, AliyunLogger
from spider.spider_map import spider_map


class OnlineManager(object):
    """
    Online crawler template: loads a task's crawl rule and user list from MySQL,
    then dispatches the matching spider from spider_map.
    """

    def __init__(self, task_id, mode, platform):
        self.env = "prod"
        self.task_id = task_id
        self.mode = mode
        self.platform = platform
        self.MySQL = MysqlHelper(mode=self.mode, platform=self.platform, env=self.env)
        self.logger = AliyunLogger(platform=self.platform, mode=self.mode)

    def get_task_rule(self):
        """
        :return: the task's crawl rule as a dict (rule_dict)
        """
        rule_dict = {}
        task_rule_sql = f"SELECT rule FROM crawler_task_v3 WHERE id = {self.task_id};"
        data = self.MySQL.select(task_rule_sql)
        if data:
            # The rule column stores a JSON list of single-key dicts;
            # flatten it into one dict, e.g. [{"a": 1}, {"b": 2}] -> {"a": 1, "b": 2}.
            rule_list = json.loads(data[0][0])
            for item in rule_list:
                for key in item:
                    rule_dict[key] = item[key]
        self.logger.logging(
            code=1000,
            message="crawl rule",
            data=rule_dict
        )
        return rule_dict

    def get_task_user_list(self):
        """
        :return: the task's user list
        """
        task_user_list_sql = f"SELECT uid, link, nick_name FROM crawler_user_v3 WHERE task_id = {self.task_id};"
        uid_list = self.MySQL.select(task_user_list_sql)
        user_list = [{"uid": i[0], "link": i[1], "nick_name": i[2]} for i in uid_list] if uid_list else []
        self.logger.logging(
            code=1000,
            message="user list",
            data=user_list
        )
        return user_list

    def start_crawl(self):
        """
        Entry point: fetch the task rule and user list, then run one crawl round.
        """
        rule_dict = self.get_task_rule()
        user_list = self.get_task_user_list()
        if rule_dict and user_list:
            try:
                spider_class = spider_map[self.platform][self.mode]
                self.logger.logging(code=1003, message="starting a crawl round")
                main_process = spider_class(
                    platform=self.platform,
                    mode=self.mode,
                    rule_dict=rule_dict,
                    user_list=user_list,
                    env=self.env
                )
                # A spider's run() may be either a coroutine or a plain method; support both.
                if inspect.iscoroutinefunction(main_process.run):
                    asyncio.run(main_process.run())
                else:
                    main_process.run()
                self.logger.logging(code=1004, message="finished a crawl round")
            except Exception as e:
                self.logger.logging(code=1006, message=f"failed to start the crawler: {e}\n{traceback.format_exc()}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser()  # command-line argument parser
    parser.add_argument("--task_id")
    parser.add_argument("--mode")
    parser.add_argument("--platform")
    args = parser.parse_args()
    M = OnlineManager(task_id=args.task_id, mode=args.mode, platform=args.platform)
    M.start_crawl()
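
# Example invocation (a sketch only: the filename, task id, and platform/mode
# values below are hypothetical; the task must exist in crawler_task_v3 and the
# platform/mode pair must be registered in spider_map):
#   python online_manager.py --task_id 21 --mode recommend --platform xiaoniangao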