Przeglądaj źródła

Merge branch 'debug-timeout' into pre-master

liqian 3 lat temu
rodzic
commit
5d7a871d33
6 zmienionych plików z 90 dodań i 32 usunięć
  1. 0 0
      add
  2. 4 4
      app.py
  3. 7 2
      db_helper.py
  4. 11 6
      recommend.py
  5. 7 3
      utils.py
  6. 61 17
      video_recall.py

+ 0 - 0
add


+ 4 - 4
app.py

@@ -59,12 +59,12 @@ def homepage_recommend():
             return json.dumps(result)
         else:
             log_.error('categoryId error, categoryId = {}'.format(category_id))
-            result = {'code': -1, 'message': 'fail', 'traceback': traceback.format_exc()}
+            result = {'code': -1, 'message': 'categoryId error'}
             return json.dumps(result)
 
     except Exception as e:
-        log_.error(e)
-        result = {'code': -1, 'message': 'fail', 'traceback': traceback.format_exc()}
+        log_.error(traceback.format_exc())
+        result = {'code': -1, 'message': 'fail'}
         return json.dumps(result)
 
 
@@ -145,7 +145,7 @@ def apprun(MULTI_PROCESS=True, ip='0.0.0.0', port=5001):
             mulserver._stop_event.wait()
  
         #for i in range(cpu_count()):
-        for i in range(4):
+        for i in range(3):
             p = Process(target=server_forever)
             p.start()
 

+ 7 - 2
db_helper.py

@@ -1,5 +1,6 @@
 import redis
 import psycopg2
+from psycopg2 import pool as pgpool
 from config import set_config
 from log import Log
 
@@ -230,20 +231,24 @@ class RedisHelper(object):
 #hologres_info = config_.HOLOGRES_INFO
 #conn = psycopg2.connect(**hologres_info)
 #cur = conn.cursor()
+connectPool = pgpool.SimpleConnectionPool(1, 10, **config_.HOLOGRES_INFO)
 class HologresHelper(object):
     def __init__(self):
         """初始化hologres连接信息"""
-        self.hologres_info = config_.HOLOGRES_INFO
+        #self.hologres_info = config_.HOLOGRES_INFO
 
     def get_data(self, sql):
         #global conn
         # 连接Hologres
-        conn = psycopg2.connect(**self.hologres_info)
+        #conn = psycopg2.connect(**self.hologres_info)
+        conn = connectPool.getconn()
         # 创建游标
         cur = conn.cursor()
         # 查询数据
         cur.execute(sql)
         data = cur.fetchall()
+        cur.close()
+        connectPool.putconn(conn, close=False)
         # 提交事务
         #conn.commit()
         # 释放资源

+ 11 - 6
recommend.py

@@ -47,6 +47,7 @@ def video_recommend(mid, uid, size, app_type, algo_type):
     pool_recall = PoolRecall(app_type=app_type, mid=mid, uid=uid, ab_code=ab_code)
     _, last_rov_recall_key, _ = pool_recall.get_video_last_idx()
     t = [gevent.spawn(pool_recall.rov_pool_recall, size), gevent.spawn(pool_recall.flow_pool_recall, size) ]
+    gevent.joinall(t)
     recall_result_list = [i.get() for i in t]
 
     end_recall = time.time()
@@ -112,14 +113,18 @@ def update_local_distribute_count(videos):
         redis_helper = RedisHelper()
         for item in videos:
             key_name = '{}{}.{}'.format(config_.LOCAL_DISTRIBUTE_COUNT_PREFIX, item['videoId'], item['flowPool'])
-            if redis_helper.key_exists(key_name=key_name):
-                # 该视频本地有记录,本地记录的分发数 - 1
-                redis_helper.decr_key(key_name=key_name, amount=1, expire_time=5 * 60)
-            else:
-                # 该视频本地无记录,接口获取的分发数 - 1
-                redis_helper.incr_key(key_name=key_name, amount=int(item['distributeCount']) - 1, expire_time=5 * 60)
+            # 本地记录的分发数 - 1
+            redis_helper.decr_key(key_name=key_name, amount=1, expire_time=5 * 60)
+
+            # if redis_helper.key_exists(key_name=key_name):
+            #     # 该视频本地有记录,本地记录的分发数 - 1
+            #     redis_helper.decr_key(key_name=key_name, amount=1, expire_time=5 * 60)
+            # else:
+            #     # 该视频本地无记录,接口获取的分发数 - 1
+            #     redis_helper.incr_key(key_name=key_name, amount=int(item['distributeCount']) - 1, expire_time=5 * 60)
 
     except Exception as e:
+        log_.error('update_local_distribute_count error...')
         log_.error(traceback.format_exc())
 
 

+ 7 - 3
utils.py

@@ -208,6 +208,10 @@ class FilterVideos(object):
 
 
 if __name__ == '__main__':
-    filter_ = FilterVideos(app_type=1, mid='22', uid='www', video_ids=[1, 2, 3, 55])
-    filter_.filter_videos()
-    filter_.filter_video_status(video_ids=[1, 3, 5])
+    # filter_ = FilterVideos(app_type=1, mid='22', uid='www', video_ids=[1, 2, 3, 55])
+    # filter_.filter_videos()
+    # filter_.filter_video_status(video_ids=[1, 3, 5])
+
+    videos = [{'videoId': 9034659, 'flowPool': '3#11#3#1637824188547'}, {'videoId': 9035052, 'flowPool': '3#11#3#1637824172827'}]
+    res = get_videos_remain_view_count(4, videos)
+    print(res)

+ 61 - 17
video_recall.py

@@ -1,10 +1,12 @@
 import time
+import traceback
 
 from datetime import date, timedelta, datetime
 from log import Log
 from db_helper import RedisHelper
 from config import set_config
 from utils import FilterVideos, get_videos_remain_view_count, get_videos_local_distribute_count
+import gevent
 
 log_ = Log()
 config_ = set_config()
@@ -63,7 +65,10 @@ class PoolRecall(object):
             # 过滤
             debug_tm_b = time.time()
             filter_ = FilterVideos(app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
-            filtered_result = filter_.filter_videos()
+            ge = gevent.spawn(filter_.filter_videos)
+            ge.join()
+            filtered_result = ge.get()
+            #filtered_result = filter_.filter_videos()
             debug_tm_e = time.time()
             if filtered_result:
                 # 添加视频源参数 pushFrom, abCode
@@ -117,12 +122,17 @@ class PoolRecall(object):
                     video_mapping[video_id].append(flow_pool)
             # 过滤
             filter_ = FilterVideos(app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
-            filtered_result = filter_.filter_videos()
+            ge = gevent.spawn(filter_.filter_videos)
+            ge.join()
+            filtered_result = ge.get()
+            #filtered_result = filter_.filter_videos()
             # 检查可分发数
             if filtered_result:
                 st_check = time.time()
-                check_result, error_flag = self.check_video_counts(video_ids=filtered_result,
-                                                                   flow_pool_mapping=video_mapping)
+                ge = gevent.spawn(self.check_video_counts, video_ids=filtered_result, flow_pool_mapping=video_mapping)
+                ge.join()
+                check_result, error_flag = ge.get()
+                #check_result, error_flag = self.check_video_counts(video_ids=filtered_result, flow_pool_mapping=video_mapping)
                 # 判断错误标记, True为错误
                 if error_flag:
                     # 结束流量池召回
@@ -133,9 +143,13 @@ class PoolRecall(object):
                         # 取其中一个 flow_pool 作为召回结果
                         # 添加视频源参数 pushFrom, abCode
                         flow_pool_recall_result.append(
-                            {'videoId': item[0], 'flowPool': item[1], 'distributeCount': item[2],
+                            {'videoId': item[0], 'flowPool': item[1],
                              'rovScore': video_score[item[0]], 'pushFrom': 'flow_pool', 'abCode': self.ab_code}
                         )
+                        # flow_pool_recall_result.append(
+                        #     {'videoId': item[0], 'flowPool': item[1], 'distributeCount': item[2],
+                        #      'rovScore': video_score[item[0]], 'pushFrom': 'flow_pool', 'abCode': self.ab_code}
+                        # )
                         flow_pool_recall_videos.append(item[0])
                 et_check = time.time()
                 log_.info('check result: result = {}, execute time = {}ms'.format(
@@ -153,9 +167,27 @@ class PoolRecall(object):
         """
         flow_pool_key = self.get_pool_redis_key('flow')
         videos = []
+        check_result = []
         for video_id in video_ids:
             for flow_pool in flow_pool_mapping[video_id]:
-                videos.append({'videoId': video_id, 'flowPool': flow_pool})
+                # 判断是否有本地分发记录
+                cur_count = get_videos_local_distribute_count(video_id=video_id, flow_pool=flow_pool)
+                # 无记录
+                if cur_count is None:
+                    videos.append({'videoId': video_id, 'flowPool': flow_pool})
+                # 本地分发数 cur_count > 0
+                elif cur_count > 0:
+                    check_result.append((video_id, flow_pool))
+                # 本地分发数 cur_count <= 0,从流量召回池移除
+                else:
+                    value = '{}-{}'.format(video_id, flow_pool)
+                    self.redis_helper.remove_value_from_zset(key_name=flow_pool_key, value=value)
+        # 本次视频都有本地记录
+        if len(video_ids) == 0:
+            error_flag = False
+            return check_result, error_flag
+
+        # 本地无记录视频,检查实时分发数
         view_count_result, error_flag = get_videos_remain_view_count(app_type=self.app_type, videos=videos)
         log_.info('error_flag = {}, view_count_result = {}'.format(error_flag, view_count_result))
 
@@ -166,19 +198,31 @@ class PoolRecall(object):
                 value = '{}-{}'.format(item['videoId'], item['flowPool'])
                 self.redis_helper.remove_value_from_zset(key_name=flow_pool_key, value=value)
 
-        check_result = []
+        redis_helper = RedisHelper()
         for item in view_count_result:
-            if item[2] > 0:
+            try:
+                # 接口超时,item[2]可能为None
+                remain_count = int(item[2])
+            except Exception as e:
+                log_.error('remain_count type error...')
+                log_.error(traceback.format_exc())
+                continue
+            if remain_count > 0:
                 # viewCount > 0
-                # 判断本地分发数
-                cur_count = get_videos_local_distribute_count(video_id=item[0], flow_pool=item[1])
-                # 无记录 或 cur_count > 0
-                if cur_count is None or cur_count > 0:
-                    check_result.append(item)
-                # cur_count <= 0,从流量召回池移除
-                else:
-                    value = '{}-{}'.format(item[0], item[1])
-                    self.redis_helper.remove_value_from_zset(key_name=flow_pool_key, value=value)
+                check_result.append(item)
+                # 将分发数更新到本地记录
+                key_name = '{}{}.{}'.format(config_.LOCAL_DISTRIBUTE_COUNT_PREFIX, item[0], item[1])
+                redis_helper.incr_key(key_name=key_name, amount=remain_count, expire_time=5 * 60)
+
+                # # 判断本地分发数
+                # cur_count = get_videos_local_distribute_count(video_id=item[0], flow_pool=item[1])
+                # # 无记录 或 cur_count > 0
+                # if cur_count is None or cur_count > 0:
+                #     check_result.append(item)
+                # # cur_count <= 0,从流量召回池移除
+                # else:
+                #     value = '{}-{}'.format(item[0], item[1])
+                #     self.redis_helper.remove_value_from_zset(key_name=flow_pool_key, value=value)
             else:
                 # viewCount <= 0
                 # 从流量召回池移除