123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175 |
- import os.path
- import time
- import datetime
- import pandas as pd
- from odps import ODPS
- from utils import data_check, RedisHelper
- from threading import Timer
- from config import set_config
config_, _ = set_config()
redis_helper = RedisHelper()

# ODPS service configuration.
# SECURITY(review): credentials were hard-coded in source control. They are now
# read from the environment first; the literal values remain only as a
# backward-compatible fallback and should be rotated and removed.
odps_config = {
    'ENDPOINT': os.getenv('ODPS_ENDPOINT', 'http://service.cn.maxcompute.aliyun.com/api'),
    'ACCESSID': os.getenv('ODPS_ACCESS_ID', 'LTAIWYUujJAm7CbH'),
    'ACCESSKEY': os.getenv('ODPS_ACCESS_KEY', 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P'),
}

# Columns selected from the ODPS table: id columns first, then 30-day
# engagement counts and derived rates.
features = [
    'apptype',
    'videoid',
    'video_preview_count_uv_30day',
    'video_preview_count_pv_30day',
    'video_view_count_uv_30day',
    'video_view_count_pv_30day',
    'video_play_count_uv_30day',
    'video_play_count_pv_30day',
    'video_share_count_uv_30day',
    'video_share_count_pv_30day',
    'video_return_count_30day',
    'video_ctr_uv_30day',
    'video_ctr_pv_30day',
    'video_share_rate_uv_30day',
    'video_share_rate_pv_30day',
    'video_return_rate_30day',
]
def get_feature_data(project, table, dt, app_type):
    """Fetch up to 1000 feature rows for one partition/app type as a DataFrame.

    Args:
        project: ODPS project name.
        table: table name inside the project.
        dt: partition date string (e.g. '20230725').
        app_type: app type id used to filter rows.

    Returns:
        pandas.DataFrame with one column per name in the module-level
        `features` list.
    """
    odps_client = ODPS(
        access_id=odps_config['ACCESSID'],
        secret_access_key=odps_config['ACCESSKEY'],
        project=project,
        endpoint=odps_config['ENDPOINT'],
    )
    sql = f"select * from {project}.{table} where dt={dt} and apptype={app_type} limit 1000"
    with odps_client.execute_sql(sql).open_reader() as reader:
        rows = [{name: record[name] for name in features} for record in reader]
    return pd.DataFrame(rows)
def video_data_process(project, table, dt, app_type):
    """Daily video-feature processing.

    Pulls the day's feature rows from ODPS, cleans and type-corrects them,
    appends a synthetic "new video" row (videoid '-1') filled with column
    means, then writes the result to a CSV file and to Redis (one key per
    apptype:videoid, 48h TTL).

    Args:
        project: ODPS project name.
        table: table name inside the project.
        dt: partition date string (e.g. '20230725').
        app_type: app type id used to filter rows.
    """
    print('step 1: get video feature data')
    feature_initial_df = get_feature_data(project=project, table=table, dt=dt, app_type=app_type)
    print(f"feature_initial_df shape: {feature_initial_df.shape}")

    print('step 2: process')
    feature_initial_df['apptype'] = feature_initial_df['apptype'].astype(int)
    feature_df = feature_initial_df.copy()
    # Fill missing values before casting, otherwise astype(int) raises on NaN.
    feature_df.fillna(0, inplace=True)
    # Type correction: raw counts are ints, derived rates are floats.
    type_int_columns = [
        'video_preview_count_uv_30day',
        'video_preview_count_pv_30day',
        'video_view_count_uv_30day',
        'video_view_count_pv_30day',
        'video_play_count_uv_30day',
        'video_play_count_pv_30day',
        'video_share_count_uv_30day',
        'video_share_count_pv_30day',
        'video_return_count_30day',
    ]
    for column_name in type_int_columns:
        feature_df[column_name] = feature_df[column_name].astype(int)
    type_float_columns = [
        'video_ctr_uv_30day',
        'video_ctr_pv_30day',
        'video_share_rate_uv_30day',
        'video_share_rate_pv_30day',
        'video_return_rate_30day',
    ]
    for column_name in type_float_columns:
        feature_df[column_name] = feature_df[column_name].astype(float)
    print(f"feature_df shape: {feature_df.shape}")

    print('step 3: add new video feature')
    # Default row for unseen ("new") videos: column means for counts.
    new_video_feature = {'apptype': app_type, 'videoid': '-1'}
    for column_name in type_int_columns:
        new_video_feature[column_name] = int(feature_df[column_name].mean())
    # Derived rates use add-one smoothing in the denominator.
    # BUGFIX: original code computed `a / b + 1` (operator-precedence bug),
    # which produced rates > 1 and could raise ZeroDivisionError when the
    # mean count was 0; the intended form is `a / (b + 1)`.
    new_video_feature['video_ctr_uv_30day'] = float(
        new_video_feature['video_play_count_uv_30day'] / (new_video_feature['video_view_count_uv_30day'] + 1))
    new_video_feature['video_ctr_pv_30day'] = float(
        new_video_feature['video_play_count_pv_30day'] / (new_video_feature['video_view_count_pv_30day'] + 1))
    new_video_feature['video_share_rate_uv_30day'] = float(
        new_video_feature['video_share_count_uv_30day'] / (new_video_feature['video_play_count_uv_30day'] + 1))
    new_video_feature['video_share_rate_pv_30day'] = float(
        new_video_feature['video_share_count_pv_30day'] / (new_video_feature['video_play_count_pv_30day'] + 1))
    new_video_feature['video_return_rate_30day'] = float(
        new_video_feature['video_return_count_30day'] / (new_video_feature['video_view_count_pv_30day'] + 1))
    new_video_feature_df = pd.DataFrame([new_video_feature])
    video_df = pd.concat([feature_df, new_video_feature_df])
    print(f"video_df shape: {video_df.shape}")

    print("step 4: to csv")
    # Write snapshot to CSV; exist_ok avoids a check-then-create race.
    predict_data_dir = './data/predict_data'
    os.makedirs(predict_data_dir, exist_ok=True)
    video_df.to_csv(f"{predict_data_dir}/video_feature.csv", index=False)

    # Write per-video feature dicts to Redis with a 48h TTL.
    xgb_config = config_.AD_MODEL_ABTEST_CONFIG['xgb']
    # Value dict keeps int columns first, then float columns — preserving the
    # historical key order of the stringified dict stored in Redis.
    value_columns = type_int_columns + type_float_columns
    for _, row in video_df.iterrows():
        value = {column_name: row[column_name] for column_name in value_columns}
        key = f"{xgb_config['predict_video_feature_key_prefix']}{row['apptype']}:{row['videoid']}"
        redis_helper.set_data_to_redis(key_name=key, value=str(value), expire_time=48 * 3600)
def timer_check():
    """Poll until yesterday's partition is ready, then run the daily update.

    If the ODPS partition has no rows yet, reschedule this check in 60s via
    a one-shot Timer thread.
    """
    project = 'loghubods'
    table = 'admodel_testset_video'
    # Target partition: yesterday, formatted YYYYMMDD.
    yesterday = datetime.datetime.today() - datetime.timedelta(days=1)
    dt = yesterday.strftime('%Y%m%d')
    # Is the partition populated yet?
    data_count = data_check(project=project, table=table, dt=dt)
    if data_count > 0:
        print(f"ad predict video data count = {data_count}")
        # Data ready — run the update.
        video_data_process(project=project, table=table, dt=dt, app_type=0)
        print("ad predict video data update end!")
    else:
        # Not ready — check again in one minute.
        Timer(60, timer_check).start()
if __name__ == '__main__':
    # Time the whole run (including any polling) and report elapsed seconds.
    start_time = time.time()
    timer_check()
    print(f"{time.time() - start_time}s")
|