import random
import os
import logging
import datetime
import json
import time
import traceback
import ast
from gevent import monkey
monkey.patch_all()
from flask import Flask, request
from log import Log
from config import set_config
from recommend import video_homepage_recommend, video_relevant_recommend
from category import get_category_videos
from video_recall import PoolRecall
from db_helper import RedisHelper
from gevent.pywsgi import WSGIServer
from multiprocessing import cpu_count, Process
from utils import update_video_w_h_rate
from user2new import user2new
from params_helper import Params
from manager_op import get_video_list, search_video
from ad_recommend import ad_recommend_predict, ad_recommend_predict_with_roi
# from werkzeug.middleware.profiler import ProfilerMiddleware
# from geventwebsocket.handler import WebSocketHandler
from apscheduler.schedulers.background import BackgroundScheduler
log_ = Log()
config_ = set_config()
level_weight = {'1': 1, '2': 1, '3': 1, '4': 1, '5': 1, '6': 1}
flow_pool_abtest_config = {'control_group': [7, 8, 9, 10, 11, 12, 13, 14, 15, 16],
'experimental_flow_set_level': [],
'experimental_flow_set_level_score': []}
ad_arpu = 0
ad_roi_param = 0
def update_flow_pool_config():
"""
定时更新流量池相关预设配置到内存变量中
1. level_weight: 流量池层级权重
2. flow_pool_abtest_config: 流量池ab实验配置
:return: None
"""
redis_helper = RedisHelper()
global level_weight
level_weight_initial = redis_helper.get_data_from_redis(key_name=config_.FLOWPOOL_LEVEL_WEIGHT_KEY_NAME)
if level_weight_initial is not None:
level_weight = json.loads(level_weight_initial)
global flow_pool_abtest_config
flow_pool_abtest_config = redis_helper.get_data_from_redis(key_name=config_.FLOWPOOL_ABTEST_KEY_NAME)
if flow_pool_abtest_config is not None:
flow_pool_abtest_config = json.loads(flow_pool_abtest_config)
def update_ad_roi_predict_params():
"""
定时更新广告预测相关预设配置到内存变量中
1. arpu: 上一周期arpu值
2. roi_param: 计算roi使用参数
:return: None
"""
redis_helper = RedisHelper()
global ad_arpu
ad_arpu_initial = redis_helper.get_data_from_redis(key_name=config_.KEY_NAME_AD_ARPU)
if ad_arpu_initial is not None:
ad_arpu = float(ad_arpu_initial)
global ad_roi_param
ad_roi_param_initial = redis_helper.get_data_from_redis(key_name=config_.KEY_NAME_AD_ROI_PARAM)
if ad_roi_param_initial is not None:
ad_roi_param = float(ad_roi_param_initial)
sched = BackgroundScheduler(daemon=True, timezone='Asia/Shanghai') # 指定时区
sched.add_job(func=update_flow_pool_config, trigger="interval", seconds=10*60) # 间隔10min后启动
sched.add_job(func=update_ad_roi_predict_params, trigger="interval", seconds=60) # 间隔1min后启动(每1min执行一次)
update_flow_pool_config()
update_ad_roi_predict_params()
sched.start()
app = Flask(__name__)
@app.route('/healthcheck')
def health_check():
return 'ok!'
# 首页推荐及tab分类
@app.route('/applet/video/homepage/recommend', methods=['GET', 'POST'])
def homepage_recommend():
start_time = time.time()
# in_homepage = start_time * 1000 + random.randint(0, 100)
# log_.info({'type': 'homepage', 'in_homepage': in_homepage})
try:
global level_weight
# log_.info({'request_headers': request.headers})
request_data = json.loads(request.get_data())
request_id = request_data.get('requestId')
mid = request_data.get('mid')
uid = request_data.get('uid')
category_id = request_data.get('categoryId')
size = request_data.get('size', 4)
app_type = request_data.get('appType', 4)
algo_type = request_data.get('algoType')
client_info = request_data.get('clientInfo')
ab_exp_info = request_data.get('abExpInfo', None)
ab_info_data = request_data.get('abInfoData', None)
version_audit_status = request_data.get('versionAuditStatus', 2) # 小程序版本审核参数:1-审核中,2-审核通过,默认:2
machineinfoBrand = request_data.get('machineinfoBrand', '')
machineinfoModel = request_data.get('machineinfoModel', '')
machineinfoPlatform = request_data.get('machineinfoPlatform', '')
pagesource = request_data.get('pageSource', '')
versioncode = request_data.get('versionCode', 0)
recommendsource = request_data.get('recommendSource', '0')
sencetype = request_data.get('senceType', 0)
recomTraceId=request_data.get('recomTraceId', "")
env_dict = {}
try:
env_dict['app_type'] = int(app_type)
env_dict['pagesource'] = str(pagesource)
env_dict['versioncode'] = int(versioncode)
env_dict['machineinfo_brand'] = str(machineinfoBrand)
env_dict['machineinfo_model'] = str(machineinfoModel)
env_dict['machineinfo_platform'] = str(machineinfoPlatform)
env_dict['recommendsource'] = str(recommendsource)
env_dict['sencetype'] = int(sencetype)
env_dict['recomTraceId'] = recomTraceId
env_dict['recomInterface'] = "homepage"
except:
env_dict['app_type'] = 4
env_dict['pagesource'] = str(pagesource)
env_dict['versioncode'] = 0
env_dict['machineinfo_brand'] = str(machineinfoBrand)
env_dict['machineinfo_model'] = str(machineinfoModel)
env_dict['machineinfo_platform'] = str(machineinfoPlatform)
env_dict['recommendsource'] = str(recommendsource)
env_dict['sencetype'] = sencetype
env_dict['recomTraceId'] = recomTraceId
env_dict['recomInterface'] = "homepage"
log_.error("feature error",env_dict)
params = Params(request_id=request_id)
# size默认为10
if not size:
size = 10
if category_id in config_.CATEGORY['recommend']:
# 推荐
recommend_result = video_homepage_recommend(
request_id=request_id,
mid=mid,
uid=uid,
size=size,
app_type=app_type,
algo_type=algo_type,
client_info=client_info,
ab_exp_info=ab_exp_info,
params=params,
ab_info_data=ab_info_data,
version_audit_status=version_audit_status,
env_dict=env_dict,
level_weight=level_weight,
flow_pool_abtest_config=flow_pool_abtest_config
)
result = {'code': 200, 'message': 'success', 'data': {'videos': recommend_result['videos']}}
log_message = {
'requestUri': '/applet/video/homepage/recommend',
'logTimestamp': int(time.time() * 1000),
'request_id': request_id,
'app_type': app_type,
'client_info': client_info,
'ab_exp_info': ab_exp_info,
'ab_info_data': ab_info_data,
'version_audit_status': version_audit_status,
'category_id': category_id,
'mid': mid,
'uid': uid,
'getRecommendParamsTime': recommend_result.get('getRecommendParamsTime', ''),
'getRecommendResultTime': recommend_result.get('getRecommendResultTime', ''),
'updateRedisDataTime': recommend_result.get('updateRedisDataTime', ''),
'recommendOperation': recommend_result.get('recommendOperation', ''),
'result': result,
'executeTime': (time.time() - start_time) * 1000,
'fea_info': recommend_result.get('fea_info', {})
}
log_.info(log_message)
# log_.info('category_id: {}, mid: {}, uid: {}, result: {}, execute time = {}ms'.format(
# category_id, mid, uid, result, (time.time() - start_time)*1000))
return json.dumps(result)
elif category_id in config_.CATEGORY['other']:
# 其他类别
videos = get_category_videos()
result = {'code': 200, 'message': 'success', 'data': {'videos': videos}}
log_.info('category_id: {}, mid: {}, uid: {}, result: {}, execute time = {}ms'.format(
category_id, mid, uid, result, (time.time() - start_time) * 1000))
return json.dumps(result)
else:
log_.error('categoryId error, categoryId = {}'.format(category_id))
result = {'code': -1, 'message': 'categoryId error'}
return json.dumps(result)
except Exception as e:
log_.error(traceback.format_exc())
result = {'code': -1, 'message': 'fail'}
return json.dumps(result)
# 相关推荐
@app.route('/applet/video/relevant/recommend', methods=['GET', 'POST'])
def relevant_recommend():
start_time = time.time()
# in_relevant = start_time * 1000 + random.randint(0, 100)
# log_.info({"type": "relevant", "in_relevant": in_relevant})
try:
global level_weight
request_data = json.loads(request.get_data())
request_id = request_data.get('requestId')
# log_.info({
# 'logTimestamp': int(time.time() * 1000),
# 'request_id': request_id,
# 'in_relevant': in_relevant,
# 'type': "relevant_recommend",
# 'text': 'in relevant_recommend',
# 'executeTime': (time.time() - start_time) * 1000
# })
mid = request_data.get('mid')
uid = request_data.get('uid')
video_id = request_data.get('videoId')
# up_uid = request_data.get('upUid')
# share_mid = request_data.get('shareMid')
# share_uid = request_data.get('shareUid')
# page_num = request_data.get('pageNum', 1)
page_size = request_data.get('pageSize', 4)
app_type = request_data.get('appType')
client_info = request_data.get('clientInfo')
ab_exp_info = request_data.get('abExpInfo', None)
page_type = request_data.get('pageType') # 1:详情页;2:分享页
ab_info_data = request_data.get('abInfoData', None)
version_audit_status = request_data.get('versionAuditStatus', 2) # 小程序版本审核参数:1-审核中,2-审核通过,默认:2
machineinfoBrand = request_data.get('machineinfoBrand', '')
machineinfoModel = request_data.get('machineinfoModel', '')
machineinfoPlatform = request_data.get('machineinfoPlatform', '')
pagesource = request_data.get('pageSource', '')
versioncode = request_data.get('versionCode', 0)
recommendsource = request_data.get('recommendSource', '0')
sencetype = request_data.get('senceType', 0)
recomTraceId = request_data.get('recomTraceId', "")
env_dict = {}
try:
env_dict['app_type'] = int(app_type)
env_dict['pagesource'] = str(pagesource)
env_dict['versioncode'] = int(versioncode)
env_dict['machineinfo_brand'] = str(machineinfoBrand)
env_dict['machineinfo_model'] = str(machineinfoModel)
env_dict['machineinfo_platform'] = str(machineinfoPlatform)
env_dict['recommendsource'] = str(recommendsource)
env_dict['sencetype'] = int(sencetype)
env_dict['recomInterface'] = "relevant_recommend"
env_dict['recomTraceId'] = recomTraceId
env_dict['relevant_video_id'] = video_id
except:
env_dict['app_type'] = 4
env_dict['pagesource'] = str(pagesource)
env_dict['versioncode'] = 0
env_dict['machineinfo_brand'] = str(machineinfoBrand)
env_dict['machineinfo_model'] = str(machineinfoModel)
env_dict['machineinfo_platform'] = str(machineinfoPlatform)
env_dict['recommendsource'] = str(recommendsource)
env_dict['sencetype'] = sencetype
env_dict['recomInterface'] = "relevant_recommend"
env_dict['relevant_video_id'] = video_id
env_dict['recomTraceId'] = recomTraceId
log_.error("feature error", env_dict)
params = Params(request_id=request_id)
recommend_result = video_relevant_recommend(
request_id=request_id,
video_id=video_id,
mid=mid,
uid=uid,
size=page_size,
app_type=app_type,
ab_exp_info=ab_exp_info,
client_info=client_info,
page_type=page_type,
params=params,
ab_info_data=ab_info_data,
version_audit_status=version_audit_status,
env_dict=env_dict,
level_weight=level_weight,
flow_pool_abtest_config=flow_pool_abtest_config
)
result = {'code': 200, 'message': 'success', 'data': {'videos': recommend_result['videos']}}
log_message = {
'requestUri': '/applet/video/relevant/recommend',
'logTimestamp': int(time.time() * 1000),
'request_id': request_id,
'app_type': app_type,
'client_info': client_info,
'ab_exp_info': ab_exp_info,
'ab_info_data': ab_info_data,
'version_audit_status': version_audit_status,
'mid': mid,
'uid': uid,
'getRecommendParamsTime': recommend_result.get('getRecommendParamsTime', ''),
'getRecommendResultTime': recommend_result.get('getRecommendResultTime', ''),
'updateRedisDataTime': recommend_result.get('updateRedisDataTime', ''),
'recommendOperation': recommend_result.get('recommendOperation', ''),
'result': result,
'executeTime': (time.time() - start_time) * 1000,
'fea_info': recommend_result.get('fea_info', {})
}
log_.info(log_message)
# log_.info('app_type: {}, mid: {}, uid: {}, relevant-result: {}, execute time = {}ms'.format(
# app_type, mid, uid, result, (time.time() - start_time) * 1000))
return json.dumps(result)
except Exception as e:
log_.error(traceback.format_exc())
result = {'code': -1, 'message': 'fail'}
return json.dumps(result)
# 管理后台实时修改rov
@app.route('/applet/video/update/rov', methods=['GET', 'POST'])
def update_rov():
try:
# log_.info({'requestUri': '/applet/video/update/rov', 'request_initial': request.get_data()})
request_data = json.loads(request.get_data())
log_.info({'requestUri': '/applet/video/update/rov',
'logTimestamp': int(time.time() * 1000),
'requestData': request_data})
# log_.info('update_rov request data: {}'.format(request_data))
# app_type = request_data.get('appType')
video_id = request_data.get('videoId')
rov_score = request_data.get('rovScore')
redis_helper = RedisHelper()
# 将修改ROV值视频的 videoId 和 rovScore 存入对应的redis中
redis_helper.update_score_with_value(key_name=config_.UPDATE_ROV_KEY_NAME, value=video_id, score=rov_score)
redis_helper.update_score_with_value(key_name=config_.UPDATE_ROV_KEY_NAME_APP, value=video_id, score=rov_score)
# ###### 下线 横屏实验
# # 判断该视频是否为 横屏视频,如果是则 存入rov召回池横屏视频 redis 中
# update_video_w_h_rate(video_id=int(video_id), key_name=config_.W_H_RATE_UP_1_VIDEO_LIST_KEY_NAME['rov_recall'])
result = {'code': 200, 'message': 'update rov success'}
log_.info({'requestUri': '/applet/video/update/rov', 'logTimestamp': int(time.time() * 1000), 'result': result})
# log_.info('result: {}'.format(result))
return json.dumps(result)
except Exception as e:
log_.error(traceback.format_exc())
result = {'code': -1, 'message': 'update rov fail'}
return json.dumps(result)
# 管理后台指定用户恢复成新用户
@app.route('/applet/user/to_new', methods=['GET', 'POST'])
def user_to_new():
try:
request_data = json.loads(request.get_data())
log_.info({'requestUri': '/applet/user/to_new', 'requestData': request_data})
app_type = request_data.get('appType', None)
mid = request_data.get('mid')
uid = request_data.get('uid')
user2new(app_type=app_type, mid=mid, uid=uid)
result = {'code': 200, 'message': 'success'}
log_.info({'requestUri': '/applet/user/to_new', 'result': result})
return json.dumps(result)
except Exception as e:
log_.error(traceback.format_exc())
result = {'code': -1, 'message': 'fail'}
return json.dumps(result)
# 管理后台算法视频列表可视化 - 视频数据表类型获取
@app.route('/applet/video/get_video_type_list', methods=['GET', 'POST'])
def get_video_type_list():
try:
data = [
{'dataListDesc': val.get('dataListDesc'), 'dataListCode': val.get('dataListCode')}
for key, val in config_.VIDEO_DATA_LIST_MAPPING.items()
]
data.sort(key=lambda x: x['dataListCode'], reverse=False)
result = {'code': 200, 'message': 'success', 'data': data}
return json.dumps(result)
except Exception as e:
log_.error(traceback.format_exc())
result = {'code': -1, 'message': 'fail'}
return json.dumps(result)
# 管理后台算法视频列表可视化 - 获取视频列表
@app.route('/applet/video/get_online_list', methods=['GET', 'POST'])
def get_video_online_list():
try:
request_data = json.loads(request.get_data())
ab_exp_code = request_data.get('abExpCode', None)
search_time = request_data.get('searchTime', None)
data_list_type = request_data.get('dataListType', None)
region_code = request_data.get('regionCode', None)
video_id = request_data.get('videoId', None)
page_num = request_data.get('pageNum', 1)
page_size = request_data.get('pageSize', 100)
if video_id is None:
result = get_video_list(ab_exp_code=ab_exp_code, search_time=search_time, data_list_type=data_list_type,
region_code=region_code, page_num=page_num, page_size=page_size)
else:
result = search_video(ab_exp_code=ab_exp_code, search_time=search_time, data_list_type=data_list_type,
region_code=region_code, video_id=video_id, page_num=page_num, page_size=page_size)
return json.dumps(result)
except Exception as e:
log_.error(traceback.format_exc())
result = {'code': -1, 'message': 'fail'}
return json.dumps(result)
# 广告推荐
@app.route('/applet/ad/predict', methods=['GET', 'POST'])
def ad_predict():
start_time = time.time()
try:
request_data = json.loads(request.get_data())
mid = request_data.get('mid')
video_id = request_data.get('videoId')
app_type = request_data.get('appType')
ab_exp_info = request_data.get('abExpInfo')
ab_test_code = request_data.get('abTestCode')
care_model_status = request_data.get('careModelStatus', 1) # 用户关怀模式状态 1: 未开启,2: 开启, 默认: 1
predict_result = ad_recommend_predict(app_type=app_type,
mid=mid,
video_id=video_id,
ab_exp_info=ab_exp_info,
ab_test_code=ab_test_code,
care_model_status=care_model_status)
if predict_result is None:
result = {'code': -1, 'message': 'fail'}
else:
result = {'code': 200, 'message': 'success', 'data': predict_result.get('ad_predict')}
log_message = {
'requestUri': '/applet/ad/predict',
'request_data': request_data,
'logTimestamp': int(time.time() * 1000),
'app_type': app_type,
'mid': mid,
'video_id': video_id,
'predict_result': predict_result,
'result': result,
'executeTime': (time.time() - start_time) * 1000
}
log_.info(log_message)
return json.dumps(result)
except Exception as e:
log_.error(traceback.format_exc())
result = {'code': -1, 'message': 'fail'}
return json.dumps(result)
# 广告推荐新策略
@app.route('/applet/ad/roi/predict', methods=['GET', 'POST'])
def ad_roi_predict():
start_time = time.time()
try:
request_data = json.loads(request.get_data())
mid = request_data.get('mid')
video_id = request_data.get('videoId')
app_type = request_data.get('appType')
ads = request_data.get('ads')
predict_result = ad_recommend_predict_with_roi(app_type=app_type,
mid=mid,
video_id=video_id,
ads=ads,
arpu=ad_arpu,
roi_param=ad_roi_param)
if predict_result is None:
result = {'code': -1, 'message': 'fail'}
else:
result = {
'code': 200,
'message': 'success',
'data': {
'adId': predict_result.get('ad_id'),
'adRes': predict_result.get('ad_predict')
}
}
log_message = {
'requestUri': '/applet/ad/roi/predict',
'request_data': request_data,
'logTimestamp': int(time.time() * 1000),
'app_type': app_type,
'mid': mid,
'video_id': video_id,
'predict_result': predict_result,
'result': result,
'executeTime': (time.time() - start_time) * 1000
}
log_.info(log_message)
return json.dumps(result)
except Exception as e:
log_.error(traceback.format_exc())
result = {'code': -1, 'message': 'fail'}
return json.dumps(result)
# app热榜
@app.route('/app/video/hot_list', methods=['GET', 'POST'])
def app_video_hot_list():
try:
page_size = 10
request_data = request.get_data()
request_data = json.loads(request_data)
page = request_data.get('page', 0)
log_.info({'requestUri': '/app/video/hot_lis', 'requestData': request_data})
# log_.info('app_video_hot_list request data: {}'.format(request_data))
redis_helper = RedisHelper()
datas = redis_helper.get_data_from_redis('app_video_hot_list')
if datas is None or len(datas) == 0:
result = {'code': -1, 'message': 'no data'}
log_.info({'requestUri': '/app/video/hot_lis', 'result': result})
# log_.info('result: {}'.format(result))
return json.dumps(result)
datas = ast.literal_eval(datas)
total_page = int(len(datas)/page_size)
if len(datas)%page_size > 0:
total_page += 1
if page > total_page -1 :
result = {'code': -1, 'message': 'page exceed max'}
log_.info({'requestUri': '/app/video/hot_lis', 'result': result})
# log_.info('result: {}'.format(result))
return json.dumps(result)
result = {'code': 200, 'message': '', 'data': {'total_page': total_page,
'hot_list': datas[page*page_size:page*page_size+page_size]}}
log_.info({'requestUri': '/app/video/hot_lis', 'result': result})
# log_.info('result: {}'.format(result))
return json.dumps(result)
except Exception as e:
log_.error(e)
# print(traceback.format_exc())
result = {'code': -1, 'message': 'fail'}
return json.dumps(result)
def serve_forever(ip='0.0.0.0', port=5001):
pywsgi.WSGIServer((ip, port), app).serve_forever()
def apprun(MULTI_PROCESS=True, ip='0.0.0.0', port=5001):
if MULTI_PROCESS == False:
WSGIServer((ip, port), app).serve_forever()
else:
# mulserver = WSGIServer((ip, port), app, handler_class=WebSocketHandler)
mulserver = WSGIServer((ip, port), app)
mulserver.start()
def server_forever():
mulserver.start_accepting()
mulserver._stop_event.wait()
#for i in range(cpu_count()):
for i in range(20):
p = Process(target=server_forever)
p.start()
if __name__ == '__main__':
app.run()
#server = pywsgi.WSGIServer(('0.0.0.0', 5000), app)
#server.serve_forever()
# apprun()