app.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513
  1. import random
  2. import os
  3. import logging
  4. import datetime
  5. import json
  6. import time
  7. import traceback
  8. import ast
  9. from gevent import monkey
  10. monkey.patch_all()
  11. from flask import Flask, request
  12. from log import Log
  13. from config import set_config
  14. from recommend import video_homepage_recommend, video_relevant_recommend
  15. from category import get_category_videos
  16. from video_recall import PoolRecall
  17. from db_helper import RedisHelper
  18. from gevent.pywsgi import WSGIServer
  19. from multiprocessing import cpu_count, Process
  20. from utils import update_video_w_h_rate
  21. from user2new import user2new
  22. from params_helper import Params
  23. from manager_op import get_video_list, search_video
  24. from ad_recommend import ad_recommend_predict
  25. # from werkzeug.middleware.profiler import ProfilerMiddleware
  26. # from geventwebsocket.handler import WebSocketHandler
  27. from apscheduler.schedulers.background import BackgroundScheduler
# Project logger and configuration object shared by every handler in this module.
log_ = Log()
config_ = set_config()
# Default flow-pool level weights (keys are level ids as strings). Replaced by
# update_flow_pool_config() whenever Redis holds a stored value.
level_weight = {'1': 1, '2': 1, '3': 1, '4': 1, '5': 1, '6': 1}
# Default flow-pool A/B experiment configuration. Also refreshed from Redis by
# update_flow_pool_config().
flow_pool_abtest_config = {'control_group': [7, 8, 9, 10, 11, 12, 13, 14, 15, 16],
                           'experimental_flow_set_level': []}
  33. def update_flow_pool_config():
  34. """
  35. 定时更新流量池相关预设配置到内存变量中
  36. 1. level_weight: 流量池层级权重
  37. 2. flow_pool_abtest_config: 流量池ab实验配置
  38. :return: None
  39. """
  40. redis_helper = RedisHelper()
  41. global level_weight
  42. level_weight_initial = redis_helper.get_data_from_redis(key_name=config_.FLOWPOOL_LEVEL_WEIGHT_KEY_NAME)
  43. if level_weight_initial is not None:
  44. level_weight = json.loads(level_weight_initial)
  45. # log_.info({
  46. # "now_date": datetime.datetime.strftime(datetime.datetime.today(), '%Y%m%d %H:%M:%S'),
  47. # "level_weight": level_weight
  48. # })
  49. # print(level_weight)
  50. global flow_pool_abtest_config
  51. flow_pool_abtest_config = redis_helper.get_data_from_redis(key_name=config_.FLOWPOOL_ABTEST_KEY_NAME)
  52. if flow_pool_abtest_config is not None:
  53. flow_pool_abtest_config = json.loads(flow_pool_abtest_config)
# Background scheduler that keeps the flow-pool settings in sync with Redis.
sched = BackgroundScheduler(daemon=True, timezone='Asia/Shanghai')  # explicit timezone
sched.add_job(func=update_flow_pool_config, trigger="interval", seconds=10*60)  # first run after a 10 min interval
# Load the settings once synchronously before the scheduler is started, so
# they do not stay at their defaults until the first scheduled run.
update_flow_pool_config()
sched.start()
app = Flask(__name__)
@app.route('/healthcheck')
def health_check():
    """Health-check endpoint: always responds with the literal body ``ok!``."""
    return 'ok!'
  62. # 首页推荐及tab分类
  63. @app.route('/applet/video/homepage/recommend', methods=['GET', 'POST'])
  64. def homepage_recommend():
  65. start_time = time.time()
  66. # in_homepage = start_time * 1000 + random.randint(0, 100)
  67. # log_.info({'type': 'homepage', 'in_homepage': in_homepage})
  68. try:
  69. global level_weight
  70. # log_.info({'request_headers': request.headers})
  71. request_data = json.loads(request.get_data())
  72. request_id = request_data.get('requestId')
  73. mid = request_data.get('mid')
  74. uid = request_data.get('uid')
  75. category_id = request_data.get('categoryId')
  76. size = request_data.get('size', 4)
  77. app_type = request_data.get('appType', 4)
  78. algo_type = request_data.get('algoType')
  79. client_info = request_data.get('clientInfo')
  80. ab_exp_info = request_data.get('abExpInfo', None)
  81. ab_info_data = request_data.get('abInfoData', None)
  82. version_audit_status = request_data.get('versionAuditStatus', 2) # 小程序版本审核参数:1-审核中,2-审核通过,默认:2
  83. machineinfoBrand = request_data.get('machineinfoBrand', '')
  84. machineinfoModel = request_data.get('machineinfoModel', '')
  85. machineinfoPlatform = request_data.get('machineinfoPlatform', '')
  86. pagesource = request_data.get('pageSource', '')
  87. versioncode = request_data.get('versionCode', 0)
  88. recommendsource = request_data.get('recommendSource', '0')
  89. sencetype = request_data.get('senceType', 0)
  90. recomTraceId=request_data.get('recomTraceId', "")
  91. env_dict = {}
  92. try:
  93. env_dict['app_type'] = int(app_type)
  94. env_dict['pagesource'] = str(pagesource)
  95. env_dict['versioncode'] = int(versioncode)
  96. env_dict['machineinfo_brand'] = str(machineinfoBrand)
  97. env_dict['machineinfo_model'] = str(machineinfoModel)
  98. env_dict['machineinfo_platform'] = str(machineinfoPlatform)
  99. env_dict['recommendsource'] = str(recommendsource)
  100. env_dict['sencetype'] = int(sencetype)
  101. env_dict['recomTraceId'] = recomTraceId
  102. env_dict['recomInterface'] = "homepage"
  103. except:
  104. env_dict['app_type'] = 4
  105. env_dict['pagesource'] = str(pagesource)
  106. env_dict['versioncode'] = 0
  107. env_dict['machineinfo_brand'] = str(machineinfoBrand)
  108. env_dict['machineinfo_model'] = str(machineinfoModel)
  109. env_dict['machineinfo_platform'] = str(machineinfoPlatform)
  110. env_dict['recommendsource'] = str(recommendsource)
  111. env_dict['sencetype'] = sencetype
  112. env_dict['recomTraceId'] = recomTraceId
  113. env_dict['recomInterface'] = "homepage"
  114. log_.error("feature error",env_dict)
  115. params = Params(request_id=request_id)
  116. # size默认为10
  117. if not size:
  118. size = 10
  119. if category_id in config_.CATEGORY['recommend']:
  120. # 推荐
  121. recommend_result = video_homepage_recommend(
  122. request_id=request_id,
  123. mid=mid,
  124. uid=uid,
  125. size=size,
  126. app_type=app_type,
  127. algo_type=algo_type,
  128. client_info=client_info,
  129. ab_exp_info=ab_exp_info,
  130. params=params,
  131. ab_info_data=ab_info_data,
  132. version_audit_status=version_audit_status,
  133. env_dict=env_dict,
  134. level_weight=level_weight,
  135. flow_pool_abtest_config=flow_pool_abtest_config
  136. )
  137. result = {'code': 200, 'message': 'success', 'data': {'videos': recommend_result['videos']}}
  138. log_message = {
  139. 'requestUri': '/applet/video/homepage/recommend',
  140. 'logTimestamp': int(time.time() * 1000),
  141. 'request_id': request_id,
  142. 'app_type': app_type,
  143. 'client_info': client_info,
  144. 'ab_exp_info': ab_exp_info,
  145. 'ab_info_data': ab_info_data,
  146. 'version_audit_status': version_audit_status,
  147. 'category_id': category_id,
  148. 'mid': mid,
  149. 'uid': uid,
  150. 'getRecommendParamsTime': recommend_result.get('getRecommendParamsTime', ''),
  151. 'getRecommendResultTime': recommend_result.get('getRecommendResultTime', ''),
  152. 'updateRedisDataTime': recommend_result.get('updateRedisDataTime', ''),
  153. 'recommendOperation': recommend_result.get('recommendOperation', ''),
  154. 'result': result,
  155. 'executeTime': (time.time() - start_time) * 1000,
  156. 'fea_info': recommend_result.get('fea_info', {})
  157. }
  158. log_.info(log_message)
  159. # log_.info('category_id: {}, mid: {}, uid: {}, result: {}, execute time = {}ms'.format(
  160. # category_id, mid, uid, result, (time.time() - start_time)*1000))
  161. return json.dumps(result)
  162. elif category_id in config_.CATEGORY['other']:
  163. # 其他类别
  164. videos = get_category_videos()
  165. result = {'code': 200, 'message': 'success', 'data': {'videos': videos}}
  166. log_.info('category_id: {}, mid: {}, uid: {}, result: {}, execute time = {}ms'.format(
  167. category_id, mid, uid, result, (time.time() - start_time) * 1000))
  168. return json.dumps(result)
  169. else:
  170. log_.error('categoryId error, categoryId = {}'.format(category_id))
  171. result = {'code': -1, 'message': 'categoryId error'}
  172. return json.dumps(result)
  173. except Exception as e:
  174. log_.error(traceback.format_exc())
  175. result = {'code': -1, 'message': 'fail'}
  176. return json.dumps(result)
  177. # 相关推荐
  178. @app.route('/applet/video/relevant/recommend', methods=['GET', 'POST'])
  179. def relevant_recommend():
  180. start_time = time.time()
  181. # in_relevant = start_time * 1000 + random.randint(0, 100)
  182. # log_.info({"type": "relevant", "in_relevant": in_relevant})
  183. try:
  184. global level_weight
  185. request_data = json.loads(request.get_data())
  186. request_id = request_data.get('requestId')
  187. # log_.info({
  188. # 'logTimestamp': int(time.time() * 1000),
  189. # 'request_id': request_id,
  190. # 'in_relevant': in_relevant,
  191. # 'type': "relevant_recommend",
  192. # 'text': 'in relevant_recommend',
  193. # 'executeTime': (time.time() - start_time) * 1000
  194. # })
  195. mid = request_data.get('mid')
  196. uid = request_data.get('uid')
  197. video_id = request_data.get('videoId')
  198. # up_uid = request_data.get('upUid')
  199. # share_mid = request_data.get('shareMid')
  200. # share_uid = request_data.get('shareUid')
  201. # page_num = request_data.get('pageNum', 1)
  202. page_size = request_data.get('pageSize', 4)
  203. app_type = request_data.get('appType')
  204. client_info = request_data.get('clientInfo')
  205. ab_exp_info = request_data.get('abExpInfo', None)
  206. page_type = request_data.get('pageType') # 1:详情页;2:分享页
  207. ab_info_data = request_data.get('abInfoData', None)
  208. version_audit_status = request_data.get('versionAuditStatus', 2) # 小程序版本审核参数:1-审核中,2-审核通过,默认:2
  209. machineinfoBrand = request_data.get('machineinfoBrand', '')
  210. machineinfoModel = request_data.get('machineinfoModel', '')
  211. machineinfoPlatform = request_data.get('machineinfoPlatform', '')
  212. pagesource = request_data.get('pageSource', '')
  213. versioncode = request_data.get('versionCode', 0)
  214. recommendsource = request_data.get('recommendSource', '0')
  215. sencetype = request_data.get('senceType', 0)
  216. recomTraceId = request_data.get('recomTraceId', "")
  217. env_dict = {}
  218. try:
  219. env_dict['app_type'] = int(app_type)
  220. env_dict['pagesource'] = str(pagesource)
  221. env_dict['versioncode'] = int(versioncode)
  222. env_dict['machineinfo_brand'] = str(machineinfoBrand)
  223. env_dict['machineinfo_model'] = str(machineinfoModel)
  224. env_dict['machineinfo_platform'] = str(machineinfoPlatform)
  225. env_dict['recommendsource'] = str(recommendsource)
  226. env_dict['sencetype'] = int(sencetype)
  227. env_dict['recomInterface'] = "relevant_recommend"
  228. env_dict['recomTraceId'] = recomTraceId
  229. env_dict['relevant_video_id'] = video_id
  230. except:
  231. env_dict['app_type'] = 4
  232. env_dict['pagesource'] = str(pagesource)
  233. env_dict['versioncode'] = 0
  234. env_dict['machineinfo_brand'] = str(machineinfoBrand)
  235. env_dict['machineinfo_model'] = str(machineinfoModel)
  236. env_dict['machineinfo_platform'] = str(machineinfoPlatform)
  237. env_dict['recommendsource'] = str(recommendsource)
  238. env_dict['sencetype'] = sencetype
  239. env_dict['recomInterface'] = "relevant_recommend"
  240. env_dict['relevant_video_id'] = video_id
  241. env_dict['recomTraceId'] = recomTraceId
  242. log_.error("feature error", env_dict)
  243. params = Params(request_id=request_id)
  244. recommend_result = video_relevant_recommend(
  245. request_id=request_id,
  246. video_id=video_id,
  247. mid=mid,
  248. uid=uid,
  249. size=page_size,
  250. app_type=app_type,
  251. ab_exp_info=ab_exp_info,
  252. client_info=client_info,
  253. page_type=page_type,
  254. params=params,
  255. ab_info_data=ab_info_data,
  256. version_audit_status=version_audit_status,
  257. env_dict=env_dict,
  258. level_weight=level_weight,
  259. flow_pool_abtest_config=flow_pool_abtest_config
  260. )
  261. result = {'code': 200, 'message': 'success', 'data': {'videos': recommend_result['videos']}}
  262. log_message = {
  263. 'requestUri': '/applet/video/relevant/recommend',
  264. 'logTimestamp': int(time.time() * 1000),
  265. 'request_id': request_id,
  266. 'app_type': app_type,
  267. 'client_info': client_info,
  268. 'ab_exp_info': ab_exp_info,
  269. 'ab_info_data': ab_info_data,
  270. 'version_audit_status': version_audit_status,
  271. 'mid': mid,
  272. 'uid': uid,
  273. 'getRecommendParamsTime': recommend_result.get('getRecommendParamsTime', ''),
  274. 'getRecommendResultTime': recommend_result.get('getRecommendResultTime', ''),
  275. 'updateRedisDataTime': recommend_result.get('updateRedisDataTime', ''),
  276. 'recommendOperation': recommend_result.get('recommendOperation', ''),
  277. 'result': result,
  278. 'executeTime': (time.time() - start_time) * 1000,
  279. 'fea_info': recommend_result.get('fea_info', {})
  280. }
  281. log_.info(log_message)
  282. # log_.info('app_type: {}, mid: {}, uid: {}, relevant-result: {}, execute time = {}ms'.format(
  283. # app_type, mid, uid, result, (time.time() - start_time) * 1000))
  284. return json.dumps(result)
  285. except Exception as e:
  286. log_.error(traceback.format_exc())
  287. result = {'code': -1, 'message': 'fail'}
  288. return json.dumps(result)
  289. # 管理后台实时修改rov
  290. @app.route('/applet/video/update/rov', methods=['GET', 'POST'])
  291. def update_rov():
  292. try:
  293. # log_.info({'requestUri': '/applet/video/update/rov', 'request_initial': request.get_data()})
  294. request_data = json.loads(request.get_data())
  295. log_.info({'requestUri': '/applet/video/update/rov',
  296. 'logTimestamp': int(time.time() * 1000),
  297. 'requestData': request_data})
  298. # log_.info('update_rov request data: {}'.format(request_data))
  299. # app_type = request_data.get('appType')
  300. video_id = request_data.get('videoId')
  301. rov_score = request_data.get('rovScore')
  302. redis_helper = RedisHelper()
  303. # 将修改ROV值视频的 videoId 和 rovScore 存入对应的redis中
  304. redis_helper.update_score_with_value(key_name=config_.UPDATE_ROV_KEY_NAME, value=video_id, score=rov_score)
  305. redis_helper.update_score_with_value(key_name=config_.UPDATE_ROV_KEY_NAME_APP, value=video_id, score=rov_score)
  306. # ###### 下线 横屏实验
  307. # # 判断该视频是否为 横屏视频,如果是则 存入rov召回池横屏视频 redis 中
  308. # update_video_w_h_rate(video_id=int(video_id), key_name=config_.W_H_RATE_UP_1_VIDEO_LIST_KEY_NAME['rov_recall'])
  309. result = {'code': 200, 'message': 'update rov success'}
  310. log_.info({'requestUri': '/applet/video/update/rov', 'logTimestamp': int(time.time() * 1000), 'result': result})
  311. # log_.info('result: {}'.format(result))
  312. return json.dumps(result)
  313. except Exception as e:
  314. log_.error(traceback.format_exc())
  315. result = {'code': -1, 'message': 'update rov fail'}
  316. return json.dumps(result)
  317. # 管理后台指定用户恢复成新用户
  318. @app.route('/applet/user/to_new', methods=['GET', 'POST'])
  319. def user_to_new():
  320. try:
  321. request_data = json.loads(request.get_data())
  322. log_.info({'requestUri': '/applet/user/to_new', 'requestData': request_data})
  323. app_type = request_data.get('appType', None)
  324. mid = request_data.get('mid')
  325. uid = request_data.get('uid')
  326. user2new(app_type=app_type, mid=mid, uid=uid)
  327. result = {'code': 200, 'message': 'success'}
  328. log_.info({'requestUri': '/applet/user/to_new', 'result': result})
  329. return json.dumps(result)
  330. except Exception as e:
  331. log_.error(traceback.format_exc())
  332. result = {'code': -1, 'message': 'fail'}
  333. return json.dumps(result)
  334. # 管理后台算法视频列表可视化 - 视频数据表类型获取
  335. @app.route('/applet/video/get_video_type_list', methods=['GET', 'POST'])
  336. def get_video_type_list():
  337. try:
  338. data = [
  339. {'dataListDesc': val.get('dataListDesc'), 'dataListCode': val.get('dataListCode')}
  340. for key, val in config_.VIDEO_DATA_LIST_MAPPING.items()
  341. ]
  342. data.sort(key=lambda x: x['dataListCode'], reverse=False)
  343. result = {'code': 200, 'message': 'success', 'data': data}
  344. return json.dumps(result)
  345. except Exception as e:
  346. log_.error(traceback.format_exc())
  347. result = {'code': -1, 'message': 'fail'}
  348. return json.dumps(result)
  349. # 管理后台算法视频列表可视化 - 获取视频列表
  350. @app.route('/applet/video/get_online_list', methods=['GET', 'POST'])
  351. def get_video_online_list():
  352. try:
  353. request_data = json.loads(request.get_data())
  354. ab_exp_code = request_data.get('abExpCode', None)
  355. search_time = request_data.get('searchTime', None)
  356. data_list_type = request_data.get('dataListType', None)
  357. region_code = request_data.get('regionCode', None)
  358. video_id = request_data.get('videoId', None)
  359. page_num = request_data.get('pageNum', 1)
  360. page_size = request_data.get('pageSize', 100)
  361. if video_id is None:
  362. result = get_video_list(ab_exp_code=ab_exp_code, search_time=search_time, data_list_type=data_list_type,
  363. region_code=region_code, page_num=page_num, page_size=page_size)
  364. else:
  365. result = search_video(ab_exp_code=ab_exp_code, search_time=search_time, data_list_type=data_list_type,
  366. region_code=region_code, video_id=video_id, page_num=page_num, page_size=page_size)
  367. return json.dumps(result)
  368. except Exception as e:
  369. log_.error(traceback.format_exc())
  370. result = {'code': -1, 'message': 'fail'}
  371. return json.dumps(result)
  372. # 广告推荐
  373. @app.route('/applet/ad/predict', methods=['GET', 'POST'])
  374. def ad_predict():
  375. start_time = time.time()
  376. try:
  377. request_data = json.loads(request.get_data())
  378. mid = request_data.get('mid')
  379. video_id = request_data.get('videoId')
  380. app_type = request_data.get('appType')
  381. ab_exp_info = request_data.get('abExpInfo')
  382. ab_test_code = request_data.get('abTestCode')
  383. care_model_status = request_data.get('careModelStatus', 1) # 用户关怀模式状态 1: 未开启,2: 开启, 默认: 1
  384. predict_result = ad_recommend_predict(app_type=app_type,
  385. mid=mid,
  386. video_id=video_id,
  387. ab_exp_info=ab_exp_info,
  388. ab_test_code=ab_test_code,
  389. care_model_status=care_model_status)
  390. if predict_result is None:
  391. result = {'code': -1, 'message': 'fail'}
  392. else:
  393. result = {'code': 200, 'message': 'success', 'data': predict_result.get('ad_predict')}
  394. log_message = {
  395. 'requestUri': '/applet/ad/predict',
  396. 'request_data': request_data,
  397. 'logTimestamp': int(time.time() * 1000),
  398. 'app_type': app_type,
  399. 'mid': mid,
  400. 'video_id': video_id,
  401. 'predict_result': predict_result,
  402. 'result': result,
  403. 'executeTime': (time.time() - start_time) * 1000
  404. }
  405. log_.info(log_message)
  406. return json.dumps(result)
  407. except Exception as e:
  408. log_.error(traceback.format_exc())
  409. result = {'code': -1, 'message': 'fail'}
  410. return json.dumps(result)
  411. # app热榜
  412. @app.route('/app/video/hot_list', methods=['GET', 'POST'])
  413. def app_video_hot_list():
  414. try:
  415. page_size = 10
  416. request_data = request.get_data()
  417. request_data = json.loads(request_data)
  418. page = request_data.get('page', 0)
  419. log_.info({'requestUri': '/app/video/hot_lis', 'requestData': request_data})
  420. # log_.info('app_video_hot_list request data: {}'.format(request_data))
  421. redis_helper = RedisHelper()
  422. datas = redis_helper.get_data_from_redis('app_video_hot_list')
  423. if datas is None or len(datas) == 0:
  424. result = {'code': -1, 'message': 'no data'}
  425. log_.info({'requestUri': '/app/video/hot_lis', 'result': result})
  426. # log_.info('result: {}'.format(result))
  427. return json.dumps(result)
  428. datas = ast.literal_eval(datas)
  429. total_page = int(len(datas)/page_size)
  430. if len(datas)%page_size > 0:
  431. total_page += 1
  432. if page > total_page -1 :
  433. result = {'code': -1, 'message': 'page exceed max'}
  434. log_.info({'requestUri': '/app/video/hot_lis', 'result': result})
  435. # log_.info('result: {}'.format(result))
  436. return json.dumps(result)
  437. result = {'code': 200, 'message': '', 'data': {'total_page': total_page,
  438. 'hot_list': datas[page*page_size:page*page_size+page_size]}}
  439. log_.info({'requestUri': '/app/video/hot_lis', 'result': result})
  440. # log_.info('result: {}'.format(result))
  441. return json.dumps(result)
  442. except Exception as e:
  443. log_.error(e)
  444. # print(traceback.format_exc())
  445. result = {'code': -1, 'message': 'fail'}
  446. return json.dumps(result)
  447. def serve_forever(ip='0.0.0.0', port=5001):
  448. pywsgi.WSGIServer((ip, port), app).serve_forever()
  449. def apprun(MULTI_PROCESS=True, ip='0.0.0.0', port=5001):
  450. if MULTI_PROCESS == False:
  451. WSGIServer((ip, port), app).serve_forever()
  452. else:
  453. # mulserver = WSGIServer((ip, port), app, handler_class=WebSocketHandler)
  454. mulserver = WSGIServer((ip, port), app)
  455. mulserver.start()
  456. def server_forever():
  457. mulserver.start_accepting()
  458. mulserver._stop_event.wait()
  459. #for i in range(cpu_count()):
  460. for i in range(20):
  461. p = Process(target=server_forever)
  462. p.start()
if __name__ == '__main__':
    # Executed directly: run the app with Flask's built-in server. The gevent
    # based entry points below are left disabled.
    app.run()
    #server = pywsgi.WSGIServer(('0.0.0.0', 5000), app)
    #server.serve_forever()
    # apprun()