Procházet zdrojové kódy

Merge branch 'gevent'

liqian před 3 roky
rodič
revize
c6e825c689
5 změnil soubory, kde provedl 61 přidání a 11 odebrání
  1. 32 3
      app.py
  2. 1 1
      config.py
  3. 14 6
      db_helper.py
  4. 9 0
      recommend.py
  5. 5 1
      video_recall.py

+ 32 - 3
app.py

@@ -1,3 +1,6 @@
+from gevent import monkey
+monkey.patch_all()
+
 import json
 import time
 
@@ -10,8 +13,11 @@ from video_recall import PoolRecall
 from db_helper import RedisHelper
 import traceback
 import ast
-
+from gevent.pywsgi import WSGIServer
+from multiprocessing import cpu_count, Process
+#from werkzeug.middleware.profiler import ProfilerMiddleware
 app = Flask(__name__)
+#app.wsgi_app = ProfilerMiddleware(app.wsgi_app)
 log_ = Log()
 config_ = set_config()
 
@@ -94,7 +100,6 @@ def app_video_hot_list():
     try:
         page_size= 10
         request_data = request.get_data()
-        print(request_data)
         request_data = json.loads(request_data)
         page = request_data.get('page', 0)
         log_.info('request data: {}'.format(request_data))
@@ -123,6 +128,30 @@ def app_video_hot_list():
         result = {'code': -1, 'message': 'fail'}
         return json.dumps(result)
 
+def serve_forever(ip='0.0.0.0', port=5000):
+    pywsgi.WSGIServer((ip, port), app).serve_forever()
+
+
+def apprun(MULTI_PROCESS=True, ip='0.0.0.0', port=5000):
+    if MULTI_PROCESS == False:
+        WSGIServer((ip, port), app).serve_forever()
+    else:
+        mulserver = WSGIServer((ip, port), app)
+        mulserver.start()
+ 
+        def server_forever():
+            mulserver.start_accepting()
+            mulserver._stop_event.wait()
+ 
+        #for i in range(cpu_count()):
+        for i in range(4):
+            p = Process(target=server_forever)
+            p.start()
+
 
 if __name__ == '__main__':
-    app.run()
+    
+    #app.run()
+    #server = pywsgi.WSGIServer(('0.0.0.0', 5000), app)
+    #server.serve_forever()
+    apprun()

+ 1 - 1
config.py

@@ -159,7 +159,7 @@ class ProductionConfig(BaseConfig):
 
 
 def set_config():
-    # return DevelopmentConfig()
+    #return DevelopmentConfig()
     # return TestConfig()
     # return PreProductionConfig()
     return ProductionConfig()

+ 14 - 6
db_helper.py

@@ -6,6 +6,7 @@ from log import Log
 config_ = set_config()
 log = Log()
 
+conn_redis = None
 
 class RedisHelper(object):
     def __init__(self):
@@ -23,12 +24,15 @@ class RedisHelper(object):
         连接redis
         :return: conn
         """
-        pool = redis.ConnectionPool(host=self.host,
+        global conn_redis
+        if conn_redis is None:
+            pool = redis.ConnectionPool(host=self.host,
                                     port=self.port,
                                     password=self.password,
                                     decode_responses=True)
-        conn = redis.Redis(connection_pool=pool)
-        return conn
+            conn = redis.Redis(connection_pool=pool)
+            conn_redis = conn
+        return conn_redis
 
     def key_exists(self, key_name):
         """
@@ -223,12 +227,16 @@ class RedisHelper(object):
         conn.expire(key_name, int(expire_time))
 
 
+#hologres_info = config_.HOLOGRES_INFO
+#conn = psycopg2.connect(**hologres_info)
+#cur = conn.cursor()
 class HologresHelper(object):
     def __init__(self):
         """初始化hologres连接信息"""
         self.hologres_info = config_.HOLOGRES_INFO
 
     def get_data(self, sql):
+        #global conn
         # 连接Hologres
         conn = psycopg2.connect(**self.hologres_info)
         # 创建游标
@@ -237,10 +245,10 @@ class HologresHelper(object):
         cur.execute(sql)
         data = cur.fetchall()
         # 提交事务
-        conn.commit()
+        #conn.commit()
         # 释放资源
-        cur.close()
-        conn.close()
+        #cur.close()
+        #conn.close()
         return data
 
 

+ 9 - 0
recommend.py

@@ -8,6 +8,7 @@ from config import set_config
 from video_recall import PoolRecall
 from video_rank import video_rank, bottom_strategy
 from db_helper import RedisHelper
+import gevent
 
 log_ = Log()
 config_ = set_config()
@@ -27,6 +28,7 @@ def video_recommend(mid, uid, size, app_type, algo_type):
     # ####### 多进程召回
     start_recall = time.time()
     log_.info('====== recall')
+    '''     
     cores = multiprocessing.cpu_count()
     pool = multiprocessing.Pool(processes=cores)
     pool_recall = PoolRecall(app_type=app_type, mid=mid, uid=uid, ab_code=ab_code)
@@ -40,6 +42,13 @@ def video_recommend(mid, uid, size, app_type, algo_type):
     recall_result_list = [p.get() for p in pool_list]
     pool.close()
     pool.join()
+    '''
+    recall_result_list = []
+    pool_recall = PoolRecall(app_type=app_type, mid=mid, uid=uid, ab_code=ab_code)
+    _, last_rov_recall_key, _ = pool_recall.get_video_last_idx()
+    t = [gevent.spawn(pool_recall.rov_pool_recall, size), gevent.spawn(pool_recall.flow_pool_recall, size) ]
+    recall_result_list = [i.get() for i in t]
+
     end_recall = time.time()
     log_.info('mid: {}, uid: {}, recall: {}, execute time = {}ms'.format(
         mid, uid, recall_result_list, (end_recall - start_recall) * 1000))

+ 5 - 1
video_recall.py

@@ -9,7 +9,7 @@ from utils import FilterVideos, get_videos_remain_view_count, get_videos_local_d
 log_ = Log()
 config_ = set_config()
 
-
+    
 class PoolRecall(object):
     """召回"""
     def __init__(self, app_type, mid='', uid='', ab_code=''):
@@ -61,8 +61,10 @@ class PoolRecall(object):
                 video_ids.append(eval(value[0]))
                 video_score[eval(value[0])] = value[1]
             # 过滤
+            debug_tm_b = time.time()
             filter_ = FilterVideos(app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
             filtered_result = filter_.filter_videos()
+            debug_tm_e = time.time()
             if filtered_result:
                 # 添加视频源参数 pushFrom, abCode
                 temp_result = [{'videoId': item, 'rovScore': video_score[item],
@@ -86,6 +88,7 @@ class PoolRecall(object):
         # 记录获取频次
         freq = 0
         idx = 0
+        debug_tm_b = time.time()
         while len(flow_pool_recall_result) < size:
             freq += 1
             # 获取数据
@@ -114,6 +117,7 @@ class PoolRecall(object):
                 else:
                     video_mapping[video_id].append(flow_pool)
             # 过滤
+            debug_tm_fb = time.time()
             filter_ = FilterVideos(app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
             filtered_result = filter_.filter_videos()
             # 检查可分发数