Selaa lähdekoodia

Merge branch 'debug-timeout' into test

liqian 3 vuotta sitten
vanhempi
commit
43f35ac019
5 muutettua tiedostoa jossa 70 lisäystä ja 25 poistoa
  1. 0 0
      add
  2. 1 1
      app.py
  3. 7 2
      db_helper.py
  4. 10 6
      recommend.py
  5. 52 16
      video_recall.py

+ 0 - 0
add


+ 1 - 1
app.py

@@ -145,7 +145,7 @@ def apprun(MULTI_PROCESS=True, ip='0.0.0.0', port=5001):
             mulserver._stop_event.wait()
  
         #for i in range(cpu_count()):
-        for i in range(4):
+        for i in range(3):
             p = Process(target=server_forever)
             p.start()
 

+ 7 - 2
db_helper.py

@@ -1,5 +1,6 @@
 import redis
 import psycopg2
+from psycopg2 import pool as pgpool
 from config import set_config
 from log import Log
 
@@ -230,20 +231,24 @@ class RedisHelper(object):
 #hologres_info = config_.HOLOGRES_INFO
 #conn = psycopg2.connect(**hologres_info)
 #cur = conn.cursor()
+connectPool = pgpool.SimpleConnectionPool(1, 10, **config_.HOLOGRES_INFO)
 class HologresHelper(object):
     def __init__(self):
         """初始化hologres连接信息"""
-        self.hologres_info = config_.HOLOGRES_INFO
+        #self.hologres_info = config_.HOLOGRES_INFO
 
     def get_data(self, sql):
         #global conn
         # 连接Hologres
-        conn = psycopg2.connect(**self.hologres_info)
+        #conn = psycopg2.connect(**self.hologres_info)
+        conn = connectPool.getconn()
         # 创建游标
         cur = conn.cursor()
         # 查询数据
         cur.execute(sql)
         data = cur.fetchall()
+        cur.close()
+        connectPool.putconn(conn, close=False)
         # 提交事务
         #conn.commit()
         # 释放资源

+ 10 - 6
recommend.py

@@ -47,6 +47,7 @@ def video_recommend(mid, uid, size, app_type, algo_type):
     pool_recall = PoolRecall(app_type=app_type, mid=mid, uid=uid, ab_code=ab_code)
     _, last_rov_recall_key, _ = pool_recall.get_video_last_idx()
     t = [gevent.spawn(pool_recall.rov_pool_recall, size), gevent.spawn(pool_recall.flow_pool_recall, size) ]
+    gevent.joinall(t)
     recall_result_list = [i.get() for i in t]
 
     end_recall = time.time()
@@ -112,12 +113,15 @@ def update_local_distribute_count(videos):
         redis_helper = RedisHelper()
         for item in videos:
             key_name = '{}{}.{}'.format(config_.LOCAL_DISTRIBUTE_COUNT_PREFIX, item['videoId'], item['flowPool'])
-            if redis_helper.key_exists(key_name=key_name):
-                # 该视频本地有记录,本地记录的分发数 - 1
-                redis_helper.decr_key(key_name=key_name, amount=1, expire_time=5 * 60)
-            else:
-                # 该视频本地无记录,接口获取的分发数 - 1
-                redis_helper.incr_key(key_name=key_name, amount=int(item['distributeCount']) - 1, expire_time=5 * 60)
+            # 本地记录的分发数 - 1
+            redis_helper.decr_key(key_name=key_name, amount=1, expire_time=5 * 60)
+
+            # if redis_helper.key_exists(key_name=key_name):
+            #     # 该视频本地有记录,本地记录的分发数 - 1
+            #     redis_helper.decr_key(key_name=key_name, amount=1, expire_time=5 * 60)
+            # else:
+            #     # 该视频本地无记录,接口获取的分发数 - 1
+            #     redis_helper.incr_key(key_name=key_name, amount=int(item['distributeCount']) - 1, expire_time=5 * 60)
 
     except Exception as e:
         log_.error(traceback.format_exc())

+ 52 - 16
video_recall.py

@@ -5,6 +5,7 @@ from log import Log
 from db_helper import RedisHelper
 from config import set_config
 from utils import FilterVideos, get_videos_remain_view_count, get_videos_local_distribute_count
+import gevent
 
 log_ = Log()
 config_ = set_config()
@@ -63,7 +64,10 @@ class PoolRecall(object):
             # 过滤
             debug_tm_b = time.time()
             filter_ = FilterVideos(app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
-            filtered_result = filter_.filter_videos()
+            ge = gevent.spawn(filter_.filter_videos)
+            ge.join()
+            filtered_result = ge.get()
+            #filtered_result = filter_.filter_videos()
             debug_tm_e = time.time()
             if filtered_result:
                 # 添加视频源参数 pushFrom, abCode
@@ -117,12 +121,17 @@ class PoolRecall(object):
                     video_mapping[video_id].append(flow_pool)
             # 过滤
             filter_ = FilterVideos(app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
-            filtered_result = filter_.filter_videos()
+            ge = gevent.spawn(filter_.filter_videos)
+            ge.join()
+            filtered_result = ge.get()
+            #filtered_result = filter_.filter_videos()
             # 检查可分发数
             if filtered_result:
                 st_check = time.time()
-                check_result, error_flag = self.check_video_counts(video_ids=filtered_result,
-                                                                   flow_pool_mapping=video_mapping)
+                ge = gevent.spawn(self.check_video_counts, video_ids=filtered_result, flow_pool_mapping=video_mapping)
+                ge.join()
+                check_result, error_flag = ge.get()
+                #check_result, error_flag = self.check_video_counts(video_ids=filtered_result, flow_pool_mapping=video_mapping)
                 # 判断错误标记, True为错误
                 if error_flag:
                     # 结束流量池召回
@@ -133,9 +142,13 @@ class PoolRecall(object):
                         # 取其中一个 flow_pool 作为召回结果
                         # 添加视频源参数 pushFrom, abCode
                         flow_pool_recall_result.append(
-                            {'videoId': item[0], 'flowPool': item[1], 'distributeCount': item[2],
+                            {'videoId': item[0], 'flowPool': item[1],
                              'rovScore': video_score[item[0]], 'pushFrom': 'flow_pool', 'abCode': self.ab_code}
                         )
+                        # flow_pool_recall_result.append(
+                        #     {'videoId': item[0], 'flowPool': item[1], 'distributeCount': item[2],
+                        #      'rovScore': video_score[item[0]], 'pushFrom': 'flow_pool', 'abCode': self.ab_code}
+                        # )
                         flow_pool_recall_videos.append(item[0])
                 et_check = time.time()
                 log_.info('check result: result = {}, execute time = {}ms'.format(
@@ -153,9 +166,27 @@ class PoolRecall(object):
         """
         flow_pool_key = self.get_pool_redis_key('flow')
         videos = []
+        check_result = []
         for video_id in video_ids:
             for flow_pool in flow_pool_mapping[video_id]:
-                videos.append({'videoId': video_id, 'flowPool': flow_pool})
+                # 判断是否有本地分发记录
+                cur_count = get_videos_local_distribute_count(video_id=video_id, flow_pool=flow_pool)
+                # 无记录
+                if cur_count is None:
+                    videos.append({'videoId': video_id, 'flowPool': flow_pool})
+                # 本地分发数 cur_count > 0
+                elif cur_count > 0:
+                    check_result.append((video_id, flow_pool))
+                # 本地分发数 cur_count <= 0,从流量召回池移除
+                else:
+                    value = '{}-{}'.format(video_id, flow_pool)
+                    self.redis_helper.remove_value_from_zset(key_name=flow_pool_key, value=value)
+        # 本次视频都有本地记录
+        if len(video_ids) == 0:
+            error_flag = False
+            return check_result, error_flag
+
+        # 本地无记录视频,检查实时分发数
         view_count_result, error_flag = get_videos_remain_view_count(app_type=self.app_type, videos=videos)
         log_.info('error_flag = {}, view_count_result = {}'.format(error_flag, view_count_result))
 
@@ -166,19 +197,24 @@ class PoolRecall(object):
                 value = '{}-{}'.format(item['videoId'], item['flowPool'])
                 self.redis_helper.remove_value_from_zset(key_name=flow_pool_key, value=value)
 
-        check_result = []
+        redis_helper = RedisHelper()
         for item in view_count_result:
             if item[2] > 0:
                 # viewCount > 0
-                # 判断本地分发数
-                cur_count = get_videos_local_distribute_count(video_id=item[0], flow_pool=item[1])
-                # 无记录 或 cur_count > 0
-                if cur_count is None or cur_count > 0:
-                    check_result.append(item)
-                # cur_count <= 0,从流量召回池移除
-                else:
-                    value = '{}-{}'.format(item[0], item[1])
-                    self.redis_helper.remove_value_from_zset(key_name=flow_pool_key, value=value)
+                check_result.append(item)
+                # 将分发数更新到本地记录
+                key_name = '{}{}.{}'.format(config_.LOCAL_DISTRIBUTE_COUNT_PREFIX, item[0], item[1])
+                redis_helper.incr_key(key_name=key_name, amount=int(item[2]), expire_time=5 * 60)
+
+                # # 判断本地分发数
+                # cur_count = get_videos_local_distribute_count(video_id=item[0], flow_pool=item[1])
+                # # 无记录 或 cur_count > 0
+                # if cur_count is None or cur_count > 0:
+                #     check_result.append(item)
+                # # cur_count <= 0,从流量召回池移除
+                # else:
+                #     value = '{}-{}'.format(item[0], item[1])
+                #     self.redis_helper.remove_value_from_zset(key_name=flow_pool_key, value=value)
             else:
                 # viewCount <= 0
                 # 从流量召回池移除