import ast
import os
import sys
import time

import requests
from flask import Flask, request
from flask import jsonify

sys.path.append(os.path.abspath(os.path.join(os.getcwd(), "..")))

from common.db.mysql_help import MysqlHelper
from user_spider.user_info import *

app = Flask(__name__)
app.config['JSON_AS_ASCII'] = False


# Only GET requests are accepted
@app.route("/v1/crawler/source/getall", methods=["GET"])
def getSource():
    try:
        # Read the incoming query parameters
        get_data = request.args.to_dict()
        # # Operate on the parameters
        sql = 'select source, task_type, spider_name, machine, source_desc, task_type_desc, spider_name_desc from crawler_source'
        result = MysqlHelper.get_values(sql)
        if not result:
            return jsonify({'code': '200', 'result': [], 'message': '没有更多数据'})
        source_list = list()
        for source, task_type, spider_name, machine, source_desc, task_type_desc, spider_name_desc in result:
            source_dict = {
                'task_type': [
                    {
                        'type': task_type,
                        'description': task_type_desc,
                        'spider': {
                            'spider_name': spider_name,
                            'description': spider_name_desc
                        }
                    }
                ],
                'description': source_desc,
                'source': source,
                'machine': machine
            }
            source_list.append(source_dict)
    except Exception as e:
        return jsonify({'code': '400', 'message': '获取数据源信息失败'})
    return jsonify({'code': '200', 'result': source_list})


@app.route("/v1/crawler/task/insert", methods=["POST"])
def insertTask():
    try:
        data = request.json
        outer_info = data.get('spider_link')
        source = data.get('source')
        exist_outer_info = list()
        # Check whether any of the submitted links already exist for this source
        s_sql = f"""select spider_link from crawler_task where source='{source}'"""
        result = MysqlHelper.get_values(s_sql)
        if result:
            for outer_link in outer_info:
                for link_tuple in result:
                    if outer_link in link_tuple[0]:
                        exist_outer_info.append(outer_link)
            if exist_outer_info:
                return jsonify({'code': 200, 'message': '名单重复', 'repeat_list': exist_outer_info})

        # Field conversion
        tag_name_list = []
        content_tag_list = []
        user_tag = data['user_tag']
        user_content_tag = data['user_content_tag']
        for tag in user_tag:
            tag_name_list.append(tag['tagName'])
        for tag in user_content_tag:
            content_tag_list.append(tag['tagName'])
        # Timestamps arrive in milliseconds; store them as seconds
        if data['min_publish_time']:
            data['min_publish_time'] = int(data['min_publish_time'] / 1000)
        else:
            data['min_publish_time'] = 0
        # if data['min_publish_day']:
        #     data['min_publish_day'] = 0
        data['next_time'] = int(data['next_time'] / 1000)
        data['insert_time'] = int(time.time())
        data['update_time'] = int(time.time())
        data['spider_link'] = str(data['spider_link'])
        data['spider_rule'] = str(data['spider_rule'])
        data['user_tag'] = ','.join(str(i) for i in tag_name_list)
        data['user_content_tag'] = ','.join(str(i) for i in content_tag_list)
        # data['crawler_interval'] = data.pop('interval')
        # Build comma-separated column names and "%s" placeholders from the payload keys
        keys = ','.join(data.keys())
        values = ','.join(['%s'] * len(data))
        sql = 'insert into {table}({keys}) VALUES({values})'.format(table='crawler_task', keys=keys, values=values)
        MysqlHelper.insert_values(sql, tuple(data.values()))
    except Exception as e:
        return jsonify({'code': 400, 'message': '任务写入失败'})
    return jsonify({'code': 200, 'message': 'task create success'})
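
# Illustrative only: a sketch of the JSON body that /v1/crawler/task/insert expects,
# derived from the fields the handler reads above. Every key of the payload becomes a
# column of crawler_task, so the concrete values and the extra 'task_name' key here are
# made-up placeholders, not confirmed schema; timestamps are in milliseconds because the
# handler divides them by 1000.
EXAMPLE_TASK_PAYLOAD = {
    'task_name': 'example-task',                       # hypothetical column/value
    'source': 'xigua',                                 # a source name as stored in crawler_source
    'spider_link': ['https://example.com/author/1'],   # placeholder author link
    'spider_rule': {'period': 86400},                  # placeholder rule; stored via str()
    'user_tag': [{'tagName': 'tag-a'}],                # handler keeps only tagName
    'user_content_tag': [{'tagName': 'tag-b'}],
    'min_publish_time': 1650000000000,                 # ms; converted to seconds
    'next_time': 1650000000000,                        # ms; converted to seconds
}
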
@app.route("/v1/crawler/task/gettask", methods=["GET"])
def getAllTask():
    try:
        get_data = request.args.to_dict()
        page = int(get_data.get('page', 1))
        offset = int(get_data.get('offset', 10))
        start_count = (page * offset) - offset
        # MySQL LIMIT takes (offset, row_count), so the second argument is the page size
        sql = f"""select task_id, task_name, insert_time from crawler_task order by update_time desc limit {start_count}, {offset}"""
        result = MysqlHelper.get_values(sql)
        if not result:
            return jsonify({'code': '200', 'result': [], 'message': '没有更多任务'})
        source_list = list()
        for task_id, task_name, insert_time in result:
            data = dict(
                task_id=task_id,
                task_name=task_name,
                insert_time=insert_time * 1000
            )
            source_list.append(data)
        t_sql = f"""select count(*) from crawler_task"""
        t_res = MysqlHelper.get_values(t_sql)
        total = t_res[0][0]
    except Exception as e:
        return jsonify({"code": "400", 'message': "任务列表获取失败"})
    return jsonify({'code': '200', 'result': source_list, 'total': total})


@app.route("/v1/crawler/task/getone", methods=["GET"])
def getOneTask():
    try:
        get_data = request.args.to_dict()
        task_id = get_data['task_id']
        sql = f'select task_id, spider_link from crawler_task where task_id={task_id}'
        result = MysqlHelper.get_values(sql)
        if not result:
            return jsonify({'code': '200', 'result': [], 'message': 'no data'})
        for task_id, spider_link in result:
            data = dict(
                task_id=task_id,
                # spider_link is stored as the repr of a Python list; parse it back safely
                spider_link=ast.literal_eval(spider_link),
            )
    except Exception as e:
        return jsonify({'code': '400', "message": "获取任务信息失败"})
    return jsonify({'code': '200', 'result': data})


@app.route("/v1/crawler/task/update", methods=["POST"])
def updateTask():
    try:
        task_id = request.json.get('task_id')
        spider_link = request.json.get('spider_link')
        sql = f"""UPDATE crawler_task SET spider_link="{spider_link}", update_time={int(time.time())} where task_id = {task_id}"""
        result = MysqlHelper.update_values(sql)
        if result:
            return jsonify({'code': 200, 'message': 'task update success'})
        else:
            return jsonify({'code': 400, 'message': 'task update failed'})
    except Exception as e:
        return jsonify({'code': 400, 'message': '任务更新失败'})


# def get_user_info(source):
#     source_spider = {
#         'xigua': xigua_user_info
#     }
#     return source_spider.get(source)
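
# A minimal client sketch for the task endpoints above, assuming the service runs locally
# on port 5050 (see app.run at the bottom). The task_id and link values are hypothetical;
# this helper is illustrative only and is never called by the service itself.
def _example_task_client(base_url='http://127.0.0.1:5050'):
    # Page through tasks; page/offset mirror the getAllTask defaults (1 and 10)
    tasks = requests.get(f'{base_url}/v1/crawler/task/gettask', params={'page': 1, 'offset': 10}).json()
    # Fetch the spider_link list of a single (hypothetical) task
    one = requests.get(f'{base_url}/v1/crawler/task/getone', params={'task_id': 1}).json()
    # Replace that task's spider_link list; update_time is set server-side
    updated = requests.post(f'{base_url}/v1/crawler/task/update',
                            json={'task_id': 1, 'spider_link': ['https://example.com/author/1']}).json()
    return tasks, one, updated
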
@app.route("/v1/crawler/author/create", methods=["POST"])
def createUser():
    get_media_url = 'http://longvideoapi-internal.piaoquantv.com/longvideoapi/user/virtual/crawler/registerVirtualUser'
    spider_link = request.json.get('spider_link')
    source = request.json.get('source')
    task_type = request.json.get('task_type')
    applets_status = request.json.get('applets_status')
    app_status = request.json.get('app_status')
    user_tag = request.json.get('user_tag')
    user_content_tag = request.json.get('user_content_tag')
    success_list = list()
    fail_list = list()
    for author_url in spider_link:
        try:
            # Skip links that are already mapped to a virtual account
            f_sql = f"""select spider_link from crawler_author_map where spider_link="{author_url}" """
            result = MysqlHelper.get_values(f_sql)
            if result:
                success_list.append(author_url)
                continue
            else:
                tag_name_list = []
                content_tag_list = []
                for tag in user_tag:
                    tag_name_list.append(tag['tagName'])
                for tag in user_content_tag:
                    content_tag_list.append(tag['tagName'])
                user_tags = ','.join(str(i) for i in tag_name_list)
                user_content_tags = ','.join(str(i) for i in content_tag_list)
                post_data = {
                    # 'count': 1,  # (required) number of accounts: pass 1
                    # 'accountType': 4,  # (required) account type: pass 4 for app virtual accounts
                    'pwd': '',  # password, defaults to 12346
                    'nickName': '',  # nickname, defaults to vuser......
                    'avatarUrl': '',  # avatar URL, defaults to http://weapppiccdn.yishihui.com/resources/images/pic_normal.png
                    'tagName': user_tags,  # multiple values separated by commas
                }
                # Register a virtual account and map it to the crawled author
                response = requests.post(url=get_media_url, params=post_data)
                media_id = response.json()['data']
                data = dict(
                    spider_link=author_url,
                    media_id=media_id,
                    source=source,
                    task_type=task_type,
                    applets_status=applets_status,
                    app_status=app_status,
                    user_tag=user_tags,
                    user_content_tag=user_content_tags,
                    insert_time=int(time.time()),
                    update_time=int(time.time())
                )
                keys = ','.join(data.keys())
                values = ','.join(['%s'] * len(data))
                table = 'crawler_author_map'
                sql = f"""insert into {table}({keys}) VALUES({values})"""
                result = MysqlHelper.insert_values(sql, tuple(data.values()))
                if not result:
                    fail_list.append(author_url)
                else:
                    success_list.append(author_url)
        except Exception as e:
            fail_list.append(author_url)
            continue
    return jsonify({'code': 200, 'result': {'success': success_list, 'fail': fail_list}})


if __name__ == "__main__":
    app.run(debug=True, port=5050)
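

# Illustrative only: a sketch of how /v1/crawler/author/create could be exercised, again
# assuming a local instance on port 5050. All field values below are placeholders (the
# expected task_type and status values are not documented in this file); the handler
# registers one virtual account per spider_link entry and records the mapping.
def _example_create_author_request(base_url='http://127.0.0.1:5050'):
    payload = {
        'source': 'xigua',                                # placeholder source name
        'task_type': 'author',                            # placeholder task type
        'applets_status': 0,                              # placeholder status flags
        'app_status': 0,
        'spider_link': ['https://example.com/author/1'],  # placeholder author link
        'user_tag': [{'tagName': 'tag-a'}],
        'user_content_tag': [{'tagName': 'tag-b'}],
    }
    return requests.post(f'{base_url}/v1/crawler/author/create', json=payload).json()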