baichongyang 3 years ago
parent
commit
e0abdcfb32
12 changed files with 396 additions and 89 deletions
  1. 0 0
      __init__.py
  2. 196 0
      aiocache.py
  3. 6 0
      asgi.py
  4. 6 6
      config.py
  5. 16 23
      recommend.py
  6. 9 0
      route.py
  7. 27 28
      utils.py
  8. 5 6
      video_rank.py
  9. 26 26
      video_recall.py
  10. 0 0
      views/__init__.py
  11. 43 0
      views/app_hot_list.py
  12. 62 0
      views/homepage_rs.py

+ 0 - 0
__init__.py


+ 196 - 0
aiocache.py

@@ -0,0 +1,196 @@
+import aioredis
+import aiopg
+from config import set_config
+from log import Log
+
+config_ = set_config()
+log = Log()
+
+redis_info = config_.REDIS_INFO
+
+# R = get_redis_connect()
+
+pg_info = config_.HOLOGRES_INFO
+# P = aiopg.connect(database=pg_info['dbname'],
+#                                user=pg_info['user'],
+#                                password=pg_info['password'],
+#                                host=pg_info['host'])
+
+
+R = aioredis.from_url(redis_info['host'], password=redis_info['password'])
+# async def get_redis_connect():
+#     return await aioredis.from_url(redis_info['host'], password=redis_info['password'])
+
+# async def pg_getdata(sql):
+#     P = await aiopg.connect(database=pg_info['dbname'],
+#                                user=pg_info['user'],
+#                                password=pg_info['password'],
+#                                host=pg_info['host'],
+#                                port=pg_info['port'])
+#     cur = await P.cursor()
+#     await cur.execute(sql)
+#     ret = await cur.fetchall()
+#     return ret
+
+dsn = f'dbname={pg_info["dbname"]} user={pg_info["user"]} password={pg_info["password"]} host={pg_info["host"]} port={pg_info["port"]}'
+print(dsn)
+async def pg_getdata(sql):
+    pool = await aiopg.create_pool(dsn)
+    async with pool.acquire() as conn:
+        async with conn.cursor() as cur:
+            await cur.execute(sql)    
+            return await cur.fetchall()
+
+async def key_exists(key_name):
+    """
+    判断key是否存在
+    :param key_name: key
+    :return: 存在-True, 不存在-False
+    """
+    return await R.exists(key_name)
+
+async def del_keys(key_name):
+    """
+    删除key
+    :param key_name: key
+    :return: None
+    """
+    return await R.delete(key_name)
+
+async def get_data_from_redis(key_name):
+    """
+    读取redis中的数据
+    :param key_name: key
+    :return: data
+    """
+    if not await R.exists(key_name):
+        # key不存在
+        return None
+    data = await R.get(key_name)
+    return data
+
+async def set_data_to_redis(key_name, value, expire_time=24*3600):
+    """
+    新增数据
+    :param key_name: key
+    :param value: 元素的值 videoId
+    :param expire_time: 过期时间,单位:s,默认1天
+    :return: None
+    """
+    await R.set(key_name, value, ex=expire_time)
+
+async def add_data_with_zset(key_name, data, expire_time=7*24*3600):
+    """
+    新增数据,有序set
+    :param key_name: key
+    :param data: 元素的值及对应分数 type-dict  {value: score}
+    :param expire_time: 过期时间,单位:s,默认7天
+    :return: None
+    """
+    await R.zadd(key_name, data)
+    # 设置过期时间
+    await R.expire(key_name, int(expire_time))
+
+async def get_data_zset_with_index(key_name, start, end, desc=True, with_scores=False):
+    """
+    根据索引位置获取元素的值
+    :param key_name: key
+    :param start: 索引起始点 闭区间,包含start
+    :param end: 索引结束点 闭区间,包含end
+    :param desc: 分数排序方式,默认从大到小
+    :param with_scores: 是否获取元素的分数,默认 False,只获取元素的值
+    :return: data 元素值列表(不包含分数),value(videoId)类型转换为int, 包含分数时不进行类型转换
+    """
+    if not await R.exists(key_name):
+        return None
+    data = await R.zrange(key_name, start, end, desc, with_scores)
+    if with_scores:
+        return data
+    else:
+        return [eval(value) for value in data]
+
+async def get_score_with_value(key_name, value):
+    """
+    在zset中,根据元素的value获取对应的score
+    :param key_name: key
+    :param value: 元素的值
+    :return: score value对应的score
+    """
+    if not await R.exists(key_name):
+        return None
+    return await R.zscore(key_name, value)
+
+async def update_score_with_value(key_name, value, score, expire_time=7*24*3600):
+    """
+    在zset中,修改元素value对应的score
+    :param key_name: key
+    :param value: 元素的值
+    :param score: value对应的score更新值
+    :param expire_time: 过期时间,单位:s,默认7天
+    """
+    if R.exists(key_name):
+        R.zadd(key_name, {value: score})
+    else:
+        # key不存在时,需设置过期时间
+        R.zadd(key_name, {value: score})
+        R.expire(key_name, expire_time)
+
+async def remove_value_from_zset(key_name, value):
+    """
+    删除zset中的指定元素
+    :param key_name: key
+    :param value: 元素的值
+    :return: None
+    """
+    R.zrem(key_name, value)
+
+async def get_index_with_data(key_name, value):
+    """
+    根据元素的值获取在有序set中的位置,按照分数倒序(从大到小)
+    :param key_name: key
+    :param value: 元素的值
+    :return: idx 位置索引
+    """
+    return await R.zrevrank(key_name, value)
+
+async def get_data_from_set(key_name):
+    """
+    获取set中的所有数据
+    :param key_name: key
+    :return: data
+    """
+    if not await R.exists(key_name):
+        # key不存在
+        return None
+    data = await R.sscan(key_name)
+    return data[1]
+
+async def add_data_with_set(key_name, values, expire_time=30*60):
+    """
+    新增数据,set
+    :param key_name: key
+    :param values: 要添加的元素  类型-tuple
+    :param expire_time: 过期时间,单位:s,默认0.5小时
+    :return: None
+    """
+    R.sadd(key_name, *values)
+    # 设置过期时间
+    R.expire(key_name, expire_time)
+
+async def data_exists_with_set(key_name, value):
+    """
+    判断元素value是否在集合key_name中
+    :param key_name: key
+    :param value: 需判断的元素
+    :return: 存在-True, 不存在-False
+    """
+    return await R.sismember(key_name, value)
+
+async def remove_value_from_set(key_name, values):
+    """
+    删除set中的指定元素
+    :param key_name: key
+    :param values: 元素的值, 类型-tuple
+    :return: None
+    """
+    await R.srem(key_name, *values)

+ 6 - 0
asgi.py

@@ -0,0 +1,6 @@
+from route import create_app
+
+app = create_app()
+
+
+#uvicorn asgi:app

+ 6 - 6
config.py

@@ -44,7 +44,7 @@ class DevelopmentConfig(BaseConfig):
     """测试环境配置"""
     # 测试环境redis地址
     REDIS_INFO = {
-        'host': 'r-bp1ps6my7lzg8rdhwx682.redis.rds.aliyuncs.com',
+        'host': 'redis://r-bp1ps6my7lzg8rdhwx682.redis.rds.aliyuncs.com',
         'port': 6379,
         'password': 'Wqsd@2019',
     }
@@ -73,7 +73,7 @@ class TestConfig(BaseConfig):
     """测试环境配置"""
     # 测试环境redis地址
     REDIS_INFO = {
-        'host': 'r-bp1ps6my7lzg8rdhwx682.redis.rds.aliyuncs.com',
+        'host': 'redis://r-bp1ps6my7lzg8rdhwx682.redis.rds.aliyuncs.com',
         'port': 6379,
         'password': 'Wqsd@2019',
     }
@@ -102,7 +102,7 @@ class PreProductionConfig(BaseConfig):
     """预发布环境配置"""
     # 线上环境redis地址
     REDIS_INFO = {
-        'host': 'r-bp1fogs2mflr1ybfot.redis.rds.aliyuncs.com',
+        'host': 'redis://r-bp1fogs2mflr1ybfot.redis.rds.aliyuncs.com',
         'port': 6379,
         'password': 'Wqsd@2019',
     }
@@ -131,7 +131,7 @@ class ProductionConfig(BaseConfig):
     """生产环境配置"""
     # 线上环境redis地址
     REDIS_INFO = {
-        'host': 'r-bp1fogs2mflr1ybfot.redis.rds.aliyuncs.com',
+        'host': 'redis://r-bp1fogs2mflr1ybfot.redis.rds.aliyuncs.com',
         'port': 6379,
         'password': 'Wqsd@2019',
     }
@@ -157,7 +157,7 @@ class ProductionConfig(BaseConfig):
 
 
 def set_config():
-    # return DevelopmentConfig()
+    return DevelopmentConfig()
     # return TestConfig()
     # return PreProductionConfig()
-    return ProductionConfig()
+    #return ProductionConfig()

+ 16 - 23
recommend.py

@@ -7,13 +7,13 @@ from log import Log
 from config import set_config
 from video_recall import PoolRecall
 from video_rank import video_rank, bottom_strategy
-from db_helper import RedisHelper
+import aiocache
 
 log_ = Log()
 config_ = set_config()
 
 
-def video_recommend(mid, uid, size, app_type, algo_type):
+async def video_recommend(mid, uid, size, app_type, algo_type):
     """
     首页线上推荐逻辑
     :param mid: mid type-string
@@ -30,26 +30,21 @@ def video_recommend(mid, uid, size, app_type, algo_type):
     cores = multiprocessing.cpu_count()
     pool = multiprocessing.Pool(processes=cores)
     pool_recall = PoolRecall(app_type=app_type, mid=mid, uid=uid, ab_code=ab_code)
-    _, last_rov_recall_key, _ = pool_recall.get_video_last_idx()
-    pool_list = [
-        # rov召回池
-        pool.apply_async(pool_recall.rov_pool_recall, (size,)),
-        # 流量池
-        pool.apply_async(pool_recall.flow_pool_recall, (size,))
-    ]
-    recall_result_list = [p.get() for p in pool_list]
-    pool.close()
-    pool.join()
+    _, last_rov_recall_key, _ = await pool_recall.get_video_last_idx()
+
+    recall_1 = await pool_recall.rov_pool_recall(size)
+    recall_2 = await pool_recall.flow_pool_recall(size)
+
     end_recall = time.time()
     log_.info('mid: {}, uid: {}, recall: {}, execute time = {}ms'.format(
-        mid, uid, recall_result_list, (end_recall - start_recall) * 1000))
+        mid, uid, [recall_1, recall_2], (end_recall - start_recall) * 1000))
 
     # ####### 排序
     start_rank = time.time()
     log_.info('====== rank')
     data = {
-        'rov_pool_recall': recall_result_list[0],
-        'flow_pool_recall': recall_result_list[1]
+        'rov_pool_recall': recall_1,
+        'flow_pool_recall': recall_2
     }
     rank_result = video_rank(data=data, size=size)
     end_rank = time.time()
@@ -68,19 +63,18 @@ def video_recommend(mid, uid, size, app_type, algo_type):
     # ####### redis数据刷新
     log_.info('====== update redis')
     # 预曝光数据同步刷新到Redis, 过期时间为0.5h
-    redis_helper = RedisHelper()
     preview_key_name = config_.PREVIEW_KEY_PREFIX + '{}.{}'.format(app_type, mid)
     preview_video_ids = [item['videoId'] for item in rank_result]
     if preview_video_ids:
-        redis_helper.add_data_with_set(key_name=preview_key_name, values=tuple(preview_video_ids), expire_time=30*60)
+        await aiocache.add_data_with_set(key_name=preview_key_name, values=tuple(preview_video_ids), expire_time=30*60)
         log_.info('preview redis update success!')
 
     # 将此次获取的ROV召回池config_.K末位视频id同步刷新到Redis中,方便下次快速定位到召回位置,过期时间为1天
     rov_recall_video = [item['videoId'] for item in rank_result if item['pushFrom'] == 'recall_pool']
     if 0 < len(rov_recall_video) <= config_.K:
-        redis_helper.set_data_to_redis(key_name=last_rov_recall_key, value=rov_recall_video[-1])
+        await aiocache.set_data_to_redis(key_name=last_rov_recall_key, value=rov_recall_video[-1])
     elif len(rov_recall_video) > config_.K:
-        redis_helper.set_data_to_redis(key_name=last_rov_recall_key, value=rov_recall_video[config_.K - 1])
+        await aiocache.set_data_to_redis(key_name=last_rov_recall_key, value=rov_recall_video[config_.K - 1])
     log_.info('last video redis update success!')
 
     # 将此次分发的流量池视频,对 本地分发数-1 进行记录
@@ -92,7 +86,7 @@ def video_recommend(mid, uid, size, app_type, algo_type):
     return rank_result
 
 
-def update_local_distribute_count(videos):
+async def update_local_distribute_count(videos):
     """
     更新本地分发数
     :param videos: 视频列表 type-list [{'videoId':'', 'flowPool':'', 'distributeCount': '',
@@ -105,11 +99,10 @@ def update_local_distribute_count(videos):
             redis_h += 0.5
         key_name = config_.LOCAL_DISTRIBUTE_COUNT_PREFIX + str(redis_h)
         print(key_name)
-        redis_helper = RedisHelper()
         update_data = {}
         for item in videos:
             video = '{}-{}'.format(item['videoId'], item['flowPool'])
-            current_count = redis_helper.get_score_with_value(key_name=key_name, value=video)
+            current_count = await aiocache.get_score_with_value(key_name=key_name, value=video)
             if current_count is not None:
                 # 该视频本地有记录,本地记录的分发数 - 1
                 new_count = current_count - 1
@@ -119,7 +112,7 @@ def update_local_distribute_count(videos):
             update_data[video] = new_count
         log_.info('now update video local distribute count: {}, key: {}'.format(update_data, key_name))
         # 更新redis中的数据
-        redis_helper.add_data_with_zset(key_name=key_name, data=update_data, expire_time=0.5*3600)
+        await aiocache.add_data_with_zset(key_name=key_name, data=update_data, expire_time=0.5*3600)
 
     except Exception as e:
         log_.error(traceback.format_exc())

+ 9 - 0
route.py

@@ -0,0 +1,9 @@
+import falcon.asgi
+from views.app_hot_list import AppHotlist
+from views.homepage_rs import HomepageRecommend
+
+def create_app():
+    app = falcon.asgi.App()
+    app.add_route('/app/video/hot_list', AppHotlist())
+    app.add_route('/applet/video/homepage/recommend', HomepageRecommend())
+    return app

+ 27 - 28
utils.py

@@ -1,9 +1,9 @@
-import requests
 import json
 import time
+import aiohttp
 
 from datetime import datetime
-from db_helper import HologresHelper, RedisHelper
+import aiocache
 from config import set_config
 from log import Log
 
@@ -11,7 +11,7 @@ config_ = set_config()
 log_ = Log()
 
 
-def request_post(request_url, request_data, timeout=1.0):
+async def request_post(url, data, timeout=1):
     """
     post 请求 HTTP接口
     :param request_url: 接口URL
@@ -19,19 +19,21 @@ def request_post(request_url, request_data, timeout=1.0):
     :param timeout: 超时时间,默认为1秒,type-float
     :return: res_data json格式
     """
+
     try:
-        response = requests.post(url=request_url, json=request_data, timeout=timeout)
-        if response.status_code == 200:
-            res_data = json.loads(response.text)
-            return res_data
-        else:
-            return None
-    except requests.exceptions.Timeout as e:
-        log_.error('url: {} timeout, exception: {}'.format(request_url, e))
+        async with aiohttp.ClientSession() as session:
+            async with session.post(url, data=data) as res:
+                if res.status == 200:
+                    res_data = json.loads(await res.text())
+                    return res_data
+                else:
+                    return None
+    except:
+        log_.error('url: {} timeout, exception: {}'.format(url, e))
         return None
 
 
-def get_videos_remain_view_count(app_type, videos):
+async def get_videos_remain_view_count(app_type, videos):
     """
     获取视频在流量池中的剩余可分发数
     :param app_type: 产品标识 type-int
@@ -42,7 +44,7 @@ def get_videos_remain_view_count(app_type, videos):
         return []
 
     request_data = {'appType': app_type, 'videos': videos}
-    result = request_post(request_url=config_.GET_REMAIN_VIEW_COUNT_URL, request_data=request_data, timeout=1)
+    result = await request_post(url=config_.GET_REMAIN_VIEW_COUNT_URL, data=request_data, timeout=1)
 
     if result is None:
         return []
@@ -55,7 +57,7 @@ def get_videos_remain_view_count(app_type, videos):
     return data
 
 
-def get_videos_local_distribute_count(video_id, flow_pool):
+async def get_videos_local_distribute_count(video_id, flow_pool):
     """
     获取流量池视频本地分发数
     :param video_id: video_id
@@ -66,9 +68,8 @@ def get_videos_local_distribute_count(video_id, flow_pool):
     if datetime.now().minute >= 30:
         redis_h += 0.5
     key_name = config_.LOCAL_DISTRIBUTE_COUNT_PREFIX + str(redis_h)
-    redis_helper = RedisHelper()
     video = '{}-{}'.format(video_id, flow_pool)
-    current_count = redis_helper.get_score_with_value(key_name=key_name, value=video)
+    current_count = await aiocache.get_score_with_value(key_name=key_name, value=video)
     if current_count is not None:
         return current_count
     else:
@@ -90,11 +91,11 @@ class FilterVideos(object):
         self.uid = uid
         self.video_ids = video_ids
 
-    def filter_videos(self):
+    async def filter_videos(self):
         """视频过滤"""
         # 预曝光过滤
         st_pre = time.time()
-        filtered_pre_result = self.filter_video_previewed(self.video_ids)
+        filtered_pre_result = await self.filter_video_previewed(self.video_ids)
         et_pre = time.time()
         log_.info('filter by previewed: app_type = {}, result = {}, execute time = {}ms'.format(
             self.app_type, filtered_pre_result, (et_pre - st_pre) * 1000))
@@ -103,7 +104,7 @@ class FilterVideos(object):
 
         # 视频状态过滤
         st_status = time.time()
-        filtered_status_result = self.filter_video_status(video_ids=filtered_pre_result)
+        filtered_status_result = await self.filter_video_status(video_ids=filtered_pre_result)
         et_status = time.time()
         log_.info('filter by video status: result = {}, execute time = {}ms'.format(
             filtered_status_result, (et_status - st_status) * 1000))
@@ -112,7 +113,7 @@ class FilterVideos(object):
 
         # 视频已曝光过滤
         st_viewed = time.time()
-        filtered_viewed_result = self.filter_video_viewed(video_ids=filtered_status_result)
+        filtered_viewed_result = await self.filter_video_viewed(video_ids=filtered_status_result)
         et_viewed = time.time()
         log_.info('filter by viewed: app_type = {}, mid = {}, uid = {}, result = {}, execute time = {}ms'.format(
             self.app_type, self.mid, self.uid, filtered_viewed_result, (et_viewed - st_viewed) * 1000))
@@ -121,24 +122,23 @@ class FilterVideos(object):
         else:
             return filtered_viewed_result
 
-    def filter_video_previewed(self, video_ids):
+    async def filter_video_previewed(self, video_ids):
         """
         预曝光过滤
         :param video_ids: 需过滤的视频列表 type-list
         :return: filtered_videos  过滤后的列表  type-list
         """
         # 根据Redis缓存中的数据过滤
-        redis_helper = RedisHelper()
         # key拼接
         key_name = config_.PREVIEW_KEY_PREFIX + '{}.{}'.format(self.app_type, self.mid)
-        pe_videos_list = redis_helper.get_data_from_set(key_name)
+        pe_videos_list = await aiocache.get_data_from_set(key_name)
         if not pe_videos_list:
             return video_ids
         pe_videos = [eval(video) for video in pe_videos_list]
         filtered_videos = [video_id for video_id in video_ids if video_id not in pe_videos]
         return filtered_videos
 
-    def filter_video_status(self, video_ids):
+    async def filter_video_status(self, video_ids):
         """
         对视频状态进行过滤
         :param video_ids: 视频id列表 type-list
@@ -167,12 +167,11 @@ class FilterVideos(object):
                   "AND transcoding_status = 3 " \
                   "AND video_id IN {};".format(config_.VIDEO_STATUS, tuple(video_ids))
 
-        hologres_helper = HologresHelper()
-        data = hologres_helper.get_data(sql=sql)
+        data = await aiocache.pg_getdata(sql=sql)
         filtered_videos = [temp[0] for temp in data]
         return filtered_videos
 
-    def filter_video_viewed(self, video_ids, types=(1,)):
+    async def filter_video_viewed(self, video_ids, types=(1,)):
         """
         调用后端接口过滤用户已观看视频
         :param video_ids: 视频id列表 type-list
@@ -185,7 +184,7 @@ class FilterVideos(object):
                         "uid": self.uid,
                         "types": list(types),
                         "videoIds": video_ids}
-        result = request_post(request_url=config_.VIDEO_FILTER_URL, request_data=request_data, timeout=2.5)
+        result = await request_post(url=config_.VIDEO_FILTER_URL, data=request_data, timeout=2.5)
 
         if result is None:
             log_.info('过滤失败,types: {}'.format(types))

+ 5 - 6
video_rank.py

@@ -4,7 +4,6 @@ import numpy
 from log import Log
 from config import set_config
 from video_recall import PoolRecall
-from db_helper import RedisHelper
 from utils import FilterVideos
 
 log_ = Log()
@@ -22,6 +21,7 @@ def video_rank(data, size):
         return None
     # 将各路召回的视频按照score从大到小排序
     # ROV召回池
+    print('---------a',data['rov_pool_recall'])
     rov_recall_rank = sorted(data['rov_pool_recall'], key=lambda k: (k.get('rovScore'), 0), reverse=True)
     # 流量池
     flow_recall_rank = sorted(data['flow_pool_recall'], key=lambda k: (k.get('rovScore'), 0), reverse=True)
@@ -93,7 +93,7 @@ def remove_duplicate(rov_recall, flow_recall):
     return rov_recall, flow_recall_result
 
 
-def bottom_strategy(size, app_type, ab_code):
+async def bottom_strategy(size, app_type, ab_code):
     """
     兜底策略: 从ROV召回池中获取top1000,进行状态过滤后的视频
     :param size: 需要获取的视频数
@@ -104,18 +104,17 @@ def bottom_strategy(size, app_type, ab_code):
     :return:
     """
     pool_recall = PoolRecall(app_type=app_type, ab_code=ab_code)
-    key_name, _ = pool_recall.get_pool_redis_key(pool_type='rov')
+    key_name, _ = await pool_recall.get_pool_redis_key(pool_type='rov')
     if not key_name:
         log_.info('bottom strategy no data!')
         return []
-    redis_helper = RedisHelper()
-    data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=1000)
+    data = await aiocache.get_data_zset_with_index(key_name=key_name, start=0, end=1000)
     if not data:
         log_.info('bottom strategy no data!')
         return []
     # 状态过滤
     filter_videos = FilterVideos(app_type=app_type, video_ids=data)
-    filtered_data = filter_videos.filter_video_status(video_ids=data)
+    filtered_data = await filter_videos.filter_video_status(video_ids=data)
     if len(filtered_data) > size:
         random_data = numpy.random.choice(filtered_data, size, False)
     else:

+ 26 - 26
video_recall.py

@@ -2,9 +2,9 @@ import time
 
 from datetime import date, timedelta, datetime
 from log import Log
-from db_helper import RedisHelper
 from config import set_config
 from utils import FilterVideos, get_videos_remain_view_count, get_videos_local_distribute_count
+import aiocache
 
 log_ = Log()
 config_ = set_config()
@@ -24,13 +24,12 @@ class PoolRecall(object):
         self.mid = mid
         self.uid = uid
         self.ab_code = ab_code
-        self.redis_helper = RedisHelper()
 
-    def rov_pool_recall(self, size=10):
+    async def rov_pool_recall(self, size=10):
         """从ROV召回池中获取视频"""
         log_.info('====== rov pool recall')
         # 获取相关redis key, 用户上一次在rov召回池对应的位置
-        rov_pool_key, last_rov_recall_key, idx = self.get_video_last_idx()
+        rov_pool_key, last_rov_recall_key, idx = await self.get_video_last_idx()
         if not rov_pool_key:
             log_.info('ROV召回池中无视频')
             return []
@@ -45,7 +44,7 @@ class PoolRecall(object):
                 break
             # 获取数据
             st_get = time.time()
-            data = self.redis_helper.get_data_zset_with_index(key_name=rov_pool_key,
+            data = await aiocache.get_data_zset_with_index(key_name=rov_pool_key,
                                                               start=idx, end=idx + get_size - 1,
                                                               with_scores=True)
             et_get = time.time()
@@ -62,7 +61,7 @@ class PoolRecall(object):
                 video_score[eval(value[0])] = value[1]
             # 过滤
             filter_ = FilterVideos(app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
-            filtered_result = filter_.filter_videos()
+            filtered_result = await filter_.filter_videos()
             if filtered_result:
                 # 添加视频源参数 pushFrom, abCode
                 temp_result = [{'videoId': item, 'rovScore': video_score[item],
@@ -71,14 +70,14 @@ class PoolRecall(object):
                 rov_pool_recall_result.extend(temp_result)
             else:
                 # 将此次获取的末位视频id同步刷新到Redis中,方便下次快速定位到召回位置,过期时间为1天
-                self.redis_helper.set_data_to_redis(key_name=last_rov_recall_key, value=data[-1][0])
+                await aiocache.set_data_to_redis(key_name=last_rov_recall_key, value=data[-1][0])
             idx += get_size
         return rov_pool_recall_result[:size]
 
-    def flow_pool_recall(self, size=10):
+    async def flow_pool_recall(self, size=10):
         """从流量池中获取视频"""
         log_.info('====== flow pool recall')
-        flow_pool_key = self.get_pool_redis_key('flow')
+        flow_pool_key = await self.get_pool_redis_key('flow')
         flow_pool_recall_result = []
         flow_pool_recall_videos = []
         # 每次获取的视频数
@@ -90,7 +89,7 @@ class PoolRecall(object):
             freq += 1
             # 获取数据
             st_get = time.time()
-            data = self.redis_helper.get_data_zset_with_index(key_name=flow_pool_key,
+            data = await aiocache.get_data_zset_with_index(key_name=flow_pool_key,
                                                               start=idx, end=idx + get_size - 1,
                                                               with_scores=True)
             et_get = time.time()
@@ -104,7 +103,8 @@ class PoolRecall(object):
             video_mapping = {}
             video_score = {}
             for value in data:
-                video_id, flow_pool = value[0].split('-')
+                value0 = str(value[0], 'utf-8')
+                video_id, flow_pool = value0.split('-')
                 video_id = eval(video_id)
                 if video_id not in video_ids:
                     video_ids.append(video_id)
@@ -115,11 +115,11 @@ class PoolRecall(object):
                     video_mapping[video_id].append(flow_pool)
             # 过滤
             filter_ = FilterVideos(app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
-            filtered_result = filter_.filter_videos()
+            filtered_result = await filter_.filter_videos()
             # 检查可分发数
             if filtered_result:
                 st_check = time.time()
-                check_result = self.check_video_counts(video_ids=filtered_result, flow_pool_mapping=video_mapping)
+                check_result = await self.check_video_counts(video_ids=filtered_result, flow_pool_mapping=video_mapping)
                 for item in check_result:
                     if item[0] not in flow_pool_recall_videos:
                         # 取其中一个 flow_pool 作为召回结果
@@ -136,22 +136,22 @@ class PoolRecall(object):
 
         return flow_pool_recall_result[:size]
 
-    def check_video_counts(self, video_ids, flow_pool_mapping):
+    async def check_video_counts(self, video_ids, flow_pool_mapping):
         """
         检查视频剩余可分发数
         :param video_ids: 视频id type-list
         :param flow_pool_mapping: 视频id-流量池标记mapping, type-dict
         :return:
         """
-        flow_pool_key = self.get_pool_redis_key('flow')
+        flow_pool_key = await self.get_pool_redis_key('flow')
         videos = []
         for video_id in video_ids:
             for flow_pool in flow_pool_mapping[video_id]:
                 videos.append({'videoId': video_id, 'flowPool': flow_pool})
-        view_count_result = get_videos_remain_view_count(app_type=self.app_type, videos=videos)
+        view_count_result = await get_videos_remain_view_count(app_type=self.app_type, videos=videos)
         log_.info('view_count_result = {}'.format(view_count_result))
         if not view_count_result:
-            return None
+            return []
         check_result = []
         for item in view_count_result:
             if item[2] > 0:
@@ -163,15 +163,15 @@ class PoolRecall(object):
                 # cur_count <= 0,从流量召回池移除
                 else:
                     value = '{}-{}'.format(item[0], item[1])
-                    self.redis_helper.remove_value_from_zset(key_name=flow_pool_key, value=value)
+                    await aiocache.remove_value_from_zset(key_name=flow_pool_key, value=value)
             else:
                 # viewCount <= 0
                 # 从流量召回池移除
                 value = '{}-{}'.format(item[0], item[1])
-                self.redis_helper.remove_value_from_zset(key_name=flow_pool_key, value=value)
+                await aiocache.remove_value_from_zset(key_name=flow_pool_key, value=value)
         return check_result
 
-    def get_pool_redis_key(self, pool_type):
+    async def get_pool_redis_key(self, pool_type):
         """
         拼接key
         :param pool_type: type-string {'rov': rov召回池, 'flow': 流量池}
@@ -184,7 +184,7 @@ class PoolRecall(object):
                 redis_date = datetime.now().hour
                 # 判断热度列表是否更新,未更新则使用前一小时的热度列表
                 key_name = '{}{}.{}'.format(config_.RECALL_KEY_NAME_PREFIX_APP_TYPE, self.app_type, redis_date)
-                if self.redis_helper.key_exists(key_name):
+                if await aiocache.key_exists(key_name):
                     return key_name, redis_date
                 else:
                     key_name = '{}{}.{}'.format(config_.RECALL_KEY_NAME_PREFIX_APP_TYPE, self.app_type, redis_date - 1)
@@ -193,7 +193,7 @@ class PoolRecall(object):
             else:
                 # 判断热度列表是否更新,未更新则使用前一天的热度列表
                 key_name = config_.RECALL_KEY_NAME_PREFIX + time.strftime('%Y%m%d')
-                if self.redis_helper.key_exists(key_name):
+                if await aiocache.key_exists(key_name):
                     redis_date = date.today().strftime('%Y%m%d')
                 else:
                     redis_date = (date.today() - timedelta(days=1)).strftime('%Y%m%d')
@@ -209,16 +209,16 @@ class PoolRecall(object):
             log_.error('pool type error')
             return None, None
 
-    def get_video_last_idx(self):
+    async def get_video_last_idx(self):
         """获取用户上一次在rov召回池对应的位置"""
-        rov_pool_key, redis_date = self.get_pool_redis_key('rov')
+        rov_pool_key, redis_date = await self.get_pool_redis_key('rov')
         if not rov_pool_key:
             return None, None, None
         last_rov_recall_key = config_.LAST_VIDEO_FROM_ROV_POOL_PREFIX + '{}.{}.{}'.format(
             self.app_type, self.mid, redis_date)
-        value = self.redis_helper.get_data_from_redis(last_rov_recall_key)
+        value = await aiocache.get_data_from_redis(last_rov_recall_key)
         if value:
-            idx = self.redis_helper.get_index_with_data(rov_pool_key, value)
+            idx = await aiocache.get_index_with_data(rov_pool_key, value)
             if not idx:
                 idx = 0
             else:

+ 0 - 0
views/__init__.py


+ 43 - 0
views/app_hot_list.py

@@ -0,0 +1,43 @@
+import sys
+sys.path.append("..") 
+import json
+import ast
+import aiocache
+import traceback
+from log import Log
+
+log_ = Log()
+
+class AppHotlist:
+    async def on_post(self, req, resp):
+        try:
+            page_size= 10
+            request_data = await req.stream.read()
+            request_data = json.loads(request_data)
+            page = request_data.get('page', 0)
+            log_.info('request data: {}'.format(request_data))
+            datas = await aiocache.get('app_video_hot_list')
+            if datas is None or len(datas) == 0:
+                result = {'code': -1, 'message': 'no data'}
+                log_.info('result: {}'.format(result))
+                resp.text = json.dumps(result)
+                return 
+            # datas = ast.literal.eval(datas)
+            datas = eval(datas)
+            total_page = int(len(datas)/page_size)
+            if page > total_page -1 :
+                result = {'code': -1, 'message': 'page exceed max'}
+                log_.info('result: {}'.format(result))
+                resp.text = json.dumps(result)
+                return
+
+            result = {'code': 200, 'message': '', 'data':{'total_page':total_page, 'hot_list':datas[page*page_size:page*page_size+page_size]}}
+            log_.info('result: {}'.format(result))
+            resp.text = json.dumps(result)
+            return
+        except:
+            print(traceback.format_exc())
+            log_.error(e)
+            result = {'code': -1, 'message': 'fail'}
+            resp.text = json.dumps(result)
+            return 

+ 62 - 0
views/homepage_rs.py

@@ -0,0 +1,62 @@
+import sys
+sys.path.append("..") 
+import recommend
+import category
+import json
+import ast
+import aiocache
+import traceback
+from log import Log
+from config import set_config
+import time
+
+log_ = Log()
+config_ = set_config()
+
+class HomepageRecommend:
+    async def on_post(self, req, resp):
+        try:
+            start_time = time.time()
+            
+            request_data = await req.stream.read()
+            request_data = json.loads(request_data)
+            mid = request_data.get('mid')
+            uid = request_data.get('uid')
+            category_id = request_data.get('categoryId')
+            size = request_data.get('size')
+            app_type = request_data.get('appType')
+            algo_type = request_data.get('algoType')
+            log_.info('request data: {}'.format(request_data))
+            # size默认为10
+            if not size:
+                size = 10
+            if category_id in config_.CATEGORY['recommend']:
+                # 推荐
+                videos = await recommend.video_recommend(mid=mid, uid=uid, size=size, app_type=app_type, algo_type=algo_type)
+                result = {'code': 200, 'message': 'success', 'data': {'videos': videos}}
+                log_.info('category_id: {}, mid: {}, uid: {}, result: {}, execute time = {}ms'.format(
+                    category_id, mid, uid, result, (time.time() - start_time)*1000))
+                
+                resp.text = json.dumps(result)
+                return 
+            elif category_id in config_.CATEGORY['other']:
+                # 其他类别
+                videos = category.get_category_videos()
+                result = {'code': 200, 'message': 'success', 'data': {'videos': videos}}
+                log_.info('category_id: {}, mid: {}, uid: {}, result: {}, execute time = {}ms'.format(
+                    category_id, mid, uid, result, (time.time() - start_time) * 1000))
+                
+                resp.text = json.dumps(result)
+                return
+            else:
+                log_.error('categoryId error, categoryId = {}'.format(category_id))
+                result = {'code': -1, 'message': 'fail'}
+            
+                resp.text = json.dumps(result)
+                return    
+        except:
+            traceback.print_exc()
+            log_.error(e)
+            result = {'code': -1, 'message': traceback.format_exc()}
+            resp.text = json.dumps(result)
+            return