import copy
import logging
import os
import sys
import time

import requests
from dotenv import load_dotenv
from flask import Flask, jsonify, request

sys.path.append(os.path.abspath(os.path.join(os.getcwd(), "..")))

from conf.config import get_config
from common.db.mysql_help import MysqlHelper

load_dotenv(verbose=True)
env = os.getenv('env')

app = Flask(__name__)
app.config['JSON_AS_ASCII'] = False

# shared MySQL helper instance
mysql_con = MysqlHelper()
conf = get_config()


@app.route("/v1/crawler/task/addlink", methods=["POST"])
def addSpiderLink():
    try:
        data = request.json
        spider_link = data['spider_link']
        task_id = data['task_id']
        sql = f'select * from crawler_author_map where spider_link="{spider_link}"'
        result = mysql_con.get_values(sql)
        now_time = int(time.time())
        if result:
            is_del = result[0]['is_del']
            if is_del:
                return jsonify({'code': 400, 'message': '抓取名单重复'})
            else:
                old_task_id = result[0]['task_id']
                if task_id == old_task_id:
                    up_sql = f'update crawler_author_map set is_del=1 where spider_link="{spider_link}"'
                else:
                    up_sql = f'update crawler_author_map set task_id={task_id},is_del=1 where spider_link="{spider_link}"'
                mysql_con.update_values(up_sql)
                return jsonify({'code': 200, 'message': '抓取名单增加成功'})
                # task_sql = f'select spider_link from crawler_task where task_id ={task_id}'
                # task = mysql_con.get_values(task_sql)
                # spider_links = eval(task[0]['spider_link'])
                # spider_links.append(spider_link)
                # str_spider_links = str(spider_links)
                # u_sql = f'update crawler_task set spider_link="{str_spider_links}", update_time={now_time} where task_id={task_id}'
                # mysql_con.update_values(u_sql)
        else:
            sql = f'select * from crawler_task where task_id={task_id}'
            result = mysql_con.get_values(sql)
            success_list, fail_list = create_uid(result[0], task_id, spider_link=[spider_link])
            spider_links = eval(result[0]['spider_link'])
            spider_links.append(spider_link)
            str_spider_links = str(spider_links)
            u_sql = f'update crawler_task set spider_link="{str_spider_links}", update_time={now_time} where task_id={task_id}'
            mysql_con.update_values(u_sql)
            return jsonify({'code': 200, 'message': '抓取名单增加成功', 'add_link': success_list})
    except Exception as e:
        return jsonify({'code': 400, 'message': '抓取名单增加失败', 'spider_link': spider_link})


@app.route("/v1/crawler/task/dellink", methods=["POST"])
def delSpiderLink():
    data = request.json
    spider_link = data['spider_link']
    task_id = data['task_id']
    up_sql = f'update crawler_author_map set is_del=0 where spider_link="{spider_link}"'
    mysql_con.update_values(up_sql)
    sql = f'select * from crawler_task where task_id ={task_id}'
    task = mysql_con.get_values(sql)
    spider_links = eval(task[0]['spider_link'])
    spider_links.remove(spider_link)
    now_time = int(time.time())
    u_sql = f'update crawler_task set spider_link="{spider_links}",update_time={now_time} where task_id={task_id}'
    mysql_con.update_values(u_sql)
    if spider_link:
        return jsonify({'code': 200, 'message': '抓取名单删除成功', 'del_link': spider_link})
    else:
        return jsonify({'code': 400, 'message': '抓取名单删除失败', 'del_link': spider_link})
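
# Illustrative request bodies for the two link endpoints above (values are made-up examples):
#   POST /v1/crawler/task/addlink  {"spider_link": "https://example.com/author/123", "task_id": 1}
#   POST /v1/crawler/task/dellink  {"spider_link": "https://example.com/author/123", "task_id": 1}
# Both endpoints expect a JSON body with exactly these two keys.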

@app.route("/v1/crawler/task/getcategory", methods=["GET"])
def getCategory():
    sql = 'select id, content_category from crawler_content_category'
    result = mysql_con.get_values(sql)
    return jsonify({'code': 200, 'data': result})


@app.route("/v1/crawler/task/getboard", methods=["GET"])
def getBoard():
    sql = 'select id, mode_board from crawler_board'
    result = mysql_con.get_values(sql)
    return jsonify({'code': 200, 'data': result})


@app.route("/v1/crawler/task/getmodename", methods=["GET"])
def getModeName():
    sql = 'select id, mode_name from crawler_mode'
    result = mysql_con.get_values(sql)
    return jsonify({'code': 200, 'data': result})


@app.route("/v1/crawler/task/getrecommendboard", methods=["GET"])
def getRecommendBoard():
    sql = 'select id, mode_board from crawler_recommend_board'
    result = mysql_con.get_values(sql)
    return jsonify({'code': 200, 'data': result})


@app.route("/v1/crawler/user/findmedia", methods=["GET"])
def getMediaInfo():
    data = request.args.to_dict()
    task_id = data['task_id']
    sql = f'select * from crawler_author_map where task_id={task_id} and is_del=1'
    result = mysql_con.get_values(sql)
    task_user_info = []
    for task_info in result:
        media_id = task_info['media_id']
        media_info = requests.get(url=conf['select_media_url'], params={'uid': media_id}, verify=False).json()['content']
        media_name = media_info['longvideoNickName'] if media_info['longvideoNickName'] else media_info['nickName']
        nick_name = task_info['nick_name']
        spider_link = task_info['spider_link']
        create_user_time = task_info['create_user_time']
        media_data = dict(
            media_name=media_name,
            nick_name=nick_name,
            spider_link=spider_link,
            media_id={'media_id': media_id, 'media_url': conf['media_main_url'].format(media_id)},
            create_user_time=create_user_time * 1000
        )
        task_user_info.append(media_data)
    return jsonify({'code': 200, 'data': task_user_info})


@app.route("/v1/crawler/task/findtask", methods=["GET"])
def getTaskUserInfo():
    # Look up task info by arbitrary crawler_author_map filter conditions
    data = request.args.to_dict()
    values = ''
    for k, v in data.items():
        if isinstance(v, int):
            values += f'{k}={v} and '
        else:
            values += f'{k}="{v}" and '
    # values[:-4] strips the trailing "and " from the WHERE clause
    sql = f"select task_id from crawler_author_map where {values[:-4]}"
    res = mysql_con.get_values(sql)
    task_id = res[0]['task_id']
    sql = f'select task_name, source, task_type, create_task_user, insert_time, update_task_user, update_time from crawler_task where task_id={task_id}'
    task_info = mysql_con.get_values(sql)
    return jsonify({'code': 200, 'data': task_info})


# GET-only endpoint
@app.route("/v1/crawler/source/getall", methods=["GET"])
def getSource():
    try:
        sql = 'select * from crawler_source'
        result = mysql_con.get_values(sql)
        if not result:
            return jsonify({'code': '200', 'result': [], 'message': '没有更多数据'})
    except Exception as e:
        return jsonify({'code': '400', 'message': '获取数据源信息失败'})
    return jsonify({'code': '200', 'result': result})


@app.route("/v1/crawler/source/getasktype", methods=["GET"])
def getTaskType():
    try:
        data = request.args.to_dict()
        source = data['source']
        sql = f'select * from crawler_task_type where source="{source}"'
        result = mysql_con.get_values(sql)
        if not result:
            return jsonify({'code': '200', 'result': [], 'message': '没有更多数据'})
        else:
            task_type_list = list()
            for task_type_info in result:
                task_info = {
                    'type': task_type_info['task_type'],
                    'description': task_type_info['task_type_desc'],
                    'spider': {
                        'spider_name': task_type_info['spider_name'],
                        'description': task_type_info['spider_name_desc']
                    }
                }
                task_type_list.append(task_info)
            source_dict = {
                'task_type': task_type_list,
            }
    except Exception as e:
        return jsonify({'code': '400', 'message': '获取数据源信息失败'})
    return jsonify({'code': '200', 'result': source_dict})
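
# Illustrative calls for the lookup endpoints above (parameter values are examples only):
#   GET /v1/crawler/user/findmedia?task_id=1
#   GET /v1/crawler/task/findtask?media_id=123456     (any crawler_author_map column can be used as a filter)
#   GET /v1/crawler/source/getasktype?source=xigua    ("xigua" is a placeholder source name)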

@app.route("/v1/crawler/task/checkrepeat", methods=["POST"])
def get_repeat_list():
    data = request.json
    spider_links = data.get('spider_link')
    repeat_list = list()
    # Check every submitted link against the existing author map to find duplicates
    for spider_link in spider_links:
        if isinstance(spider_link, int):
            s_sql = f"""select spider_link from crawler_author_map where spider_link={spider_link}"""
        else:
            s_sql = f"""select spider_link from crawler_author_map where spider_link='{spider_link}'"""
        result = mysql_con.get_values(s_sql)
        if result:
            repeat_list.append(spider_link)
    if repeat_list:
        return jsonify({'code': 400, 'message': '名单重复', 'repeat_list': repeat_list})
    else:
        return jsonify({'code': 200, 'message': '抓取名单校验通过', 'repeat_list': repeat_list})


@app.route("/v1/crawler/task/insert", methods=["POST"])
def insertTask():
    try:
        data = request.json
        user_data = copy.deepcopy(data)
        tag_name_list = []
        content_tag_list = []
        user_tag = data['user_tag']
        user_content_tag = data['user_content_tag']
        for tag in user_tag:
            tag_name_list.append(tag['tagName'])
        for tag in user_content_tag:
            content_tag_list.append(tag['tagName'])
        if data['min_publish_time']:
            data['min_publish_time'] = int(data['min_publish_time'] / 1000)
        else:
            data['min_publish_time'] = 0
        if not data['min_publish_day']:
            data['min_publish_day'] = 0
        data['next_time'] = int(data['next_time'] / 1000)
        data['insert_time'] = int(time.time())
        data['update_time'] = int(time.time())
        data['spider_link'] = str(data['spider_link'])
        data['spider_rule'] = str(data['spider_rule'])
        data['user_tag_info'] = str(user_tag)
        data['content_tag_info'] = str(user_content_tag)
        data['user_tag'] = ','.join(str(i) for i in tag_name_list)
        data['user_content_tag'] = ','.join(str(i) for i in content_tag_list)
        # data['crawler_interval'] = data.pop('interval')
        # Build "col1,col2,..." and "%s,%s,..." strings for the parameterized INSERT
        keys = ','.join(data.keys())
        values = ','.join(['%s'] * len(data))
        sql = 'insert into {table}({keys}) VALUES({values})'.format(table='crawler_task', keys=keys, values=values)
        task_id = mysql_con.insert_values(sql, tuple(data.values()))
        if task_id:
            spider_link = user_data['spider_link']
            success_list, fail_list = create_uid(user_data, task_id, spider_link)
            return jsonify({'code': 200, 'message': 'task create success', 'success_list': success_list, 'fail_list': fail_list})
    except Exception as e:
        return jsonify({'code': 500, 'message': f'任务写入失败,原因:{e}'})
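
# Illustrative /v1/crawler/task/insert body (a sketch only; besides the fields handled
# explicitly above, every remaining key must match a crawler_task column because the
# whole dict is written into the table as-is):
#   {
#       "task_name": "demo task",                      # example value
#       "source": "xigua",                             # placeholder source name
#       "task_type": "author",
#       "spider_link": ["https://example.com/author/123"],
#       "spider_rule": {"period": 86400},              # example rule, stored via str()
#       "user_tag": [{"tagName": "美食"}],
#       "user_content_tag": [{"tagName": "菜谱"}],
#       "min_publish_time": 1672502400000,             # milliseconds; converted to seconds
#       "min_publish_day": 0,
#       "next_time": 1672588800000                     # milliseconds; converted to seconds
#   }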

@app.route("/v1/crawler/task/gettask", methods=["POST"])
def getAllTask():
    try:
        get_data = request.json
        page = int(get_data.get('page', 1))
        offset = int(get_data.get('offset', 10))
        start_count = (page * offset) - offset
        end_count = page * offset
        if get_data.get('fields'):
            select_data = get_data['fields']
            values = ''
            for k, v in select_data.items():
                if isinstance(v, int):
                    values += f'{k}={v} and '
                else:
                    values += f'{k}="{v}" and '
            # values[:-4] strips the trailing "and " from the WHERE clause
            sql = f"select task_id from crawler_author_map where {values[:-4]} and is_del=1"
            res = mysql_con.get_values(sql)
            task_id_set = set()
            for task in res:
                task_id_set.add(task['task_id'])
            task_list = list()
            for task_id in task_id_set:
                sql = f'select * from crawler_task where task_id={task_id} order by update_time desc limit {start_count}, {end_count}'
                task_info = mysql_con.get_values(sql)[0]
                task_data = dict(
                    task_id=task_info['task_id'],
                    task_name=task_info['task_name'],
                    source=task_info['source'],
                    task_type=task_info['task_type'],
                    create_task_user=task_info['create_task_user'],
                    insert_time=task_info['insert_time'] * 1000,
                    update_task_user=task_info['update_task_user'],
                    update_time=task_info['update_time'] * 1000
                )
                task_list.append(task_data)
            return jsonify({'code': 200, 'result': task_list, 'total': len(task_list)})
        sql = f"""select * from crawler_task order by update_time desc limit {start_count}, {end_count}"""
        result = mysql_con.get_values(sql)
        if not result:
            return jsonify({'code': '200', 'result': [], 'message': '没有更多任务'})
        task_list = list()
        for task_info in result:
            source = task_info['source']
            task_type = task_info['task_type']
            source_sql = f'select * from crawler_source where source="{source}"'
            source_info = mysql_con.get_values(source_sql)
            task_type_sql = f'select * from crawler_task_type where task_type="{task_type}"'
            type_info = mysql_con.get_values(task_type_sql)
            task_data = dict(
                task_id=task_info['task_id'],
                task_name=task_info['task_name'],
                source_name=source_info[0]['source_desc'],
                task_type_name=type_info[0]['task_type_desc'],
                source=task_info['source'],
                task_type=task_info['task_type'],
                create_task_user=task_info['create_task_user'],
                insert_time=task_info['insert_time'] * 1000,
                update_task_user=task_info['update_task_user'],
                update_time=task_info['update_time'] * 1000
            )
            task_list.append(task_data)
        t_sql = """select count(*) from crawler_task"""
        t_res = mysql_con.get_values(t_sql)
        total = t_res[0]['count(*)']
    except Exception as e:
        return jsonify({'code': '400', 'message': '任务列表获取失败'})
    return jsonify({'code': '200', 'result': task_list, 'total': total})


@app.route("/v1/crawler/task/getone", methods=["GET"])
def getOneTask():
    try:
        get_data = request.args.to_dict()
        task_id = get_data['task_id']
        sql = f'select * from crawler_task where task_id={task_id}'
        result = mysql_con.get_values(sql)
        if not result:
            return jsonify({'code': '400', 'result': [], 'message': 'no data'})
        data = result[0]
        if data['min_publish_time']:
            data['min_publish_time'] = data['min_publish_time'] * 1000
        else:
            data['min_publish_time'] = 0
        data['next_time'] = data['next_time'] * 1000
        data['spider_link'] = eval(data['spider_link'])
        data['spider_rule'] = eval(data['spider_rule'])
        # data['user_tag_info'] = eval(data['user_tag_info'])
        data['content_tag_info'] = eval(data['content_tag_info'])
        if not data['mode_name_id']:
            data['mode_name_id'] = ''
        if not data['mode_board_id']:
            data['mode_board_id'] = ''
        if not data['content_category_id']:
            data['content_category_id'] = ''
    except Exception as e:
        return jsonify({'code': '500', 'message': '获取任务信息失败'})
    return jsonify({'code': '200', 'result': result})


@app.route("/v1/crawler/task/update", methods=["POST"])
def updateTask():
    try:
        data = request.json
        task_id = data.get('task_id')
        task_info = data.get('task_info')
        values = ''
        if task_info['min_publish_time']:
            task_info['min_publish_time'] = task_info['min_publish_time'] / 1000
        else:
            task_info['min_publish_time'] = 0
        if not task_info['min_publish_day']:
            task_info['min_publish_day'] = 0
        task_info['next_time'] = task_info['next_time'] / 1000
        user_tag = task_info['user_tag']
        user_content_tag = task_info['user_content_tag']
        tag_name_list = []
        content_tag_list = []
        for tag in user_tag:
            tag_name_list.append(tag['tagName'])
        for tag in user_content_tag:
            content_tag_list.append(tag['tagName'])
        task_info['user_tag_info'] = str(user_tag)
        task_info['content_tag_info'] = str(user_content_tag)
        task_info['user_tag'] = ','.join(str(i) for i in tag_name_list)
        task_info['user_content_tag'] = ','.join(str(i) for i in content_tag_list)
        for k, v in task_info.items():
            if isinstance(v, int):
                values += f'{k}={v},'
            else:
                values += f'{k}="{v}",'
        # values[:-1] strips the trailing comma from the SET clause
        sql = f'update crawler_task set {values[:-1]} where task_id={task_id}'
        result = mysql_con.update_values(sql)
        if result:
            return jsonify({'code': 200, 'message': 'task update success'})
        else:
            return jsonify({'code': 400, 'message': 'task update failed'})
    except Exception as e:
        return jsonify({'code': 400, 'message': '任务更新失败'})
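
# Illustrative request bodies for the task listing and update endpoints above
# (values are made-up examples):
#   POST /v1/crawler/task/gettask  {"page": 1, "offset": 10}
#   POST /v1/crawler/task/gettask  {"page": 1, "offset": 10, "fields": {"media_id": 123456}}
#   POST /v1/crawler/task/update   {"task_id": 1, "task_info": {"task_name": "demo task (renamed)",
#       "min_publish_time": 1672502400000, "min_publish_day": 0, "next_time": 1672588800000,
#       "user_tag": [{"tagName": "美食"}], "user_content_tag": [{"tagName": "菜谱"}]}}
# Keys in "fields" are matched against crawler_author_map columns; keys in "task_info"
# are written verbatim into the crawler_task SET clause.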

def create_uid(task, task_id, spider_link):
    if not isinstance(spider_link, list):
        spider_link = eval(spider_link)
    source = task.get('source')
    task_type = task.get('task_type')
    applets_status = task.get('applets_status')
    app_status = task.get('app_status')
    try:
        user_tag = eval(task.get('user_tag_info'))
        user_content_tag = eval(task.get('content_tag_info'))
    except Exception as e:
        user_tag = task.get('user_tag')
        user_content_tag = task.get('user_content_tag')
    mode_name_id = task.get('mode_name_id', 0)
    mode_board_id = task.get('mode_board_id', 0)
    content_category_id = task.get('content_category_id', 0)
    mn_sql = f'select * from crawler_mode where id={mode_name_id}'
    mode_name_list = mysql_con.get_values(mn_sql)
    mb_sql = f'select * from crawler_board where id={mode_board_id}'
    mode_board_list = mysql_con.get_values(mb_sql)
    cc_sql = f'select * from crawler_content_category where id={content_category_id}'
    content_category_list = mysql_con.get_values(cc_sql)
    source_sql = f'select * from crawler_source where source="{source}"'
    source_res = mysql_con.get_values(source_sql)[0]
    spider_platform = source_res['source_desc']
    if mode_name_list:
        task['mode_name_str'] = mode_name_list[0]['mode_name']
    else:
        task['mode_name_str'] = ''
    if mode_board_list:
        task['mode_board_str'] = mode_board_list[0]['mode_board']
    else:
        task['mode_board_str'] = ''
    if content_category_list:
        task['content_category_str'] = content_category_list[0]['content_category']
    else:
        task['content_category_str'] = ''
    success_list = list()
    fail_list = list()
    tag_name_list = list()
    content_tag_list = list()
    for tag in user_tag:
        tag_name_list.append(tag['tagName'])
    for tag in user_content_tag:
        content_tag_list.append(tag['tagName'])
    user_tags = ','.join(str(i) for i in tag_name_list)
    user_content_tags = ','.join(str(i) for i in content_tag_list)
    for author_url in spider_link:
        now_time = int(time.time())
        time_array = time.localtime(now_time)
        str_time = time.strftime("%Y%m%d", time_array)
        # Build the tag string for the virtual user that will be created
        tags = ''
        if task['task_type'] == 'author':
            spider_task = '账号'
            tags_list = ['spider', spider_task, spider_platform, user_tags, task['content_category_str'], str_time]
        elif task['task_type'] == 'search':
            spider_task = '搜索'
            tags_list = ['spider', spider_task, spider_platform, user_tags, author_url, task['content_category_str'], str_time]
        elif task['task_type'] == 'board':
            spider_task = '榜单'
            mode_tags = task['mode_board_str']
            tags_list = ['spider', spider_task, spider_platform, user_tags, mode_tags, task['content_category_str'], str_time]
        elif task['task_type'] == 'recommend':
            spider_task = '推荐'
            mode_tags = task['mode_name_str'] + task['mode_board_str']
            tags_list = ['spider', spider_task, spider_platform, user_tags, mode_tags, task['content_category_str'], str_time]
        else:
            tags_list = ['spider', spider_platform, user_tags, task['content_category_str'], str_time]
        for v in tags_list:
            if v:
                tags += str(v) + ','
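        # Example of the resulting tag string for an 'author' task (illustrative values only,
        # assuming platform "西瓜视频", user tag "美食", category "菜谱" and date 20230101):
        #   "spider,账号,西瓜视频,美食,菜谱,20230101"
        # The trailing comma is dropped via tags[:-1] when post_data is built below.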
        post_data = {
            # 'count': 1,  # (required) number of accounts: pass 1
            # 'accountType': 4,  # (required) account type: pass 4 for an app virtual account
            'pwd': '',  # password, defaults to 12346
            'nickName': '',  # nickname, defaults to vuser......
            'avatarUrl': '',  # avatar URL, defaults to http://weapppiccdn.yishihui.com/resources/images/pic_normal.png
            'tagName': tags[:-1],  # multiple tags separated by commas
        }
        try:
            response = requests.post(url=conf['media_url'], params=post_data)
            media_id = response.json()['data']
            media_info = requests.get(url=conf['select_media_url'], params={'uid': media_id}, verify=False).json()['content']
        except Exception as e:
            logging.warning(f'创建账户:{author_url},失败,原因:{e}')
            fail_list.append(author_url)
            continue
        data = dict(
            spider_link=author_url,
            media_id=media_id,
            media_name=media_info['longvideoNickName'] if media_info['longvideoNickName'] else media_info['nickName'],
            source=source,
            task_type=task_type,
            applets_status=applets_status,
            app_status=app_status,
            user_tag=user_tags,
            user_content_tag=user_content_tags,
            insert_time=int(time.time()),
            update_time=int(time.time()),
            create_user_time=now_time,
            mode_name_str=task['mode_name_str'],
            mode_board_str=task['mode_board_str'],
            content_category_str=task['content_category_str'],
            # mode_value_str=mode_value_str,
            task_id=task_id,
            media_main_url=conf['media_main_url'].format(media_id)
        )
        keys = ','.join(data.keys())
        values = ','.join(['%s'] * len(data))
        table = 'crawler_author_map'
        sql = f"""insert into {table}({keys}) VALUES({values})"""
        mysql_con.insert_values(sql, tuple(data.values()))
        user_info = dict(
            outer_id=author_url,
            uid=media_id
        )
        success_list.append(user_info)
    return success_list, fail_list


@app.route("/v1/crawler/author/create", methods=["POST"])
def createUser():
    spider_link = request.json.get('spider_link')
    source = request.json.get('source')
    task_type = request.json.get('task_type')
    applets_status = request.json.get('applets_status')
    app_status = request.json.get('app_status')
    user_tag = request.json.get('user_tag')
    user_content_tag = request.json.get('user_content_tag')
    success_list = list()
    fail_list = list()
    for author_url in spider_link:
        try:
            f_sql = f"""select spider_link from crawler_author_map where spider_link="{author_url}" """
            result = mysql_con.get_values(f_sql)
            if result:
                success_list.append(author_url)
                continue
            else:
                tag_name_list = []
                content_tag_list = []
                for tag in user_tag:
                    tag_name_list.append(tag['tagName'])
                for tag in user_content_tag:
                    content_tag_list.append(tag['tagName'])
                user_tags = ','.join(str(i) for i in tag_name_list)
                user_content_tags = ','.join(str(i) for i in content_tag_list)
                post_data = {
                    # 'count': 1,  # (required) number of accounts: pass 1
                    # 'accountType': 4,  # (required) account type: pass 4 for an app virtual account
                    'pwd': '',  # password, defaults to 12346
                    'nickName': '',  # nickname, defaults to vuser......
                    'avatarUrl': '',  # avatar URL, defaults to http://weapppiccdn.yishihui.com/resources/images/pic_normal.png
                    'tagName': user_tags,  # multiple tags separated by commas
                }
                response = requests.post(url=conf['media_url'], params=post_data)
                media_id = response.json()['data']
                data = dict(
                    spider_link=author_url,
                    media_id=media_id,
                    source=source,
                    task_type=task_type,
                    applets_status=applets_status,
                    app_status=app_status,
                    user_tag=user_tags,
                    user_content_tag=user_content_tags,
                    insert_time=int(time.time()),
                    update_time=int(time.time())
                )
                keys = ','.join(data.keys())
                values = ','.join(['%s'] * len(data))
                table = 'crawler_author_map'
                sql = f"""insert into {table}({keys}) VALUES({values})"""
                result = mysql_con.insert_values(sql, tuple(data.values()))
                if not result:
                    fail_list.append(author_url)
                else:
                    success_list.append(author_url)
        except Exception as e:
            fail_list.append(author_url)
            continue
    return jsonify({'code': 200, 'result': {'success': success_list, 'fail': fail_list}})


if __name__ == "__main__":
    app.run(debug=True, port=5050)
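
# Illustrative local calls once the service is running (URL, port and payload values are
# examples only, assuming the defaults from app.run above):
#   curl http://127.0.0.1:5050/v1/crawler/source/getall
#   curl -X POST http://127.0.0.1:5050/v1/crawler/author/create \
#        -H 'Content-Type: application/json' \
#        -d '{"spider_link": ["https://example.com/author/123"], "source": "xigua",
#             "task_type": "author", "applets_status": 0, "app_status": 0,
#             "user_tag": [{"tagName": "美食"}], "user_content_tag": [{"tagName": "菜谱"}]}'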