# encoding: utf-8
"""Flask service exposing embedding-based video lookup endpoints.

At import time the module configures logging and loads one user-embedding
table and one video-embedding table from CSV files.  Three POST endpoints
then look up an embedding (by user ``mid``, by video ``vid`` or by a
comma-separated ``vidList``) and ask the video embedding manager for
matching video ids.

Every POST endpoint answers with a JSON string whose ``code`` field is
``0`` on success, ``3`` when the ``token`` field is missing/empty and ``6``
when any exception is raised while handling the request.
"""
import ast
import base64
import json
import logging
import os
import time
from logging.handlers import TimedRotatingFileHandler

# from meinheld import server
import flask
from flask import Flask, request

from embedding_manager import EmbeddingManager
from embedding_manager_user import EmbeddingManagerUser

app = Flask(__name__)


def setLog():
    """Attach an hourly-rotating file handler to the root logger.

    The log file is ``log/run_dss_server<start timestamp>.log``; it rotates
    every hour and at most 72 rotated files are kept.  The root logger is
    configured at INFO level via ``logging.basicConfig``.
    """
    log_fmt = '%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s'
    formatter = logging.Formatter(log_fmt)
    # The handler opens its file immediately, so the directory must exist.
    os.makedirs("log", exist_ok=True)
    started_at = str(time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime()))
    fh = TimedRotatingFileHandler(
        filename="log/run_dss_server" + started_at + ".log",
        when="H",
        interval=1,
        backupCount=72,
    )
    fh.setFormatter(formatter)
    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger()
    log.addHandler(fh)


setLog()

print("load user embedding")
mgr_user_embedding = EmbeddingManagerUser(
    "/work/xielixun/DeepMatch/DSSM/tensorflow_user_embedding-dssm-tzld-210327-2-bak.csv",
    "mid",
    "emb")

print("load video embedding")
mgr_video_embedding = EmbeddingManager(
    # "/root/xielixun/ant-learn-recsys/datas/tzld_video_embedding-1106-sort.csv",
    "/work/xielixun/DeepMatch/DSSM/tensorflow_video_embedding-dssm-tzld-210327-2-bak.csv",
    "videoid",
    "emb")


def _parse_request_body(raw_body):
    """Decode the raw request bytes and parse them into a Python object.

    SECURITY: this used to be ``eval()`` on the request body, which lets any
    caller execute arbitrary code on the server.  ``ast.literal_eval`` accepts
    the same Python-literal payloads (dicts, strings, numbers, ...) without
    evaluating expressions.  Bodies that are not Python literals (e.g. JSON
    using ``true``/``false``/``null``) fall back to ``json.loads``.

    Raises:
        ValueError: if the body is neither a Python literal nor valid JSON
            (``json.JSONDecodeError`` is a ``ValueError`` subclass).
    """
    text = str(raw_body, encoding="utf-8")
    try:
        return ast.literal_eval(text)
    except (ValueError, TypeError, SyntaxError):
        return json.loads(text)


def _token_fail_response():
    """Log the authentication failure and return the code-3 JSON response."""
    res = {'code': 3, 'msg': 'token fail'}
    logging.error("code: 3 msg: token fail ")
    return json.dumps(res)


def _format_embedding(raw_embedding):
    """Turn a stored embedding string into a comma-separated ``[...]`` string.

    The first and last characters of ``raw_embedding`` are dropped (presumably
    the surrounding brackets of a whitespace-separated vector -- confirm
    against the CSV contents) and the remaining whitespace-separated tokens
    are joined with commas.  Unlike the previous hand-written loop this does
    not assume exactly 32 components; for 32 components the output is
    identical.
    """
    components = raw_embedding[1:-1].strip('\n').split()
    return "[" + ",".join(components) + "]"


@app.route("/")
def index():
    """Trivial root endpoint; always returns the literal ``test``."""
    return "test"


@app.route("/healthcheck", methods=['GET'])
def index_health_check():
    """Health check: log a heartbeat line and return ``ok``."""
    logging.info("I'm ok")
    return "ok"


# Route definition
@app.route("/ai/v1/user2video", methods=['POST'])
def get_video_by_user_mid2vid():
    """Return video ids for the user identified by ``mid``.

    Reads ``requestId``, ``token``, ``mid`` and ``pageSize`` from the request
    body.  When no embedding is stored for ``mid`` the returned id list is
    empty.

    Returns:
        A JSON string; see the module docstring for the ``code`` values.
    """
    # Bound before the ``try`` so the error response can always reference it,
    # even when parsing or the token check fails first.
    mid = None
    try:
        start_time = time.time()
        resParm = _parse_request_body(flask.request.data)
        requestId = resParm.get('requestId')

        # Service authentication: only the presence of a token is checked.
        if not resParm.get('token'):
            return _token_fail_response()

        mid = resParm.get("mid")
        page_size = resParm.get("pageSize")

        # 1. Fetch this user's embedding ("" means none is stored).
        user_embedding_str = mgr_user_embedding.get_embedding(mid)
        target_video_ids = list()
        if user_embedding_str != "":
            user_str = _format_embedding(user_embedding_str)
            # 2. Look up candidate video ids for that embedding.
            # NOTE(review): the original indentation was lost; the search is
            # assumed to run only when an embedding was found -- confirm.
            target_video_ids = mgr_video_embedding.search_ids_by_embedding(user_str, page_size)

        timeUsed = time.time() - start_time
        data = {'requestId': requestId, 'videoIds': str(target_video_ids),
                'timeUsed': timeUsed, 'mid': mid}
        res = {'code': 0, 'msg': 'success', 'data': data}
        logging.info("code:0 msg:success user2video cost Time is: %s ", timeUsed)
        return json.dumps(res)
    except Exception as x:
        # Top-level request boundary: log the traceback, answer with code 6.
        logging.exception(x)
        res = {'code': 6, 'msg': 'request exception', 'data': {}, 'mid': mid}
        return json.dumps(res)


# Route definition
@app.route("/ai/v1/video2video", methods=['POST'])
def get_video_by_video_vid2vid():
    """Return video ids for the video identified by ``vid``.

    Reads ``requestId``, ``token``, ``vid`` and ``pageSize`` from the request
    body.  When no embedding is stored for ``vid`` the returned id list is
    empty.

    Returns:
        A JSON string; see the module docstring for the ``code`` values.
    """
    # Bound before the ``try`` so the error response can always reference it.
    vid = None
    try:
        start_time = time.time()
        resParm = _parse_request_body(flask.request.data)
        requestId = resParm.get('requestId')

        # Service authentication: only the presence of a token is checked.
        if not resParm.get('token'):
            return _token_fail_response()

        vid = resParm.get("vid")
        page_size = resParm.get("pageSize")

        # Fetch the video's own embedding ("" means none is stored).
        video_embedding = mgr_video_embedding.get_embedding(vid)
        video_ids = list()
        if video_embedding != "":
            video_str = _format_embedding(video_embedding)
            # Look up video ids for that embedding.
            # NOTE(review): the original indentation was lost; the search is
            # assumed to run only when an embedding was found -- confirm.
            video_ids = mgr_video_embedding.search_ids_by_embedding(video_str, page_size)

        timeUsed = time.time() - start_time
        data = {'requestId': requestId, 'videoIds': str(video_ids),
                'timeUsed': timeUsed, 'vid': vid}
        res = {'code': 0, 'msg': 'success', 'data': data}
        logging.info("code:0 msg:success video2video cost Time is: %s ", timeUsed)
        return json.dumps(res)
    except Exception as x:
        # Top-level request boundary: log the traceback, answer with code 6.
        logging.exception(x)
        res = {'code': 6, 'msg': 'request exception', 'data': {}, 'vid': vid}
        return json.dumps(res)


@app.route("/ai/v1/videolist2video", methods=['POST'])
def get_video_by_video_vidList2vid():
    """Return video ids for a comma-separated list of video ids.

    Reads ``requestId``, ``token``, ``vidList`` and ``pageSize`` from the
    request body.  Ids without a stored embedding are skipped; when none of
    them has one the returned id list is empty.  The ``vid`` field of the
    response carries the last id of ``vidList``, as before.

    Returns:
        A JSON string; see the module docstring for the ``code`` values.
    """
    # Bound before the ``try`` so the error response can always reference it.
    vid = None
    try:
        start_time = time.time()
        resParm = _parse_request_body(flask.request.data)
        requestId = resParm.get('requestId')

        # Service authentication: only the presence of a token is checked.
        if not resParm.get('token'):
            return _token_fail_response()

        vid_list = resParm.get("vidList").split(",")
        page_size = resParm.get("pageSize")

        # Collect the parsed embedding of every listed video that has one.
        video_embedding_list = list()
        for vid in vid_list:
            video_embedding = mgr_video_embedding.get_embedding(vid)
            if video_embedding == "":
                continue
            video_embedding_list.append(json.loads(_format_embedding(video_embedding)))

        # Look up video ids for the collected embeddings.
        video_ids = list()
        if video_embedding_list:
            video_ids = mgr_video_embedding.search_ids_by_embedding_list(
                video_embedding_list, page_size)

        timeUsed = time.time() - start_time
        data = {'requestId': requestId, 'videoIds': str(video_ids),
                'timeUsed': timeUsed, 'vid': vid}
        res = {'code': 0, 'msg': 'success', 'data': data}
        logging.info("code:0 msg:success Faiss videolist2video cost Time is: %s ", timeUsed)
        return json.dumps(res)
    except Exception as x:
        # Top-level request boundary: log the traceback, answer with code 6.
        logging.exception(x)
        res = {'code': 6, 'msg': 'request exception', 'data': {}, 'vid': vid}
        return json.dumps(res)


if __name__ == '__main__':
    # Start the service.  Ports noted for the sibling servers in the original
    # file: 9996 (test / fm), 9999 (item2vec); this DSSM server uses 9997.
    app.run(host="0.0.0.0", port=9997)  # dssm