import random import os import logging import json import time import traceback import ast from gevent import monkey monkey.patch_all() from flask import Flask, request from log import Log from config import set_config from recommend import video_homepage_recommend, video_relevant_recommend from category import get_category_videos from video_recall import PoolRecall from db_helper import RedisHelper from gevent.pywsgi import WSGIServer from multiprocessing import cpu_count, Process from utils import update_video_w_h_rate from user2new import user2new from params_helper import Params from manager_op import get_video_list, search_video from ad_recommend import ad_recommend_predict # from werkzeug.middleware.profiler import ProfilerMiddleware # from geventwebsocket.handler import WebSocketHandler app = Flask(__name__) log_ = Log() config_ = set_config() @app.route('/healthcheck') def health_check(): return 'ok!' # 首页推荐及tab分类 @app.route('/applet/video/homepage/recommend', methods=['GET', 'POST']) def homepage_recommend(): start_time = time.time() # in_homepage = start_time * 1000 + random.randint(0, 100) # log_.info({'type': 'homepage', 'in_homepage': in_homepage}) try: # log_.info({'request_headers': request.headers}) request_data = json.loads(request.get_data()) request_id = request_data.get('requestId') mid = request_data.get('mid') uid = request_data.get('uid') category_id = request_data.get('categoryId') size = request_data.get('size', 4) app_type = request_data.get('appType') algo_type = request_data.get('algoType') client_info = request_data.get('clientInfo') ab_exp_info = request_data.get('abExpInfo', None) ab_info_data = request_data.get('abInfoData', None) version_audit_status = request_data.get('versionAuditStatus', 2) # 小程序版本审核参数:1-审核中,2-审核通过,默认:2 params = Params(request_id=request_id) # size默认为10 if not size: size = 10 if category_id in config_.CATEGORY['recommend']: # 推荐 recommend_result = video_homepage_recommend( request_id=request_id, mid=mid, uid=uid, size=size, app_type=app_type, algo_type=algo_type, client_info=client_info, ab_exp_info=ab_exp_info, params=params, ab_info_data=ab_info_data, version_audit_status=version_audit_status ) result = {'code': 200, 'message': 'success', 'data': {'videos': recommend_result['videos']}} log_message = { 'requestUri': '/applet/video/homepage/recommend', 'logTimestamp': int(time.time() * 1000), 'request_id': request_id, 'app_type': app_type, 'client_info': client_info, 'ab_exp_info': ab_exp_info, 'ab_info_data': ab_info_data, 'version_audit_status': version_audit_status, 'category_id': category_id, 'mid': mid, 'uid': uid, 'getRecommendParamsTime': recommend_result.get('getRecommendParamsTime', ''), 'getRecommendResultTime': recommend_result.get('getRecommendResultTime', ''), 'updateRedisDataTime': recommend_result.get('updateRedisDataTime', ''), 'recommendOperation': recommend_result.get('recommendOperation', ''), 'result': result, 'executeTime': (time.time() - start_time) * 1000 } log_.info(log_message) # log_.info('category_id: {}, mid: {}, uid: {}, result: {}, execute time = {}ms'.format( # category_id, mid, uid, result, (time.time() - start_time)*1000)) return json.dumps(result) elif category_id in config_.CATEGORY['other']: # 其他类别 videos = get_category_videos() result = {'code': 200, 'message': 'success', 'data': {'videos': videos}} log_.info('category_id: {}, mid: {}, uid: {}, result: {}, execute time = {}ms'.format( category_id, mid, uid, result, (time.time() - start_time) * 1000)) return json.dumps(result) else: log_.error('categoryId error, categoryId = {}'.format(category_id)) result = {'code': -1, 'message': 'categoryId error'} return json.dumps(result) except Exception as e: log_.error(traceback.format_exc()) result = {'code': -1, 'message': 'fail'} return json.dumps(result) # 相关推荐 @app.route('/applet/video/relevant/recommend', methods=['GET', 'POST']) def relevant_recommend(): start_time = time.time() # in_relevant = start_time * 1000 + random.randint(0, 100) # log_.info({"type": "relevant", "in_relevant": in_relevant}) try: request_data = json.loads(request.get_data()) request_id = request_data.get('requestId') # log_.info({ # 'logTimestamp': int(time.time() * 1000), # 'request_id': request_id, # 'in_relevant': in_relevant, # 'type': "relevant_recommend", # 'text': 'in relevant_recommend', # 'executeTime': (time.time() - start_time) * 1000 # }) mid = request_data.get('mid') uid = request_data.get('uid') video_id = request_data.get('videoId') # up_uid = request_data.get('upUid') # share_mid = request_data.get('shareMid') # share_uid = request_data.get('shareUid') # page_num = request_data.get('pageNum', 1) page_size = request_data.get('pageSize', 4) app_type = request_data.get('appType') client_info = request_data.get('clientInfo') ab_exp_info = request_data.get('abExpInfo', None) page_type = request_data.get('pageType') # 1:详情页;2:分享页 ab_info_data = request_data.get('abInfoData', None) version_audit_status = request_data.get('versionAuditStatus', 2) # 小程序版本审核参数:1-审核中,2-审核通过,默认:2 params = Params(request_id=request_id) recommend_result = video_relevant_recommend( request_id=request_id, video_id=video_id, mid=mid, uid=uid, size=page_size, app_type=app_type, ab_exp_info=ab_exp_info, client_info=client_info, page_type=page_type, params=params, ab_info_data=ab_info_data, version_audit_status=version_audit_status ) result = {'code': 200, 'message': 'success', 'data': {'videos': recommend_result['videos']}} log_message = { 'requestUri': '/applet/video/relevant/recommend', 'logTimestamp': int(time.time() * 1000), 'request_id': request_id, 'app_type': app_type, 'client_info': client_info, 'ab_exp_info': ab_exp_info, 'ab_info_data': ab_info_data, 'version_audit_status': version_audit_status, 'mid': mid, 'uid': uid, 'getRecommendParamsTime': recommend_result.get('getRecommendParamsTime', ''), 'getRecommendResultTime': recommend_result.get('getRecommendResultTime', ''), 'updateRedisDataTime': recommend_result.get('updateRedisDataTime', ''), 'recommendOperation': recommend_result.get('recommendOperation', ''), 'result': result, 'executeTime': (time.time() - start_time) * 1000 } log_.info(log_message) # log_.info('app_type: {}, mid: {}, uid: {}, relevant-result: {}, execute time = {}ms'.format( # app_type, mid, uid, result, (time.time() - start_time) * 1000)) return json.dumps(result) except Exception as e: log_.error(traceback.format_exc()) result = {'code': -1, 'message': 'fail'} return json.dumps(result) # 管理后台实时修改rov @app.route('/applet/video/update/rov', methods=['GET', 'POST']) def update_rov(): try: # log_.info({'requestUri': '/applet/video/update/rov', 'request_initial': request.get_data()}) request_data = json.loads(request.get_data()) log_.info({'requestUri': '/applet/video/update/rov', 'logTimestamp': int(time.time() * 1000), 'requestData': request_data}) # log_.info('update_rov request data: {}'.format(request_data)) # app_type = request_data.get('appType') video_id = request_data.get('videoId') rov_score = request_data.get('rovScore') redis_helper = RedisHelper() # 将修改ROV值视频的 videoId 和 rovScore 存入对应的redis中 redis_helper.update_score_with_value(key_name=config_.UPDATE_ROV_KEY_NAME, value=video_id, score=rov_score) redis_helper.update_score_with_value(key_name=config_.UPDATE_ROV_KEY_NAME_APP, value=video_id, score=rov_score) # ###### 下线 横屏实验 # # 判断该视频是否为 横屏视频,如果是则 存入rov召回池横屏视频 redis 中 # update_video_w_h_rate(video_id=int(video_id), key_name=config_.W_H_RATE_UP_1_VIDEO_LIST_KEY_NAME['rov_recall']) result = {'code': 200, 'message': 'update rov success'} log_.info({'requestUri': '/applet/video/update/rov', 'logTimestamp': int(time.time() * 1000), 'result': result}) # log_.info('result: {}'.format(result)) return json.dumps(result) except Exception as e: log_.error(traceback.format_exc()) result = {'code': -1, 'message': 'update rov fail'} return json.dumps(result) # 管理后台指定用户恢复成新用户 @app.route('/applet/user/to_new', methods=['GET', 'POST']) def user_to_new(): try: request_data = json.loads(request.get_data()) log_.info({'requestUri': '/applet/user/to_new', 'requestData': request_data}) app_type = request_data.get('appType', None) mid = request_data.get('mid') uid = request_data.get('uid') user2new(app_type=app_type, mid=mid, uid=uid) result = {'code': 200, 'message': 'success'} log_.info({'requestUri': '/applet/user/to_new', 'result': result}) return json.dumps(result) except Exception as e: log_.error(traceback.format_exc()) result = {'code': -1, 'message': 'fail'} return json.dumps(result) # 管理后台算法视频列表可视化 - 视频数据表类型获取 @app.route('/applet/video/get_video_type_list', methods=['GET', 'POST']) def get_video_type_list(): try: data = [ {'dataListDesc': val.get('dataListDesc'), 'dataListCode': val.get('dataListCode')} for key, val in config_.VIDEO_DATA_LIST_MAPPING.items() ] data.sort(key=lambda x: x['dataListCode'], reverse=False) result = {'code': 200, 'message': 'success', 'data': data} return json.dumps(result) except Exception as e: log_.error(traceback.format_exc()) result = {'code': -1, 'message': 'fail'} return json.dumps(result) # 管理后台算法视频列表可视化 - 获取视频列表 @app.route('/applet/video/get_online_list', methods=['GET', 'POST']) def get_video_online_list(): try: request_data = json.loads(request.get_data()) ab_exp_code = request_data.get('abExpCode', None) search_time = request_data.get('searchTime', None) data_list_type = request_data.get('dataListType', None) region_code = request_data.get('regionCode', None) video_id = request_data.get('videoId', None) page_num = request_data.get('pageNum', 1) page_size = request_data.get('pageSize', 100) if video_id is None: result = get_video_list(ab_exp_code=ab_exp_code, search_time=search_time, data_list_type=data_list_type, region_code=region_code, page_num=page_num, page_size=page_size) else: result = search_video(ab_exp_code=ab_exp_code, search_time=search_time, data_list_type=data_list_type, region_code=region_code, video_id=video_id, page_num=page_num, page_size=page_size) return json.dumps(result) except Exception as e: log_.error(traceback.format_exc()) result = {'code': -1, 'message': 'fail'} return json.dumps(result) # 广告推荐 @app.route('/applet/ad/predict', methods=['GET', 'POST']) def ad_predict(): start_time = time.time() try: request_data = json.loads(request.get_data()) mid = request_data.get('mid') video_id = request_data.get('videoId') app_type = request_data.get('appType') ab_exp_info = request_data.get('abExpInfo') ab_test_code = request_data.get('abTestCode') predict_result = ad_recommend_predict(app_type=app_type, mid=mid, video_id=video_id, ab_exp_info=ab_exp_info, ab_test_code=ab_test_code) if predict_result is None: result = {'code': -1, 'message': 'fail'} else: result = {'code': 200, 'message': 'success', 'data': predict_result.get('ad_predict')} log_message = { 'requestUri': '/applet/ad/predict', 'request_data': request_data, 'logTimestamp': int(time.time() * 1000), 'app_type': app_type, 'mid': mid, 'video_id': video_id, 'predict_result': predict_result, 'result': result, 'executeTime': (time.time() - start_time) * 1000 } log_.info(log_message) return json.dumps(result) except Exception as e: log_.error(traceback.format_exc()) result = {'code': -1, 'message': 'fail'} return json.dumps(result) # app热榜 @app.route('/app/video/hot_list', methods=['GET', 'POST']) def app_video_hot_list(): try: page_size = 10 request_data = request.get_data() request_data = json.loads(request_data) page = request_data.get('page', 0) log_.info({'requestUri': '/app/video/hot_lis', 'requestData': request_data}) # log_.info('app_video_hot_list request data: {}'.format(request_data)) redis_helper = RedisHelper() datas = redis_helper.get_data_from_redis('app_video_hot_list') if datas is None or len(datas) == 0: result = {'code': -1, 'message': 'no data'} log_.info({'requestUri': '/app/video/hot_lis', 'result': result}) # log_.info('result: {}'.format(result)) return json.dumps(result) datas = ast.literal_eval(datas) total_page = int(len(datas)/page_size) if len(datas)%page_size > 0: total_page += 1 if page > total_page -1 : result = {'code': -1, 'message': 'page exceed max'} log_.info({'requestUri': '/app/video/hot_lis', 'result': result}) # log_.info('result: {}'.format(result)) return json.dumps(result) result = {'code': 200, 'message': '', 'data': {'total_page': total_page, 'hot_list': datas[page*page_size:page*page_size+page_size]}} log_.info({'requestUri': '/app/video/hot_lis', 'result': result}) # log_.info('result: {}'.format(result)) return json.dumps(result) except Exception as e: log_.error(e) print(traceback.format_exc()) result = {'code': -1, 'message': 'fail'} return json.dumps(result) def serve_forever(ip='0.0.0.0', port=5001): pywsgi.WSGIServer((ip, port), app).serve_forever() def apprun(MULTI_PROCESS=True, ip='0.0.0.0', port=5001): if MULTI_PROCESS == False: WSGIServer((ip, port), app).serve_forever() else: # mulserver = WSGIServer((ip, port), app, handler_class=WebSocketHandler) mulserver = WSGIServer((ip, port), app) mulserver.start() def server_forever(): mulserver.start_accepting() mulserver._stop_event.wait() #for i in range(cpu_count()): for i in range(20): p = Process(target=server_forever) p.start() if __name__ == '__main__': app.run() #server = pywsgi.WSGIServer(('0.0.0.0', 5000), app) #server.serve_forever() # apprun()