# ad_predict_video_data_process.py
  1. import os.path
  2. import time
  3. import datetime
  4. import pandas as pd
  5. from odps import ODPS
  6. from utils import data_check, RedisHelper
  7. from threading import Timer
  8. from config import set_config
  9. config_, _ = set_config()
  10. redis_helper = RedisHelper()
  11. # ODPS服务配置
  12. odps_config = {
  13. 'ENDPOINT': 'http://service.cn.maxcompute.aliyun.com/api',
  14. 'ACCESSID': 'LTAIWYUujJAm7CbH',
  15. 'ACCESSKEY': 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P',
  16. }
  17. features = [
  18. 'apptype',
  19. 'videoid',
  20. 'video_preview_count_uv_30day',
  21. 'video_preview_count_pv_30day',
  22. 'video_view_count_uv_30day',
  23. 'video_view_count_pv_30day',
  24. 'video_play_count_uv_30day',
  25. 'video_play_count_pv_30day',
  26. 'video_share_count_uv_30day',
  27. 'video_share_count_pv_30day',
  28. 'video_return_count_30day',
  29. 'video_ctr_uv_30day',
  30. 'video_ctr_pv_30day',
  31. 'video_share_rate_uv_30day',
  32. 'video_share_rate_pv_30day',
  33. 'video_return_rate_30day',
  34. ]
  35. def get_feature_data(project, table, dt, app_type):
  36. """获取特征数据"""
  37. odps = ODPS(
  38. access_id=odps_config['ACCESSID'],
  39. secret_access_key=odps_config['ACCESSKEY'],
  40. project=project,
  41. endpoint=odps_config['ENDPOINT'],
  42. )
  43. feature_data = []
  44. sql = f"select * from {project}.{table} where dt={dt} and apptype={app_type} limit 1000"
  45. with odps.execute_sql(sql).open_reader() as reader:
  46. for record in reader:
  47. # print(record)
  48. item = {}
  49. for feature_name in features:
  50. item[feature_name] = record[feature_name]
  51. feature_data.append(item)
  52. feature_df = pd.DataFrame(feature_data)
  53. return feature_df
  54. def video_data_process(project, table, dt, app_type):
  55. """每日特征处理"""
  56. print('step 1: get video feature data')
  57. feature_initial_df = get_feature_data(project=project, table=table, dt=dt, app_type=app_type)
  58. print(f"feature_initial_df shape: {feature_initial_df.shape}")
  59. print('step 2: process')
  60. feature_initial_df['apptype'] = feature_initial_df['apptype'].astype(int)
  61. feature_df = feature_initial_df.copy()
  62. # 缺失值填充
  63. feature_df.fillna(0, inplace=True)
  64. # 数据类型校正
  65. type_int_columns = [
  66. 'video_preview_count_uv_30day',
  67. 'video_preview_count_pv_30day',
  68. 'video_view_count_uv_30day',
  69. 'video_view_count_pv_30day',
  70. 'video_play_count_uv_30day',
  71. 'video_play_count_pv_30day',
  72. 'video_share_count_uv_30day',
  73. 'video_share_count_pv_30day',
  74. 'video_return_count_30day',
  75. ]
  76. for column_name in type_int_columns:
  77. feature_df[column_name] = feature_df[column_name].astype(int)
  78. type_float_columns = [
  79. 'video_ctr_uv_30day',
  80. 'video_ctr_pv_30day',
  81. 'video_share_rate_uv_30day',
  82. 'video_share_rate_pv_30day',
  83. 'video_return_rate_30day',
  84. ]
  85. for column_name in type_float_columns:
  86. feature_df[column_name] = feature_df[column_name].astype(float)
  87. print(f"feature_df shape: {feature_df.shape}")
  88. print('step 3: add new video feature')
  89. # 补充新视频默认数据(使用均值)
  90. new_video_feature = {
  91. 'apptype': app_type,
  92. 'videoid': '-1',
  93. 'video_preview_count_uv_30day': int(feature_df['video_preview_count_uv_30day'].mean()),
  94. 'video_preview_count_pv_30day': int(feature_df['video_preview_count_pv_30day'].mean()),
  95. 'video_view_count_uv_30day': int(feature_df['video_view_count_uv_30day'].mean()),
  96. 'video_view_count_pv_30day': int(feature_df['video_view_count_pv_30day'].mean()),
  97. 'video_play_count_uv_30day': int(feature_df['video_play_count_uv_30day'].mean()),
  98. 'video_play_count_pv_30day': int(feature_df['video_play_count_pv_30day'].mean()),
  99. 'video_share_count_uv_30day': int(feature_df['video_share_count_uv_30day'].mean()),
  100. 'video_share_count_pv_30day': int(feature_df['video_share_count_pv_30day'].mean()),
  101. 'video_return_count_30day': int(feature_df['video_return_count_30day'].mean()),
  102. }
  103. new_video_feature['video_ctr_uv_30day'] = float(
  104. new_video_feature['video_play_count_uv_30day'] / new_video_feature['video_view_count_uv_30day'] + 1)
  105. new_video_feature['video_ctr_pv_30day'] = float(
  106. new_video_feature['video_play_count_pv_30day'] / new_video_feature['video_view_count_pv_30day'] + 1)
  107. new_video_feature['video_share_rate_uv_30day'] = float(
  108. new_video_feature['video_share_count_uv_30day'] / new_video_feature['video_play_count_uv_30day'] + 1)
  109. new_video_feature['video_share_rate_pv_30day'] = float(
  110. new_video_feature['video_share_count_pv_30day'] / new_video_feature['video_play_count_pv_30day'] + 1)
  111. new_video_feature['video_return_rate_30day'] = float(
  112. new_video_feature['video_return_count_30day'] / new_video_feature['video_view_count_pv_30day'] + 1)
  113. new_video_feature_df = pd.DataFrame([new_video_feature])
  114. video_df = pd.concat([feature_df, new_video_feature_df])
  115. print(f"video_df shape: {video_df.shape}")
  116. print(f"step 4: to csv")
  117. # 写入csv
  118. predict_data_dir = './data/predict_data'
  119. if not os.path.exists(predict_data_dir):
  120. os.makedirs(predict_data_dir)
  121. video_df.to_csv(f"{predict_data_dir}/video_feature.csv", index=False)
  122. # to redis
  123. xgb_config = config_.AD_MODEL_ABTEST_CONFIG['xgb']
  124. for ind, row in video_df.iterrows():
  125. app_type = row['apptype']
  126. video_id = row['videoid']
  127. value = {
  128. 'video_preview_count_uv_30day': row['video_preview_count_uv_30day'],
  129. 'video_preview_count_pv_30day': row['video_preview_count_pv_30day'],
  130. 'video_view_count_uv_30day': row['video_view_count_uv_30day'],
  131. 'video_view_count_pv_30day': row['video_view_count_pv_30day'],
  132. 'video_play_count_uv_30day': row['video_play_count_uv_30day'],
  133. 'video_play_count_pv_30day': row['video_play_count_pv_30day'],
  134. 'video_share_count_uv_30day': row['video_share_count_uv_30day'],
  135. 'video_share_count_pv_30day': row['video_share_count_pv_30day'],
  136. 'video_return_count_30day': row['video_return_count_30day'],
  137. 'video_ctr_uv_30day': row['video_ctr_uv_30day'],
  138. 'video_ctr_pv_30day': row['video_ctr_pv_30day'],
  139. 'video_share_rate_uv_30day': row['video_share_rate_uv_30day'],
  140. 'video_share_rate_pv_30day': row['video_share_rate_pv_30day'],
  141. 'video_return_rate_30day': row['video_return_rate_30day']
  142. }
  143. key = f"{xgb_config['predict_video_feature_key_prefix']}{app_type}:{video_id}"
  144. redis_helper.set_data_to_redis(key_name=key, value=str(value), expire_time=48*3600)
  145. def timer_check():
  146. project = 'loghubods'
  147. table = 'admodel_testset_video'
  148. # dt = '20230725'
  149. now_date = datetime.datetime.today()
  150. dt = datetime.datetime.strftime(now_date - datetime.timedelta(days=1), '%Y%m%d')
  151. # 查看当前更新的数据是否已准备好
  152. data_count = data_check(project=project, table=table, dt=dt)
  153. if data_count > 0:
  154. print(f"ad predict video data count = {data_count}")
  155. # 数据准备好,进行更新
  156. video_data_process(project=project, table=table, dt=dt, app_type=0)
  157. print(f"ad predict video data update end!")
  158. else:
  159. # 数据没准备好,1分钟后重新检查
  160. Timer(60, timer_check).start()
  161. if __name__ == '__main__':
  162. st_time = time.time()
  163. timer_check()
  164. print(f"{time.time() - st_time}s")