- import copy
- import logging
- import os
- import sys
- import time
- import requests
- from flask import Flask, request
- from flask import jsonify
- from dotenv import load_dotenv
- sys.path.append(os.path.abspath(os.path.join(os.getcwd(), "..")))
- from conf.config import get_config
- from common.db.mysql_help import MysqlHelper
- load_dotenv(verbose=True)
- env = os.getenv('env')
- app = Flask(__name__)
- app.config['JSON_AS_ASCII'] = False
- # MySQL helper instance
- mysql_con = MysqlHelper()
- conf = get_config()
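- # POST /v1/crawler/task/addlink
- # Adds a spider link to an existing task. An already-mapped link is
- # re-activated (is_del=1 appears to mark an active mapping; dellink below
- # sets is_del=0) or moved to the new task_id. Note: crawler_task.spider_link
- # stores the str() of a Python list, hence the eval()/str() round-trips.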
- @app.route("/v1/crawler/task/addlink", methods=["POST"])
- def addSpiderLink():
- try:
- data = request.json
- spider_link = data['spider_link']
- task_id = data['task_id']
- sql = f'select * from crawler_author_map where spider_link="{spider_link}"'  # quote the value: spider_link is a string column
- result = mysql_con.get_values(sql)
- now_time = int(time.time())
- if result:
- old_task_id = result[0]['task_id']
- if task_id == old_task_id:
- up_sql = f'update crawler_author_map set is_del=1 where spider_link="{spider_link}"'
- else:
- up_sql = f'update crawler_author_map set task_id={task_id} where spider_link="{spider_link}"'
- res = mysql_con.update_values(up_sql)
- task_sql = f'select spider_link from crawler_task where task_id ={task_id}'
- task = mysql_con.get_values(task_sql)
- spider_links = eval(task[0]['spider_link'])
- spider_links.append(spider_link)
- str_spider_links = str(spider_links)
- u_sql = f'update crawler_task set spider_link="{str_spider_links}", update_time={now_time} where task_id={task_id}'
- mysql_con.update_values(u_sql)
- return jsonify({'code': 200, 'message': 'spider link added', 'del_link': spider_link})
- else:
- sql = f'select * from crawler_task where task_id={task_id}'
- result = mysql_con.get_values(sql)
- success_list, fail_list = create_uid(result[0], task_id, spider_link=[spider_link])
- spider_links = eval(result[0]['spider_link'])
- spider_links.append(spider_link)
- str_spider_links = str(spider_links)
- u_sql = f'update crawler_task set spider_link="{str_spider_links}", update_time={now_time} where task_id={task_id}'
- mysql_con.update_values(u_sql)
- return jsonify({'code': 200, 'message': 'spider link added', 'add_link': success_list})
- except Exception as e:
- return jsonify(
- {'code': 400, 'message': f'failed to add spider link: {e}'})
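- # POST /v1/crawler/task/dellink
- # Soft-deletes a link (is_del=0) and removes it from the task's stored
- # spider_link list.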
- @app.route("/v1/crawler/task/dellink", methods=["POST"])
- def delSpiderLink():
- data = request.json
- spider_link = data['spider_link']
- task_id = data['task_id']
- up_sql = f'update crawler_author_map set is_del=0 where spider_link="{spider_link}"'
- mysql_con.update_values(up_sql)  # use the shared instance, not the class
- sql = f'select * from crawler_task where task_id ={task_id}'
- task = mysql_con.get_values(sql)
- spider_links = eval(task[0]['spider_link'])
- spider_links.remove(spider_link)
- now_time = int(time.time())
- u_sql = f'update crawler_task set spider_link="{spider_links}",update_time={now_time} where task_id={task_id}'
- mysql_con.update_values(u_sql)
- if spider_link:
- return jsonify({'code': 200, 'message': 'spider link removed', 'del_link': spider_link})
- else:
- return jsonify(
- {'code': 400, 'message': 'failed to remove spider link', 'del_link': spider_link})
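- # The four GET endpoints below expose the content-category, board,
- # mode-name and recommend-board lookup tables, presumably for selection
- # lists in the admin front end.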
- @app.route("/v1/crawler/task/getcategory", methods=["GET"])
- def getCategory():
- sql = 'select id, content_category from crawler_content_category'
- result = mysql_con.get_values(sql)
- return jsonify({'code': 200, 'data': result})
- @app.route("/v1/crawler/task/getboard", methods=["GET"])
- def getBoard():
- sql = 'select id, mode_board from crawler_board'
- result = mysql_con.get_values(sql)
- return jsonify({'code': 200, 'data': result})
- @app.route("/v1/crawler/task/getmodename", methods=["GET"])
- def getModeName():
- sql = 'select id, mode_name from crawler_mode'
- result = mysql_con.get_values(sql)
- return jsonify({'code': 200, 'data': result})
- @app.route("/v1/crawler/task/getrecommendboard", methods=["GET"])
- def getRecommendBoard():
- sql = 'select id, mode_board from crawler_recommend_board'
- result = mysql_con.get_values(sql)
- return jsonify({'code': 200, 'data': result})
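- # GET /v1/crawler/user/findmedia
- # Lists a task's active (is_del=1) author mappings, enriched with the
- # display name fetched from the media service.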
- @app.route("/v1/crawler/user/findmedia", methods=["GET"])
- def getMediaInfo():
- data = request.args.to_dict()
- task_id = data['task_id']
- sql = f'select * from crawler_author_map where task_id={task_id} and is_del=1'
- result = mysql_con.get_values(sql)
- task_user_info = []
- for task_info in result:
- media_id = task_info['media_id']
- media_info = requests.get(url=conf['select_media_url'], params={'uid': media_id}).json()['content']
- media_name = media_info['longvideoNickName'] if media_info['longvideoNickName'] else media_info['nickName']
- nick_name = task_info['nick_name']
- spider_link = task_info['spider_link']
- create_user_time = task_info['create_user_time']
- media_data = dict(
- media_name=media_name,
- nick_name=nick_name,
- spider_link=spider_link,
- media_id={'media_id': media_id, 'media_url': conf['media_main_url'].format(media_id)},
- create_user_time=create_user_time * 1000
- )
- task_user_info.append(media_data)
- return jsonify({'code': 200, 'data': task_user_info})
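- # GET /v1/crawler/task/findtask
- # Resolves a task id via crawler_author_map filters, then returns the
- # task's core fields.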
- @app.route("/v1/crawler/task/findtask", methods=["GET"])
- def getTaskUserInfo():
- # look up the task matching the given query params
- data = request.args.to_dict()
- values = ''
- for k, v in data.items():
- if isinstance(v, int):
- values += f'{k}={v} and '
- else:
- values += f'{k}="{v}" and '
- sql = f"select task_id from crawler_author_map where {values[:-4]}" # [:-1]是为了去掉末尾的逗号
- res = mysql_con.get_values(sql)
- task_id = res[0]['task_id']  # get_values returns a list of rows
- sql = f'select task_name, source, task_type, create_task_user, insert_time, update_task_user, update_time from crawler_task where task_id={task_id} '
- task_info = mysql_con.get_values(sql)
- return jsonify({'code': 200, 'data': task_info})
- # GET only
- @app.route("/v1/crawler/source/getall", methods=["GET"])
- def getSource():
- try:
- sql = 'select * from crawler_source'
- result = mysql_con.get_values(sql)
- if not result:
- return jsonify({'code': '200', 'result': [], 'message': 'no more data'})
- except Exception as e:
- return jsonify({'code': '400', 'message': 'failed to fetch source info'})
- return jsonify({'code': '200', 'result': result})
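- # GET /v1/crawler/source/getasktype
- # Lists the task types (and their spider names) configured for a source.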
- @app.route("/v1/crawler/source/getasktype", methods=["GET"])
- def getTaskType():
- try:
- data = request.args.to_dict()
- source = data['source']
- # filter task types by the given source
- sql = f'select * from crawler_task_type where source="{source}"'
- result = mysql_con.get_values(sql)
- if not result:
- return jsonify({'code': '200', 'result': [], 'message': 'no more data'})
- else:
- task_type_list = list()
- for task_type_info in result:
- task_info = {
- 'type': task_type_info['task_type'],
- 'description': task_type_info['task_type_desc'],
- 'spider': {
- 'spider_name': task_type_info['spider_name'],
- 'description': task_type_info['spider_name_desc']
- }
- }
- task_type_list.append(task_info)
- source_dict = {
- 'task_type': task_type_list,
- }
- except Exception as e:
- return jsonify({'code': '400', 'message': 'failed to fetch task type info'})
- return jsonify({'code': '200', 'result': source_dict})
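- # POST /v1/crawler/task/checkrepeat
- # Flags spider links that already exist in crawler_author_map.
- # A parameterized query, following the %s pattern insert_values already
- # uses, would sidestep the int/str quoting branch below, e.g. (assuming
- # get_values forwards parameters the same way insert_values does):
- #   mysql_con.get_values("select spider_link from crawler_author_map where spider_link=%s", (spider_link,))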
- @app.route("/v1/crawler/task/checkrepeat", methods=["POST"])
- def get_repeat_list():
- data = request.json
- # extract fields
- spider_links = data.get('spider_link')
- repeat_list = list()
- # check whether each link is already registered
- for spider_link in spider_links:
- if isinstance(spider_link, int):
- s_sql = f"""select spider_link from crawler_author_map where spider_link={spider_link}"""
- else:
- s_sql = f"""select spider_link from crawler_author_map where spider_link='{spider_link}'"""
- result = mysql_con.get_values(s_sql)
- if result:
- repeat_list.append(spider_link)
- if repeat_list:
- return jsonify({'code': 400, 'message': 'duplicate links found', 'repeat_list': repeat_list})
- else:
- return jsonify({'code': 200, 'message': 'spider links passed validation', 'repeat_list': repeat_list})
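- # POST /v1/crawler/task/insert
- # Creates a crawler_task row, then provisions a virtual media account for
- # each spider link via create_uid(). Timestamps arrive in milliseconds and
- # are stored as seconds; list/dict fields are stored as str() literals.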
- @app.route("/v1/crawler/task/insert", methods=["POST"])
- def insertTask():
- try:
- data = request.json
- user_data = copy.deepcopy(data)
- tag_name_list = []
- content_tag_list = []
- user_tag = data['user_tag']
- user_content_tag = data['user_content_tag']
- for tag in user_tag:
- tag_name_list.append(tag['tagName'])
- for tag in user_content_tag:
- content_tag_list.append(tag['tagName'])
- if data['min_publish_time']:
- data['min_publish_time'] = int(data['min_publish_time'] / 1000)
- else:
- data['min_publish_time'] = 0
- if not data['min_publish_day']:
- data['min_publish_day'] = 0
- data['next_time'] = int(data['next_time'] / 1000)
- data['insert_time'] = int(time.time())
- data['update_time'] = int(time.time())
- data['spider_link'] = str(data['spider_link'])
- data['spider_rule'] = str(data['spider_rule'])
- data['user_tag_info'] = str(user_tag)
- data['content_tag_info'] = str(user_content_tag)
- data['user_tag'] = ','.join(str(i) for i in tag_name_list)
- data['user_content_tag'] = ','.join(str(i) for i in content_tag_list)
- # data['crawler_interval'] = data.pop('interval')
- # join the dict keys into a comma-separated column list for the INSERT
- keys = ','.join(data.keys())
- values = ','.join(['%s'] * len(data))
- sql = 'insert into {table}({keys}) VALUES({values})'.format(table='crawler_task', keys=keys, values=values)
- task_id = mysql_con.insert_values(sql, tuple(data.values()))
- if task_id:
- spider_link = user_data['spider_link']
- success_list, fail_list = create_uid(user_data, task_id, spider_link)
- return jsonify(
- {'code': 200, 'message': 'task create success', 'success_list': success_list, 'fail_list': fail_list})
- except Exception as e:
- return jsonify({'code': 500, 'message': f'task insert failed: {e}'})
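- # POST /v1/crawler/task/gettask
- # Paginated task listing. With a 'fields' filter, matching task ids are
- # resolved through crawler_author_map first; otherwise all tasks are paged.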
- @app.route("/v1/crawler/task/gettask", methods=["POST"])
- def getAllTask():
- try:
- get_data = request.json
- page = int(get_data.get('page', 1))
- offset = int(get_data.get('offset', 10))
- start_count = (page - 1) * offset  # LIMIT offset; 'offset' here is the page size
- if get_data.get('fields'):
- select_data = get_data['fields']
- values = ''
- for k, v in select_data.items():
- if isinstance(v, int):
- values += f'{k}={v} and '
- else:
- values += f'{k}="{v}" and '
- sql = f"select task_id from crawler_author_map where {values[:-4]} and is_del=1" # [:-1]是为了去掉末尾的逗号
- res = mysql_con.get_values(sql)
- task_id_set = set()
- for task in res:
- task_id_set.add(task['task_id'])
- task_list = list()
- for task_id in task_id_set:
- sql = f'select * from crawler_task where task_id={task_id} order by update_time desc limit {start_count}, {offset}'
- task_info = mysql_con.get_values(sql)[0]
- task_data = dict(
- task_id=task_info['task_id'],
- task_name=task_info['task_name'],
- source=task_info['source'],
- task_type=task_info['task_type'],
- create_task_user=task_info['create_task_user'],
- insert_time=task_info['insert_time'] * 1000,
- update_task_user=task_info['update_task_user'],
- update_time=task_info['update_time'] * 1000
- )
- task_list.append(task_data)
- return jsonify({'code': 200, 'result': task_list, 'total': len(task_list)})
- sql = f"""select * from crawler_task order by update_time desc limit {start_count}, {end_count} """
- result = mysql_con.get_values(sql)
- if not result:
- return jsonify({'code': '200', 'result': [], 'message': 'no more tasks'})
- task_list = list()
- for task_info in result:
- source = task_info['source']
- task_type = task_info['task_type']
- source_sql = f'select * from crawler_source where source="{source}"'
- source_info = mysql_con.get_values(source_sql)
- task_type_sql = f'select * from crawler_task_type where task_type="{task_type}"'
- type_info = mysql_con.get_values(task_type_sql)
- task_data = dict(
- task_id=task_info['task_id'],
- task_name=task_info['task_name'],
- source_name=source_info[0]['source_desc'],
- task_type_name=type_info[0]['task_type_desc'],
- source=task_info['source'],
- task_type=task_info['task_type'],
- create_task_user=task_info['create_task_user'],
- insert_time=task_info['insert_time'] * 1000,
- update_task_user=task_info['update_task_user'],
- update_time=task_info['update_time'] * 1000
- )
- task_list.append(task_data)
- t_sql = f"""select count(*) from crawler_task"""
- t_res = mysql_con.get_values(t_sql)
- total = t_res[0]['count(*)']
- except Exception as e:
- return jsonify({"code": "400", 'message': "任务列表获取失败"})
- return jsonify({'code': '200', 'result': task_list, 'total': total})
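- # GET /v1/crawler/task/getone
- # Returns a single task, eval()'ing stored list/dict literals back into
- # Python objects and converting second timestamps back to milliseconds.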
- @app.route("/v1/crawler/task/getone", methods=["GET"])
- def getOneTask():
- try:
- get_data = request.args.to_dict()
- task_id = get_data['task_id']
- sql = f'select * from crawler_task where task_id={task_id}'
- result = mysql_con.get_values(sql)
- if not result:
- return jsonify({'code': '400', 'result': [], 'message': 'no data'})
- data = result[0]
- if data['min_publish_time']:
- data['min_publish_time'] = data['min_publish_time'] * 1000
- else:
- data['min_publish_time'] = 0
- data['next_time'] = data['next_time'] * 1000
- data['spider_link'] = eval(data['spider_link'])
- data['spider_rule'] = eval(data['spider_rule'])
- data['user_tag_info'] = eval(data['user_tag_info'])
- data['content_tag_info'] = eval(data['content_tag_info'])
- if not data['mode_name_id']:
- data['mode_name_id'] = ''
- if not data['mode_board_id']:
- data['mode_board_id'] = ''
- if not data['content_category_id']:
- data['content_category_id'] = ''
- except Exception as e:
- return jsonify({'code': '500', "message": "failed to fetch task info"})
- return jsonify({'code': '200', 'result': result})
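- # POST /v1/crawler/task/update
- # Normalizes timestamps and tag fields, then applies every task_info key
- # as a column in a SQL SET clause.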
- @app.route("/v1/crawler/task/update", methods=["POST"])
- def updateTask():
- try:
- data = request.json
- task_id = data.get('task_id')
- task_info = data.get('task_info')
- values = ''
- if task_info['min_publish_time']:
- task_info['min_publish_time'] = int(task_info['min_publish_time'] / 1000)
- else:
- task_info['min_publish_time'] = 0
- if not task_info['min_publish_day']:
- task_info['min_publish_day'] = 0
- task_info['next_time'] = int(task_info['next_time'] / 1000)
- user_tag = task_info['user_tag']
- user_content_tag = task_info['user_content_tag']
- tag_name_list = []
- content_tag_list = []
- for tag in user_tag:
- tag_name_list.append(tag['tagName'])
- for tag in user_content_tag:
- content_tag_list.append(tag['tagName'])
- task_info['user_tag_info'] = str(user_tag)
- task_info['content_tag_info'] = str(user_content_tag)
- task_info['user_tag'] = ','.join(str(i) for i in tag_name_list)
- task_info['user_content_tag'] = ','.join(str(i) for i in content_tag_list)
- for k, v in task_info.items():
- if isinstance(v, int):
- values += f'{k}={v},'
- else:
- values += f'{k}="{v}",'
- sql = f'update crawler_task set {values[:-1]} where task_id={task_id}'
- result = mysql_con.update_values(sql)
- if result:
- return jsonify({'code': 200, 'message': 'task update success'})
- else:
- return jsonify({'code': 400, 'message': 'task update failed'})
- except Exception as e:
- return jsonify({'code': 400, 'message': f'task update failed: {e}'})
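- # create_uid: for each spider link, create a virtual media account via the
- # media service and record the link -> media_id mapping in
- # crawler_author_map. Returns (success_list, fail_list).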
- def create_uid(task, task_id, spider_link):
- if not isinstance(spider_link, list):
- spider_link = eval(spider_link)
- source = task.get('source')
- task_type = task.get('task_type')
- applets_status = task.get('applets_status')
- app_status = task.get('app_status')
- try:
- user_tag = eval(task.get('user_tag_info'))
- user_content_tag = eval(task.get('content_tag_info'))
- except Exception as e:
- user_tag = task.get('user_tag')
- user_content_tag = task.get('user_content_tag')
- mode_name_id = task.get('mode_name_id', 0)
- mode_board_id = task.get('mode_board_id', 0)
- content_category_id = task.get('content_category_id', 0)
- mn_sql = f'select * from crawler_mode where id={mode_name_id}'
- mode_name_list = mysql_con.get_values(mn_sql)
- mb_sql = f'select * from crawler_board where id={mode_board_id}'
- mode_board_list = mysql_con.get_values(mb_sql)
- cc_sql = f'select * from crawler_content_category where id={content_category_id}'
- content_category_list = mysql_con.get_values(cc_sql)
- source_sql = f'select * from crawler_source where source="{source}"'
- source_res = mysql_con.get_values(source_sql)[0]
- spider_platform = source_res['source_desc']
- if mode_name_list:
- task['mode_name_str'] = mode_name_list[0]['mode_name']
- else:
- task['mode_name_str'] = ''
- if mode_board_list:
- task['mode_board_str'] = mode_board_list[0]['mode_board']
- else:
- task['mode_board_str'] = ''
- if content_category_list:
- task['content_category_str'] = content_category_list[0]['content_category']
- else:
- task['content_category_str'] = ''
- success_list = list()
- fail_list = list()
- tag_name_list = list()
- content_tag_list = list()
- for tag in user_tag:
- tag_name_list.append(tag['tagName'])
- for tag in user_content_tag:
- content_tag_list.append(tag['tagName'])
- user_tags = ','.join(str(i) for i in tag_name_list)
- user_content_tags = ','.join(str(i) for i in content_tag_list)
- for author_url in spider_link:
- now_time = int(time.time())
- time_array = time.localtime(now_time)
- str_time = time.strftime("%Y%m%d", time_array)
- # build the tag string for the new virtual user
- tags = ''
- if task['task_type'] == 'author':
- spider_task = '账号'
- tags_list = ['spider', spider_task, spider_platform, user_tags, task['content_category_str'], str_time]
- elif task['task_type'] == 'search':
- spider_task = '搜索'
- tags_list = ['spider', spider_task, spider_platform, user_tags, author_url, task['content_category_str'], str_time]
- elif task['task_type'] == 'board':
- spider_task = '榜单'
- mode_tags = task['mode_board_str']
- tags_list = ['spider', spider_task, spider_platform, user_tags, mode_tags, task['content_category_str'], str_time]
- elif task['task_type'] == 'recommend':
- spider_task = '推荐'
- mode_tags = task['mode_name_str'] + task['mode_board_str']
- tags_list = ['spider', spider_task, spider_platform, user_tags, mode_tags, task['content_category_str'], str_time]
- else:
- tags_list = ['spider', spider_platform, user_tags, task['content_category_str'], str_time]
- for v in tags_list:
- if v:
- tags += str(v) + ','
- post_data = {
- # 'count': 1,  # (required) number of accounts: pass 1
- # 'accountType': 4,  # (required) account type: pass 4 for an app virtual account
- 'pwd': '',  # password, default 12346
- 'nickName': '',  # nickname, default vuser......
- 'avatarUrl': '',
- # avatar URL, default http://weapppiccdn.yishihui.com/resources/images/pic_normal.png
- 'tagName': tags[:-1],  # comma-separated values; strip the trailing comma
- }
- try:
- response = requests.post(url=conf['media_url'], params=post_data)
- media_id = response.json()['data']
- media_info = requests.get(url=conf['select_media_url'], params={'uid': media_id}).json()['content']
- except Exception as e:
- logging.warning(f'failed to create account for {author_url}: {e}')
- fail_list.append(author_url)
- continue
- data = dict(
- spider_link=author_url,
- media_id=media_id,
- media_name=media_info['longvideoNickName'] if media_info['longvideoNickName'] else media_info['nickName'],
- source=source,
- task_type=task_type,
- applets_status=applets_status,
- app_status=app_status,
- user_tag=user_tags,
- user_content_tag=user_content_tags,
- insert_time=int(time.time()),
- update_time=int(time.time()),
- create_user_time=now_time,
- mode_name_str=task['mode_name_str'],
- mode_board_str=task['mode_board_str'],
- content_category_str=task['content_category_str'],
- # mode_value_str=mode_value_str,
- task_id=task_id,
- media_main_url=conf['media_main_url'].format(media_id)
- )
- keys = ','.join(data.keys())
- values = ','.join(['%s'] * len(data))
- table = 'crawler_author_map'
- sql = f"""insert into {table}({keys}) VALUES({values})"""
- mysql_con.insert_values(sql, tuple(data.values()))
- user_info = dict(
- outer_id=author_url,
- uid=media_id
- )
- success_list.append(user_info)
- return success_list, fail_list
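- # POST /v1/crawler/author/create
- # Creates virtual media accounts for links not yet mapped; links that
- # already exist are treated as successes and skipped.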
- @app.route("/v1/crawler/author/create", methods=["POST"])
- def createUser():
- spider_link = request.json.get('spider_link')
- source = request.json.get('source')
- task_type = request.json.get('task_type')
- applets_status = request.json.get('applets_status')
- app_status = request.json.get('app_status')
- user_tag = request.json.get('user_tag')
- user_content_tag = request.json.get('user_content_tag')
- success_list = list()
- fail_list = list()
- for author_url in spider_link:
- try:
- f_sql = f"""select spider_link from crawler_author_map where spider_link="{author_url}" """
- result = mysql_con.get_values(f_sql)
- if result:
- success_list.append(author_url)
- continue
- else:
- tag_name_list = []
- content_tag_list = []
- for tag in user_tag:
- tag_name_list.append(tag['tagName'])
- for tag in user_content_tag:
- content_tag_list.append(tag['tagName'])
- user_tags = ','.join(str(i) for i in tag_name_list)
- user_content_tags = ','.join(str(i) for i in content_tag_list)
- post_data = {
- # 'count': 1,  # (required) number of accounts: pass 1
- # 'accountType': 4,  # (required) account type: pass 4 for an app virtual account
- 'pwd': '',  # password, default 12346
- 'nickName': '',  # nickname, default vuser......
- 'avatarUrl': '',
- # avatar URL, default http://weapppiccdn.yishihui.com/resources/images/pic_normal.png
- 'tagName': user_tags,  # multiple values separated by commas
- }
- response = requests.post(url=conf['media_url'], params=post_data)
- media_id = response.json()['data']
- data = dict(
- spider_link=author_url,
- media_id=media_id,
- source=source,
- task_type=task_type,
- applets_status=applets_status,
- app_status=app_status,
- user_tag=user_tags,
- user_content_tag=user_content_tags,
- insert_time=int(time.time()),
- update_time=int(time.time())
- )
- keys = ','.join(data.keys())
- values = ','.join(['%s'] * len(data))
- table = 'crawler_author_map'
- sql = f"""insert into {table}({keys}) VALUES({values})"""
- result = mysql_con.insert_values(sql, tuple(data.values()))
- if not result:
- fail_list.append(author_url)
- else:
- success_list.append(author_url)
- except Exception as e:
- fail_list.append(author_url)
- continue
- return jsonify({'code': 200, 'result': {'success': success_list, 'fail': fail_list}})
- if __name__ == "__main__":
- app.run(debug=True, port=5050)