liqian 3 года назад
Родитель
Commit
c9867a72ca
6 измененных файлов с 486 добавлено и 0 удалено
  1. 62 0
      app.py
  2. 2 0
      category.py
  3. 65 0
      config.py
  4. 225 0
      db_helper.py
  5. 37 0
      recommend.py
  6. 95 0
      utils.py

+ 62 - 0
app.py

@@ -0,0 +1,62 @@
+import json
+from flask import Flask, request
+from log import Log
+from config import set_config
+from recommend import video_recommend
+from category import get_category_videos
+
+
+# Flask application object, plus the module-wide logger and configuration instance.
+app = Flask(__name__)
+log_ = Log()
+config_ = set_config()
+
+
+@app.route('/healthcheck')
+def health_check():
+    """Liveness probe: always answers with the fixed body 'ok!'."""
+    status_text = 'ok!'
+    return status_text
+
+
+# Homepage recommendation and per-tab category listing
+@app.route('/applet/video/homepage/recommend', methods=['GET', 'POST'])
+def homepage_recommend():
+    """
+    Return the videos for the homepage "recommend" tab or for one of the other category tabs.
+
+    The request body must be a JSON object. The fields read from it are
+    mid, uid, categoryId, size, appType and algoType.
+    :return: JSON string {'code': 200, 'message': 'success', 'data': {'videos': ...}} on success;
+             {'code': -1, 'message': 'fail'} for an unknown categoryId or when any exception is raised
+    """
+    try:
+        request_data = json.loads(request.get_data())
+        mid = request_data.get('mid')
+        uid = request_data.get('uid')
+        category_id = request_data.get('categoryId')
+        size = request_data.get('size')
+        app_type = request_data.get('appType')
+        algo_type = request_data.get('algoType')
+        log_.info('request data: {}'.format(request_data))
+        # size falls back to 10 when it is missing or falsy
+        if not size:
+            size = 10
+        if category_id in config_.CATEGORY['recommend']:
+            # Recommendation tab. video_recommend requires these five parameters; calling it
+            # with no arguments raises TypeError, which the handler below would silently
+            # turn into a permanent 'fail' response.
+            videos = video_recommend(mid=mid, uid=uid, size=size, app_type=app_type, algo_type=algo_type)
+        elif category_id in config_.CATEGORY['other']:
+            # Any other category tab
+            videos = get_category_videos()
+        else:
+            log_.error('categoryId error, categoryId = {}'.format(category_id))
+            return json.dumps({'code': -1, 'message': 'fail'})
+        result = {'code': 200, 'message': 'success', 'data': {'videos': videos}}
+        return json.dumps(result)
+
+    except Exception as e:
+        # Top-level boundary of the endpoint: log and answer with the generic failure payload.
+        log_.error(e)
+        result = {'code': -1, 'message': 'fail'}
+        return json.dumps(result)
+
+
+# Real-time ROV update triggered from the admin console
+@app.route('/applet/video/update/rov', methods=['GET', 'POST'])
+def update_rov():
+    """
+    Placeholder for the admin-console ROV update endpoint (not implemented yet).
+
+    A Flask view must not return None, so until the real logic exists the endpoint
+    answers with the same failure payload the other endpoint in this module uses.
+    :return: JSON string {'code': -1, 'message': 'fail'}
+    """
+    log_.error('update_rov is not implemented')
+    return json.dumps({'code': -1, 'message': 'fail'})
+
+
+# Start the Flask development server when the module is executed directly.
+if __name__ == '__main__':
+    app.run()

+ 2 - 0
category.py

@@ -0,0 +1,2 @@
+def get_category_videos():
+    """Placeholder for fetching the videos of a category tab; currently always yields None."""
+    videos = None
+    return videos

+ 65 - 0
config.py

@@ -0,0 +1,65 @@
+class BaseConfig(object):
+    # category id mapping
+    CATEGORY = {
+        'recommend': [0],  # recommendation tab
+        'other': [1, 2, 3]  # other categories
+    }
+    # The first K videos are taken from the ROV recall pool
+    K = 3
+    # Probability of taking a video from the flow pool
+    P = 0.5
+    # Redis key prefix of the ROV recall pool; full format: com.weiqu.video.recall.hot.item.score.{date}
+    RECALL_KEY_NAME_PREFIX = 'com.weiqu.video.recall.hot.item.score.'
+    # Redis key of the flow pool
+    FLOW_POOL_KEY_NAME = 'com.weiqu.video.flowpool.hot.item.score'
+    # Redis key prefix of the homepage-recommendation pre-exposure list;
+    # full key format: PSEUDO_EXPOSURE_KEY_PREFIX.{appType}.{mid}
+    PSEUDO_EXPOSURE_KEY_PREFIX = 'com.weiqu.video.hot.recommend.pseudo.exposure.'
+
+
+class DevelopmentConfig(BaseConfig):
+    """Development configuration; per the original note it targets the test environment."""
+    # NOTE(security): the Redis and Hologres credentials below are hard-coded in source control.
+    # They should be loaded from the environment or a secret store, and rotated.
+    # Redis address of the test environment
+    REDIS_INFO = {
+        'host': 'r-bp1ps6my7lzg8rdhwx682.redis.rds.aliyuncs.com',
+        'port': 6379,
+        'password': 'Wqsd@2019',
+    }
+
+    # Hologres connection parameters, for local use
+    HOLOGRES_INFO = {
+        'host': 'hgprecn-cn-7pp28y18c00c-cn-hangzhou.hologres.aliyuncs.com',
+        'port': 80,
+        'dbname': 'dssm',
+        'user': 'LTAI5tMPqPy9yboQAf1mBCCN',
+        'password': '4BEcOgxREOPq7t3A7EWkjciVULeQGj'
+    }
+
+    # Name of the Hologres table that stores video status
+    VIDEO_STATUS = 'longvideo_test.dwd_mdm_item_video_stat'
+
+    # URL of the filtering endpoint: already-watched videos & video audit conditions &
+    # whether to enter the seniors' community & topic status
+    # Parameter "types": 1-already watched 2-video status 3-seniors' community filter 4-topic status
+    VIDEO_FILTER_URL = 'http://videotest-internal.yishihui.com/longvideoapi/openapi/recommend/filterVideos'
+
+
+class TestConfig(BaseConfig):
+    """Test environment configuration"""
+    # NOTE(security): the Redis and Hologres credentials below are hard-coded in source control.
+    # They should be loaded from the environment or a secret store, and rotated.
+    # NOTE(review): VIDEO_STATUS and VIDEO_FILTER_URL are not defined on this class although
+    # utils.py reads both from the active config - confirm the intended values before
+    # set_config() is switched to return TestConfig.
+    # Redis address of the test environment
+    REDIS_INFO = {
+        'host': 'r-bp1ps6my7lzg8rdhwx682.redis.rds.aliyuncs.com',
+        'port': 6379,
+        'password': 'Wqsd@2019',
+    }
+
+    # Hologres connection parameters, for use on the server
+    HOLOGRES_INFO = {
+        'host': 'hgprecn-cn-7pp28y18c00c-cn-hangzhou-vpc.hologres.aliyuncs.com',
+        'port': 80,
+        'dbname': 'dssm',
+        'user': 'LTAI5tMPqPy9yboQAf1mBCCN',
+        'password': '4BEcOgxREOPq7t3A7EWkjciVULeQGj'
+    }
+
+
+def set_config():
+    """Build and return the configuration object used by the service (a DevelopmentConfig instance)."""
+    active_config = DevelopmentConfig()
+    return active_config

+ 225 - 0
db_helper.py

@@ -0,0 +1,225 @@
+import redis
+import psycopg2
+from config import set_config
+from log import Log
+
+# Module-wide configuration instance and logger shared by the helpers below.
+config_ = set_config()
+log = Log()
+
+
+class RedisHelper(object):
+    """
+    Convenience wrapper around redis-py for the string, sorted-set and set operations
+    used by this service. Connection settings are read from config_.REDIS_INFO.
+    """
+    # Connection pools shared by every instance, keyed by (host, port, password).
+    # Building a new ConnectionPool on each call would open fresh sockets for every
+    # single operation and never reuse them.
+    _pools = {}
+
+    def __init__(self):
+        """
+        Load the redis connection settings.
+        redis_info: connection settings, dict of the form {'host': '', 'port': '', 'password': ''}
+        """
+        redis_info = config_.REDIS_INFO
+        self.host = redis_info['host']
+        self.port = redis_info['port']
+        self.password = redis_info['password']
+
+    def connect(self):
+        """
+        Return a redis client backed by the shared connection pool for this host/port/password.
+        :return: conn
+        """
+        pool_key = (self.host, self.port, self.password)
+        pool = self._pools.get(pool_key)
+        if pool is None:
+            pool = redis.ConnectionPool(host=self.host,
+                                        port=self.port,
+                                        password=self.password,
+                                        decode_responses=True)
+            self._pools[pool_key] = pool
+        return redis.Redis(connection_pool=pool)
+
+    def key_exists(self, key_name):
+        """
+        Check whether a key exists.
+        :param key_name: key
+        :return: True if the key exists, False otherwise
+        """
+        conn = self.connect()
+        # EXISTS answers with a count; convert it to the documented boolean.
+        return bool(conn.exists(key_name))
+
+    def del_keys(self, key_name):
+        """
+        Delete a key.
+        :param key_name: key
+        :return: None
+        """
+        conn = self.connect()
+        conn.delete(key_name)
+
+    def get_data_from_redis(self, key_name):
+        """
+        Read a string value.
+        :param key_name: key
+        :return: data, or None when the key does not exist
+        """
+        conn = self.connect()
+        # GET already yields None for a missing key, so no separate EXISTS round trip is needed.
+        return conn.get(key_name)
+
+    def set_data_to_redis(self, key_name, value, expire_time=24*3600):
+        """
+        Store a string value.
+        :param key_name: key
+        :param value: value to store (videoId)
+        :param expire_time: time to live in seconds, default 1 day
+        :return: None
+        """
+        conn = self.connect()
+        conn.set(key_name, value, ex=expire_time)
+
+    def add_data_with_zset(self, key_name, data, expire_time=7*24*3600):
+        """
+        Add members to a sorted set and (re)set the key's time to live.
+        :param key_name: key
+        :param data: members and their scores, dict {value: score}
+        :param expire_time: time to live in seconds, default 7 days
+        :return: None
+        """
+        if not data:
+            # ZADD needs at least one member; nothing to add.
+            return
+        conn = self.connect()
+        conn.zadd(key_name, data)
+        # set the time to live
+        conn.expire(key_name, expire_time)
+
+    def get_data_zset_with_index(self, key_name, start, end, desc=True, with_scores=False):
+        """
+        Fetch sorted-set members by index range.
+        :param key_name: key
+        :param start: start index, inclusive
+        :param end: end index, inclusive
+        :param desc: score ordering, default from high to low
+        :param with_scores: whether to also return the scores, default False (members only)
+        :return: None when the key does not exist; the raw redis result when with_scores is True;
+                 otherwise a list of members parsed as Python literals (videoId -> int)
+        """
+        import ast
+
+        conn = self.connect()
+        if not conn.exists(key_name):
+            return None
+        data = conn.zrange(key_name, start, end, desc=desc, withscores=with_scores)
+        if with_scores:
+            return data
+        # literal_eval parses the same literals eval() would, but cannot execute code
+        # that happens to be stored in redis.
+        return [ast.literal_eval(value) for value in data]
+
+    def get_score_with_value(self, key_name, value):
+        """
+        Look up the score of a sorted-set member.
+        :param key_name: key
+        :param value: member
+        :return: score of the member
+        """
+        conn = self.connect()
+        return conn.zscore(key_name, value)
+
+    def update_score_with_value(self, key_name, value, score, expire_time=7*24*3600):
+        """
+        Set the score of a sorted-set member.
+        :param key_name: key
+        :param value: member
+        :param score: new score for the member
+        :param expire_time: time to live in seconds, default 7 days; applied only when the key is newly created
+        """
+        conn = self.connect()
+        key_is_new = not conn.exists(key_name)
+        conn.zadd(key_name, {value: score})
+        if key_is_new:
+            # the key did not exist before, so it needs a time to live
+            conn.expire(key_name, expire_time)
+
+    def remove_value_from_zset(self, key_name, value):
+        """
+        Remove a member from a sorted set.
+        :param key_name: key
+        :param value: member
+        :return: None
+        """
+        conn = self.connect()
+        conn.zrem(key_name, value)
+
+    def get_index_with_data(self, key_name, value):
+        """
+        Get the rank of a member in a sorted set, ordered by score from high to low.
+        :param key_name: key
+        :param value: member
+        :return: idx, the rank
+        """
+        conn = self.connect()
+        return conn.zrevrank(key_name, value)
+
+    def get_data_from_set(self, key_name):
+        """
+        Fetch every member of a set.
+        :param key_name: key
+        :return: list of members, or None when the key does not exist
+        """
+        conn = self.connect()
+        if not conn.exists(key_name):
+            # key does not exist
+            return None
+        # A single SSCAN call returns only one page of a large set; SMEMBERS returns all
+        # members. Converted to a list to keep the previous return type.
+        return list(conn.smembers(key_name))
+
+    def add_data_with_set(self, key_name, values, expire_time=30*60):
+        """
+        Add members to a set and (re)set the key's time to live.
+        :param key_name: key
+        :param values: members to add, type set
+        :param expire_time: time to live in seconds, default 0.5 hour
+        :return: None
+        """
+        if not values:
+            # SADD needs at least one member; nothing to add.
+            return
+        conn = self.connect()
+        conn.sadd(key_name, *values)
+        # set the time to live
+        conn.expire(key_name, expire_time)
+
+    def data_exists_with_set(self, key_name, value):
+        """
+        Check whether value is a member of the set key_name.
+        :param key_name: key
+        :param value: member to look for
+        :return: truthy when the member is present, falsy otherwise
+        """
+        conn = self.connect()
+        return conn.sismember(key_name, value)
+
+    def remove_value_from_set(self, key_name, values):
+        """
+        Remove members from a set.
+        :param key_name: key
+        :param values: members to remove, type set
+        :return: None
+        """
+        if not values:
+            # SREM needs at least one member; nothing to remove.
+            return
+        conn = self.connect()
+        conn.srem(key_name, *values)
+
+
+class HologresHelper(object):
+    """Runs queries against Hologres using the connection parameters in config_.HOLOGRES_INFO."""
+
+    def __init__(self):
+        """Load the Hologres connection parameters."""
+        self.hologres_info = config_.HOLOGRES_INFO
+
+    def get_data(self, sql):
+        """
+        Execute a query on a fresh connection and return all rows.
+        :param sql: SQL statement to execute
+        :return: data, the rows returned by cursor.fetchall()
+        """
+        # connect to Hologres
+        conn = psycopg2.connect(**self.hologres_info)
+        try:
+            # create the cursor
+            cur = conn.cursor()
+            try:
+                # run the query
+                cur.execute(sql)
+                data = cur.fetchall()
+                # commit the transaction
+                conn.commit()
+            finally:
+                cur.close()
+        finally:
+            # release the connection even when the query fails, so it is not leaked
+            conn.close()
+        return data
+
+
+# Manual smoke check: print the score stored for member 90797 under a dated sorted-set key.
+if __name__ == '__main__':
+    redis_helper = RedisHelper()
+    key = 'com.weiqu.video.hot.recommend.item.score.20210901'
+    res = redis_helper.get_score_with_value(key, 90797)
+    print(res)

+ 37 - 0
recommend.py

@@ -0,0 +1,37 @@
+from log import Log
+from db_helper import RedisHelper
+from config import set_config
+
+# Module-wide logger and configuration instance.
+log_ = Log()
+config_ = set_config()
+
+
+def rov_pool_recall(mid, uid, app_type, size):
+    """
+    Fetch videos from the ROV recall pool (body not implemented yet).
+    :param mid: mid, type string
+    :param uid: uid, type string
+    :param app_type: product identifier, type int
+    :param size: number of videos to fetch, type int
+    :return: rov_pool_recall_result (currently always None)
+    """
+    return None
+
+
+
+
+def video_recommend(mid, uid, size, app_type, algo_type):
+    """
+    Online recommendation logic for the homepage (body not implemented yet).
+    :param mid: mid, type string
+    :param uid: uid, type string
+    :param size: number of videos requested, type int
+    :param app_type: product identifier, type int
+    :param algo_type: algorithm type, type string
+    :return: currently always None
+    """
+    # Planned steps:
+    # 1. Fetch size videos from the ROV recall pool, then filter them
+    # 2. Fetch size-K videos from the flow pool, filter them; remaining distributable count > 0
+    # 3. Rank: top K from the recall pool; the other size-K slots take flow-pool videos with probability P
+    return None
+

+ 95 - 0
utils.py

@@ -0,0 +1,95 @@
+import requests
+import json
+import time
+
+from db_helper import HologresHelper, RedisHelper
+from config import set_config
+from log import Log
+
+# Module-wide configuration instance and logger.
+config_ = set_config()
+log_ = Log()
+
+
+def request_post(request_url, request_data, timeout=None):
+    """
+    POST a JSON payload to an HTTP endpoint.
+    :param request_url: endpoint URL
+    :param request_data: request parameters, sent as the JSON body
+    :param timeout: optional timeout in seconds forwarded to requests.post;
+                    None (the default) keeps the previous behaviour of setting no timeout
+    :return: res_data, the decoded JSON body when the status code is 200, otherwise None
+    """
+    response = requests.post(url=request_url, json=request_data, timeout=timeout)
+    if response.status_code != 200:
+        # Make the failure visible instead of silently falling through to an implicit None.
+        log_.error('request failed, url = {}, status_code = {}'.format(request_url, response.status_code))
+        return None
+    return json.loads(response.text)
+
+
+def filter_by_pseudo_exposure(app_type, mid, video_ids):
+    """
+    Drop the videos that are in the user's pseudo-exposure set cached in Redis.
+    :param app_type: product identifier, type int
+    :param mid: device id, type string
+    :param video_ids: videos to filter, type list
+    :return: filtered_videos, the filtered list (video_ids itself when nothing is cached), type list
+    """
+    import ast
+
+    # filter against the data cached in Redis
+    redis_helper = RedisHelper()
+    # build the key
+    # NOTE(review): PSEUDO_EXPOSURE_KEY_PREFIX already ends with '.', so this format yields
+    # '..' before app_type - left unchanged; confirm against whatever writes the key.
+    key_name = '{}.{}.{}'.format(config_.PSEUDO_EXPOSURE_KEY_PREFIX, app_type, mid)
+    pe_videos_list = redis_helper.get_data_from_set(key_name)
+    if not pe_videos_list:
+        return video_ids
+    # literal_eval parses the stored ids like eval() did, without being able to execute code
+    # read from Redis; a set makes each membership test O(1).
+    pe_videos = {ast.literal_eval(video) for video in pe_videos_list}
+    return [video_id for video_id in video_ids if video_id not in pe_videos]
+
+
+def filter_video_status(video_ids):
+    """
+    Keep only the videos whose status in Hologres allows them to be recommended
+    (audit_status = 5 and applet_rec_status in (1, 6)).
+    :param video_ids: list of video ids, type list
+    :return: filtered_videos, the ids that passed the status filter ([] for empty input)
+    """
+    if not video_ids:
+        # "IN ()" is not valid SQL and there is nothing to filter anyway.
+        return []
+    st = time.time()
+    # Coerce every id to int before interpolating it so the IN list cannot carry SQL text,
+    # and join manually: formatting a tuple renders a one-element list as "(1,)".
+    id_list = ', '.join(str(int(video_id)) for video_id in video_ids)
+    # Every fragment ends with a space so the concatenated statement stays well-formed.
+    # Conditions that are currently disabled (they must not live inside the string, where a
+    # "--" comment would swallow the rest of the single-line statement):
+    #   open_status = 1, payment_status = 0, encryption_status != 5, transcoding_status = 3
+    sql = "SELECT video_id " \
+          "FROM {} " \
+          "WHERE audit_status = 5 " \
+          "AND applet_rec_status IN (1, 6) " \
+          "AND video_id IN ({});".format(config_.VIDEO_STATUS, id_list)
+    hologres_helper = HologresHelper()
+    data = hologres_helper.get_data(sql=sql)
+    filtered_videos = [temp[0] for temp in data]
+    et = time.time()
+    log_.info('filter video status finished! filtered_videos = {}, execute time = {}ms'.format(
+        filtered_videos, (et - st)*1000))
+    return filtered_videos
+
+
+def filter_video_viewed(app_type, mid, uid, video_ids, types=(1,)):
+    """
+    Call the backend endpoint that filters out videos, by default those the user already watched.
+    :param app_type: product identifier, type int
+    :param mid: mid, type string
+    :param uid: uid, type string
+    :param video_ids: list of video ids, type list
+    :param types: filter selectors, type tuple, default (1, ):
+                  1-already watched 2-video status 3-seniors' community 4-topic status 5-recommendation status
+    :return: filtered_videos; the unfiltered video_ids when the backend call fails
+    """
+    # call the HTTP endpoint
+    request_data = {"appType": app_type,
+                    "mid": mid,
+                    "uid": uid,
+                    "types": list(types),
+                    "videoIds": video_ids}
+    result = request_post(request_url=config_.VIDEO_FILTER_URL, request_data=request_data)
+    # request_post yields None for a non-200 response; treat that like a failure code
+    # instead of crashing on the subscript.
+    if not result or result.get('code') != 0:
+        log_.info('过滤失败,types: {}'.format(types))
+        return video_ids
+    filtered_videos = result['data']
+    return filtered_videos
+
+
+# Manual smoke check of the status filter with a few sample video ids.
+if __name__ == '__main__':
+    filter_video_status([1, 2, 3, 5978661])