liqian hace 3 años
padre
commit
6bc8f85f8e
Se han modificado 6 ficheros con 422 adiciones y 27 borrados
  1. 62 1
      config.py
  2. 225 0
      db_helper.py
  3. 58 5
      pool_predict.py
  4. 14 14
      process_feature.py
  5. 40 7
      rov_train.py
  6. 23 0
      utils.py

+ 62 - 1
config.py

@@ -29,30 +29,91 @@ class BaseConfig(object):
     # 模型存放文件
     MODEL_FILENAME = 'model.pickle'
 
+    # 小程序离线ROV模型结果存放 redis key前缀,完整格式:com.weiqu.video.recall.hot.item.score.{date}
+    RECALL_KEY_NAME_PREFIX = 'com.weiqu.video.recall.hot.item.score.'
+    # 流量池离线模型结果存放 redis key
+    FLOWPOOL_KEY_NAME = 'com.weiqu.video.flowpool.hot.item.score'
+
 
 class TestConfig(BaseConfig):
     """Test environment configuration."""
+    # NOTE(review): the Redis and Hologres credentials below are committed in
+    # plain text; consider loading them from the environment or a secret store
+    # and rotating the exposed values.
+    # Redis address for the test environment
+    REDIS_INFO = {
+        'host': 'r-bp1ps6my7lzg8rdhwx682.redis.rds.aliyuncs.com',
+        'port': 6379,
+        'password': 'Wqsd@2019',
+    }
+
+    # Hologres connection parameters, for use on the server
+    HOLOGRES_INFO = {
+        'host': 'hgprecn-cn-7pp28y18c00c-cn-hangzhou-vpc.hologres.aliyuncs.com',
+        'port': 80,
+        'dbname': 'dssm',
+        'user': 'LTAI5tMPqPy9yboQAf1mBCCN',
+        'password': '4BEcOgxREOPq7t3A7EWkjciVULeQGj'
+    }
+
     # API endpoint for fetching videos from the flow pool
     GET_VIDEOS_FROM_POOL_URL = 'http://testapi-internal.piaoquantv.com/flowpool/video/getAllVideo'
     # API endpoint for querying a video's remaining distributable view count in the flow pool
     GET_REMAIN_VIEW_COUNT_URL = 'http://testapi-internal.piaoquantv.com/flowpool/video/remainViewCount'
+    # API endpoint for notifying the backend once ROV scores have been computed
+    NOTIFY_BACKEND_UPDATE_ROV_SCORE_URL = 'http://speedtest.wx.com/longvideoapi/openapi/recommend/updateRovScore'
 
 
 class PreProductionConfig(BaseConfig):
     """Pre-production (staging) environment configuration."""
+    # NOTE(review): the Redis and Hologres credentials below are committed in
+    # plain text; consider loading them from the environment or a secret store
+    # and rotating the exposed values.
+    # Redis address
+    # NOTE(review): this host is identical to ProductionConfig.REDIS_INFO['host'];
+    # confirm that pre-production is meant to share the production instance.
+    REDIS_INFO = {
+        'host': 'r-bp1fogs2mflr1ybfot.redis.rds.aliyuncs.com',
+        'port': 6379,
+        'password': 'Wqsd@2019',
+    }
+
+    # Hologres connection parameters, for use on the server
+    HOLOGRES_INFO = {
+        'host': 'hgprecn-cn-7pp28y18c00c-cn-hangzhou-vpc.hologres.aliyuncs.com',
+        'port': 80,
+        'dbname': 'dssm',
+        'user': 'LTAI5tMPqPy9yboQAf1mBCCN',
+        'password': '4BEcOgxREOPq7t3A7EWkjciVULeQGj'
+    }
+
     # API endpoint for fetching videos from the flow pool
     GET_VIDEOS_FROM_POOL_URL = 'http://preapi-internal.piaoquantv.com/flowpool/video/getAllVideo'
     # API endpoint for querying a video's remaining distributable view count in the flow pool
     GET_REMAIN_VIEW_COUNT_URL = 'http://preapi-internal.piaoquantv.com/flowpool/video/remainViewCount'
+    # API endpoint for notifying the backend once ROV scores have been computed
+    NOTIFY_BACKEND_UPDATE_ROV_SCORE_URL = 'http://speedpre.wx.com/longvideoapi/openapi/recommend/updateRovScore'
 
 
 class ProductionConfig(BaseConfig):
-    """Pre-production (staging) environment configuration."""
+    """Production environment configuration."""
+    # NOTE(review): the Redis and Hologres credentials below are committed in
+    # plain text; consider loading them from the environment or a secret store
+    # and rotating the exposed values.
+    # Redis address for the production (online) environment
+    REDIS_INFO = {
+        'host': 'r-bp1fogs2mflr1ybfot.redis.rds.aliyuncs.com',
+        'port': 6379,
+        'password': 'Wqsd@2019',
+    }
+
+    # Hologres connection parameters, for use on the server
+    HOLOGRES_INFO = {
+        'host': 'hgprecn-cn-7pp28y18c00c-cn-hangzhou-vpc.hologres.aliyuncs.com',
+        'port': 80,
+        'dbname': 'dssm',
+        'user': 'LTAI5tMPqPy9yboQAf1mBCCN',
+        'password': '4BEcOgxREOPq7t3A7EWkjciVULeQGj'
+    }
+
     # API endpoint for fetching videos from the flow pool
     GET_VIDEOS_FROM_POOL_URL = 'http://api-internal.piaoquantv.com/flowpool/video/getAllVideo'
     # API endpoint for querying a video's remaining distributable view count in the flow pool
     GET_REMAIN_VIEW_COUNT_URL = 'http://api-internal.piaoquantv.com/flowpool/video/remainViewCount'
+    # API endpoint for notifying the backend once ROV scores have been computed
+    NOTIFY_BACKEND_UPDATE_ROV_SCORE_URL = 'http://speed.wx.com/longvideoapi/openapi/recommend/updateRovScore'
 
 
 def set_config():
+    """
+    Return the configuration object for the active environment.
+
+    The environment is selected by editing which ``return`` statement below is
+    live; as committed, a TestConfig instance is always returned.
+    """
     return TestConfig()
+    # return PreProductionConfig()
+    # return ProductionConfig()

+ 225 - 0
db_helper.py

@@ -0,0 +1,225 @@
+import redis
+import psycopg2
+from config import set_config
+from log import Log
+
+# Configuration object for the active environment, built once at import time.
+config_ = set_config()
+# Project Log instance; not referenced by the helpers visible in this file.
+log = Log()
+
+
+class RedisHelper(object):
+    """
+    Convenience wrapper around a redis-py client.
+
+    Connection parameters come from ``config_.REDIS_INFO``. The client and its
+    connection pool are created lazily on the first call to ``connect`` and
+    reused afterwards, so helper methods no longer open a new pool per call.
+    """
+
+    def __init__(self):
+        """
+        Read the redis connection settings from the active config.
+        redis_info: redis connection info, a dict of the form
+        {'host': '', 'port': '', 'password': ''}
+        """
+        redis_info = config_.REDIS_INFO
+        self.host = redis_info['host']
+        self.port = redis_info['port']
+        self.password = redis_info['password']
+        # Created on first use by connect().
+        self._conn = None
+
+    def connect(self):
+        """
+        Return the redis client, creating it (and its pool) on first use.
+        :return: conn, a redis.Redis client built with decode_responses=True
+        """
+        # getattr keeps this working for instances created without __init__.
+        conn = getattr(self, '_conn', None)
+        if conn is None:
+            pool = redis.ConnectionPool(host=self.host,
+                                        port=self.port,
+                                        password=self.password,
+                                        decode_responses=True)
+            conn = redis.Redis(connection_pool=pool)
+            self._conn = conn
+        return conn
+
+    def key_exists(self, key_name):
+        """
+        Check whether a key exists.
+        :param key_name: key
+        :return: True if the key exists, False otherwise
+        """
+        conn = self.connect()
+        # exists() reports a count; normalise it to the documented bool.
+        return bool(conn.exists(key_name))
+
+    def del_keys(self, key_name):
+        """
+        Delete a key.
+        :param key_name: key
+        :return: None
+        """
+        conn = self.connect()
+        conn.delete(key_name)
+
+    def get_data_from_redis(self, key_name):
+        """
+        Read the string value stored at a key.
+        :param key_name: key
+        :return: data, or None when the key does not exist
+        """
+        conn = self.connect()
+        # GET already yields None for a missing key, so no separate EXISTS call is needed.
+        return conn.get(key_name)
+
+    def set_data_to_redis(self, key_name, value, expire_time=24*3600):
+        """
+        Store a value under a key.
+        :param key_name: key
+        :param value: value to store (videoId)
+        :param expire_time: expiry in seconds, default 1 day
+        :return: None
+        """
+        conn = self.connect()
+        conn.set(key_name, value, ex=expire_time)
+
+    def add_data_with_zset(self, key_name, data, expire_time=7*24*3600):
+        """
+        Add members to a sorted set and (re)set the key's expiry.
+        :param key_name: key
+        :param data: members and their scores, type-dict {value: score}
+        :param expire_time: expiry in seconds, default 7 days
+        :return: None
+        """
+        if not data:
+            # Nothing to add; an empty mapping cannot be sent to ZADD.
+            return
+        conn = self.connect()
+        conn.zadd(key_name, data)
+        # Set the expiry
+        conn.expire(key_name, expire_time)
+
+    def get_data_zset_with_index(self, key_name, start, end, desc=True, with_scores=False):
+        """
+        Fetch sorted-set members by index range.
+        :param key_name: key
+        :param start: start index, inclusive
+        :param end: end index, inclusive
+        :param desc: score ordering, default high-to-low
+        :param with_scores: whether to return scores as well, default False (members only)
+        :return: data. Without scores: list of members converted to int (videoId);
+                 a non-integer member raises ValueError. With scores: the raw
+                 zrange result, no conversion. None when the key does not exist.
+        """
+        conn = self.connect()
+        if not conn.exists(key_name):
+            return None
+        # Pass the flags by keyword so they cannot drift with the client's positional order.
+        data = conn.zrange(key_name, start, end, desc=desc, withscores=with_scores)
+        if with_scores:
+            return data
+        # int() instead of eval(): data read from redis must never be executed as code.
+        return [int(value) for value in data]
+
+    def get_score_with_value(self, key_name, value):
+        """
+        Look up the score of a member in a sorted set.
+        :param key_name: key
+        :param value: member
+        :return: score of the member
+        """
+        conn = self.connect()
+        return conn.zscore(key_name, value)
+
+    def update_score_with_value(self, key_name, value, score, expire_time=7*24*3600):
+        """
+        Set the score of a member in a sorted set.
+        :param key_name: key
+        :param value: member
+        :param score: new score for the member
+        :param expire_time: expiry in seconds, default 7 days; applied only when
+                            this call creates the key
+        """
+        conn = self.connect()
+        key_is_new = not conn.exists(key_name)
+        conn.zadd(key_name, {value: score})
+        if key_is_new:
+            # A key created by this call needs an expiry; an existing key keeps its own.
+            conn.expire(key_name, expire_time)
+
+    def remove_value_from_zset(self, key_name, value):
+        """
+        Remove a member from a sorted set.
+        :param key_name: key
+        :param value: member
+        :return: None
+        """
+        conn = self.connect()
+        conn.zrem(key_name, value)
+
+    def get_index_with_data(self, key_name, value):
+        """
+        Get a member's position in a sorted set, ordered by score high-to-low.
+        :param key_name: key
+        :param value: member
+        :return: idx, the position index
+        """
+        conn = self.connect()
+        return conn.zrevrank(key_name, value)
+
+    def get_data_from_set(self, key_name):
+        """
+        Get all members of a set.
+        :param key_name: key
+        :return: data, list of members, or None when the key does not exist
+        """
+        conn = self.connect()
+        if not conn.exists(key_name):
+            # Key does not exist
+            return None
+        # A single SSCAN call returns only one page of a larger set; SMEMBERS returns all members.
+        return list(conn.smembers(key_name))
+
+    def add_data_with_set(self, key_name, values, expire_time=30*60):
+        """
+        Add members to a set and (re)set the key's expiry.
+        :param key_name: key
+        :param values: members to add, type-set
+        :param expire_time: expiry in seconds, default 0.5 hour
+        :return: None
+        """
+        if not values:
+            # Nothing to add; SADD needs at least one member.
+            return
+        conn = self.connect()
+        conn.sadd(key_name, *values)
+        # Set the expiry
+        conn.expire(key_name, expire_time)
+
+    def data_exists_with_set(self, key_name, value):
+        """
+        Check whether a value is a member of the set at key_name.
+        :param key_name: key
+        :param value: member to test
+        :return: truthy if present, falsy otherwise
+        """
+        conn = self.connect()
+        return conn.sismember(key_name, value)
+
+    def remove_value_from_set(self, key_name, values):
+        """
+        Remove members from a set.
+        :param key_name: key
+        :param values: members to remove, type-set
+        :return: None
+        """
+        if not values:
+            # Nothing to remove; SREM needs at least one member.
+            return
+        conn = self.connect()
+        conn.srem(key_name, *values)
+
+
+class HologresHelper(object):
+    """Runs queries against Hologres through psycopg2."""
+
+    def __init__(self):
+        """Store the Hologres connection parameters from the active config."""
+        self.hologres_info = config_.HOLOGRES_INFO
+
+    def get_data(self, sql):
+        """
+        Execute a query and return all rows of its result.
+
+        A new connection is opened for each call and is always closed, even
+        when the query raises.
+
+        :param sql: SQL text, executed verbatim
+                    (NOTE(review): callers must not build it from untrusted input)
+        :return: data, the rows returned by cursor.fetchall()
+        """
+        # Connect to Hologres
+        conn = psycopg2.connect(**self.hologres_info)
+        try:
+            # Create the cursor
+            cur = conn.cursor()
+            try:
+                # Run the query
+                cur.execute(sql)
+                data = cur.fetchall()
+            finally:
+                cur.close()
+            # Commit the transaction
+            conn.commit()
+        finally:
+            # Release the connection whether or not the query succeeded
+            conn.close()
+        return data
+
+
+if __name__ == '__main__':
+    # Manual smoke check: print the score stored for one member of a dated zset.
+    sample_key = 'com.weiqu.video.hot.recommend.item.score.20210901'
+    print(RedisHelper().get_score_with_value(sample_key, 90797))

+ 58 - 5
pool_predict.py

@@ -1,19 +1,20 @@
 import time
 
 from config import set_config
-from utils import request_post
+from utils import request_post, filter_video_status
 from log import Log
+from db_helper import RedisHelper
 
 config_ = set_config()
 log_ = Log()
 
 
-def get_videos_from_pool(app_type, size=1000):
+def get_videos_from_flow_pool(app_type, size=1000):
     """
     从流量池获取视频,循环获取,直到返回数据为None结束
     :param app_type: 产品标识 type-int
     :param size: 每次获取视频数量,type-int,默认1000
-    :return: videos
+    :return: videos  [{'videoId': 1111, 'flowPool': ''}, ...]
     """
     # 获取批次标识,利用首次获取数据时间戳为标记
     batch_flag = time.time()
@@ -55,7 +56,59 @@ def get_videos_remain_view_count(app_type, videos_info):
     return data
 
 
+def get_score(video_ids):
+    """
+    Placeholder scorer: assign the constant score 1 to every video.
+    :param video_ids: sized collection of video ids
+    :return: list containing one score (1) per video id
+    """
+    count = len(video_ids)
+    return [1] * count
+
+
+def predict():
+    """
+    Rank the flow-pool videos and upload the result to Redis.
+
+    Each zset member has the form '<videoId>-<flowPool>'; a video returned
+    with several flow pools produces one member per pool.
+    :return: None
+    """
+    # Fetch the videos from the flow pool
+    videos = get_videos_from_flow_pool(app_type=config_.APP_TYPE['VLOG'])
+    if not videos:
+        log_.info('流量池中无需分发的视频')
+        return None
+    # Map each video_id to every flow pool it was returned with
+    mapping = {}
+    for video in videos:
+        flow_pools = mapping.setdefault(video['videoId'], set())
+        flow_pool = video['flowPool']
+        if isinstance(flow_pool, str):
+            # A single pool label: keep it whole instead of iterating its characters.
+            flow_pools.add(flow_pool)
+        else:
+            flow_pools.update(flow_pool)
+    video_ids = set(mapping)
+    # Log the count only after the ids have been collected.
+    log_.info('流量池中视频数:{}'.format(len(video_ids)))
+    # Filter the videos by status
+    filtered_videos = filter_video_status(list(video_ids))
+    log_.info('filter videos status finished, filtered_videos nums={}'.format(len(filtered_videos)))
+    if not filtered_videos:
+        log_.info('流量池中视频状态不符合分发')
+        return None
+    # Predict
+    video_score = get_score(filtered_videos)
+    log_.info('predict finished!')
+    # Build the data to upload to redis
+    redis_data = {}
+    for video_id, score in zip(filtered_videos, video_score):
+        for flow_pool in mapping.get(video_id, ()):
+            value = '{}-{}'.format(video_id, flow_pool)
+            redis_data[value] = score
+    key_name = config_.FLOWPOOL_KEY_NAME
+    redis_helper = RedisHelper()
+    # If the key already exists, delete it so stale members do not survive
+    if redis_helper.key_exists(key_name):
+        redis_helper.del_keys(key_name)
+    # Write to redis
+    redis_helper.add_data_with_zset(key_name=key_name, data=redis_data, expire_time=24 * 3600)
+    log_.info('data to redis finished!')
+
+
 if __name__ == '__main__':
+    # Script entry point: run one flow-pool ranking pass, logging its start and end.
+    # The commented-out calls below are earlier manual checks; the first one
+    # still uses the pre-rename name of get_videos_from_flow_pool.
     # res = get_videos_from_pool(app_type=0)
-    res = get_videos_remain_view_count(app_type=0, videos_info=[('12345', '#2#1#111')])
-    print(res)
+    # res = get_videos_remain_view_count(app_type=0, videos_info=[('12345', '#2#1#111')])
+    # print(res)
+    log_.info('flow pool predict start...')
+    predict()
+    log_.info('flow pool predict end...')
+

+ 14 - 14
process_feature.py

@@ -6,29 +6,29 @@ features_name_list = [
     'day3viewcount',
     'day7viewcount',
     'day14viewcount',
-    'day30viewcount',
-    'day60viewcount',
+    # 'day30viewcount',
+    # 'day60viewcount',
 
     'day1playcount',  # 1/3/7/14/30/60日内播放
     'day3playcount',
     'day7playcount',
     'day14playcount',
-    'day30playcount',
-    'day60playcount',
+    # 'day30playcount',
+    # 'day60playcount',
 
     'day1sharecount',  # 1/3/7/14/30/60日内分享,一层回流
     'day3sharecount',
     'day7sharecount',
     'day14sharecount',
-    'day30sharecount',
-    'day60sharecount',
+    # 'day30sharecount',
+    # 'day60sharecount',
 
     'day1returncount',  # 一层回流
     'day3returncount',
     'day7returncount',
     'day14returncount',
-    'day30returncount',
-    'day60returncount',
+    # 'day30returncount',
+    # 'day60returncount',
 
     'videocategory11',
     'videocategory12',
@@ -136,15 +136,15 @@ add_feature = [
     'day3ctr',
     'day7ctr',
     'day14ctr',
-    'day30ctr',
-    'day60ctr',
+    # 'day30ctr',
+    # 'day60ctr',
 
     'day1sov',  # --  1/3/7/14/30/60日内分享/曝光  #18
     'day3sov',
     'day7sov',
     'day14sov',
-    'day30sov',
-    'day60sov',
+    # 'day30sov',
+    # 'day60sov',
 
     'day1rov',  # -- 1/3/7/14日内曝光的回流/曝光   #19
     'day3rov',
@@ -155,8 +155,8 @@ add_feature = [
     'day3soc',
     'day7soc',
     'day14soc',
-    'day30soc',
-    'day60soc',
+    # 'day30soc',
+    # 'day60soc',
 
     'day1roc',  # -- 1/3/7/14日内曝光的回流/播放  #21
     'day3roc',

+ 40 - 7
rov_train.py

@@ -1,12 +1,16 @@
 import os
+import time
+
 import lightgbm as lgb
 import pandas as pd
 
 from sklearn.model_selection import train_test_split
 from sklearn.metrics import mean_absolute_error, r2_score, mean_absolute_percentage_error
+
 from config import set_config
-from utils import read_from_pickle, write_to_pickle
+from utils import read_from_pickle, write_to_pickle, data_normalization, request_post
 from log import Log
+from db_helper import RedisHelper
 
 config_ = set_config()
 log_ = Log()
@@ -98,7 +102,6 @@ def train(x, y, features):
     pre_y_test = model.predict(data=x_test, num_iteration=model.best_iteration)
     y_test = y_test.values
 
-    err_mae = mean_absolute_error(y_test, pre_y_test)
     err_mape = mean_absolute_percentage_error(y_test, pre_y_test)
     r2 = r2_score(y_test, pre_y_test)
 
@@ -107,7 +110,7 @@ def train(x, y, features):
     test_result_filename = 'test_result.csv'
     pack_result_to_csv(filename=test_result_filename, sort_columns=['pre_y_test'], ascending=False, **test_data)
 
-    print(err_mae, err_mape, r2)
+    log_.info('err_mape={}, r2={}'.format(err_mape, r2))
 
     # 保存模型
     write_to_pickle(data=model, filename=config_.MODEL_FILENAME)
@@ -136,20 +139,50 @@ def predict():
     """Predict ROV scores, save them to csv, upload them to Redis and notify the backend."""
     # Read the prediction data and clean it
     x, y, video_ids, _ = process_data(config_.PREDICT_DATA_FILENAME)
+    log_.info('predict data shape: x={}'.format(x.shape))
     # Load the trained model
     model = read_from_pickle(filename=config_.MODEL_FILENAME)
     # Predict
     y_ = model.predict(x)
+    log_.info('predict finished!')
+    # Normalize the result into [0, 100]
+    normal_y_ = data_normalization(list(y_))
+    log_.info('normalization finished!')
     # Pack the prediction result and store it as csv
-    predict_data = {'y_': y_, 'y': y, 'video_ids': video_ids}
+    predict_data = {'normal_y_': normal_y_, 'y_': y_, 'y': y, 'video_ids': video_ids}
     predict_result_filename = 'predict.csv'
-    pack_result_to_csv(filename=predict_result_filename, sort_columns=['y_'], ascending=False, **predict_data)
+    pack_result_to_csv(filename=predict_result_filename, sort_columns=['normal_y_'], ascending=False, **predict_data)
+    # Upload to redis
+    redis_data = {}
+    json_data = []
+    for i in range(len(video_ids)):
+        redis_data[video_ids[i]] = normal_y_[i]
+        json_data.append({'videoId': video_ids[i], 'rovScore': normal_y_[i]})
+    # The key is the configured prefix followed by today's date (YYYYMMDD)
+    key_name = config_.RECALL_KEY_NAME_PREFIX + time.strftime('%Y%m%d')
+    redis_helper = RedisHelper()
+    redis_helper.add_data_with_zset(key_name=key_name, data=redis_data)
+    log_.info('data to redis finished!')
+    # Notify the backend to refresh its data
+    result = request_post(request_url=config_.NOTIFY_BACKEND_UPDATE_ROV_SCORE_URL, request_data={'videos': json_data})
+    # request_post returns None when the response status is not 200, so guard
+    # before subscripting and treat that case as a failed notification.
+    if result is not None and result['code'] == 0:
+        log_.info('notify backend success!')
+    else:
+        log_.error('notify backend fail!')
 
 
 if __name__ == '__main__':
+    # Train the model, then run prediction, logging the wall-clock time of each stage.
+    log_.info('rov model train start...')
+    train_start = time.time()
     train_filename = config_.TRAIN_DATA_FILENAME
     X, Y, videos, fea = process_data(filename=train_filename)
-    print(X.shape, Y.shape)
-    print(len(fea), fea)
+    log_.info('X_shape = {}, Y_shape = {}'.format(X.shape, Y.shape))
     train(X, Y, features=fea)
+    train_end = time.time()
+    # time.time() is in seconds; multiply by 1000 to report milliseconds
+    log_.info('rov model train end, execute time = {}ms'.format((train_end - train_start)*1000))
+
+    log_.info('rov model predict start...')
+    predict_start = time.time()
     predict()
+    predict_end = time.time()
+    log_.info('rov model predict end, execute time = {}ms'.format((predict_end - predict_start)*1000))
+

+ 23 - 0
utils.py

@@ -77,3 +77,26 @@ def request_post(request_url, request_data):
     if response.status_code == 200:
         res_data = json.loads(response.text)
         return res_data
+
+
+def data_normalization(data):
+    """
+    Min-max normalize the values so that they fall within [0, 100].
+
+    An empty input yields an empty list. When every value is identical the
+    min-max span is zero, so every value is mapped to 0.0 rather than
+    dividing by zero.
+    :param data: type-list, numeric values
+    :return: normal_data, type-list, the normalized values
+    """
+    if len(data) == 0:
+        return []
+    x_max = max(data)
+    x_min = min(data)
+    span = x_max - x_min
+    if span == 0:
+        # All values are equal: there is no spread to scale.
+        return [0.0 for _ in data]
+    normal_data = [(x - x_min) / span * 100 for x in data]
+    return normal_data
+
+
+def filter_video_status(video_ids):
+    """
+    Placeholder status filter: currently passes every video through.
+    :param video_ids: video ids to filter
+    :return: the same video_ids object, unchanged
+    """
+    return video_ids
+
+
+if __name__ == '__main__':
+    # Manual check: post an empty video list to the ROV-score notification endpoint.
+    # Earlier ad-hoc check of data_normalization, kept for reference:
+    # data_test = [9.20273281e+03, 7.00795065e+03, 5.54813112e+03, 9.97402494e-01, 9.96402495e-01, 9.96402494e-01]
+    # data_normalization(data_test)
+    notify_url = config_.NOTIFY_BACKEND_UPDATE_ROV_SCORE_URL
+    empty_payload = {'videos': []}
+    request_post(request_url=notify_url, request_data=empty_payload)