# ad_predict_user_data_process.py
  1. import os.path
  2. import time
  3. import datetime
  4. import pandas as pd
  5. from odps import ODPS
  6. from utils import data_check, RedisHelper
  7. from threading import Timer
  8. from config import set_config
  9. config_, _ = set_config()
  10. redis_helper = RedisHelper()
  11. # ODPS服务配置
  12. odps_config = {
  13. 'ENDPOINT': 'http://service.cn.maxcompute.aliyun.com/api',
  14. 'ACCESSID': 'LTAIWYUujJAm7CbH',
  15. 'ACCESSKEY': 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P',
  16. }
  17. features = [
  18. 'apptype',
  19. 'mid',
  20. 'mid_preview_count_30day',
  21. 'mid_view_count_30day',
  22. 'mid_view_count_pv_30day',
  23. 'mid_play_count_30day',
  24. 'mid_play_count_pv_30day',
  25. 'mid_share_count_30day',
  26. 'mid_share_count_pv_30day',
  27. 'mid_return_count_30day',
  28. 'mid_share_rate_30day',
  29. 'mid_return_rate_30day',
  30. ]
  31. def get_feature_data(project, table, dt, app_type):
  32. """获取特征数据"""
  33. odps = ODPS(
  34. access_id=odps_config['ACCESSID'],
  35. secret_access_key=odps_config['ACCESSKEY'],
  36. project=project,
  37. endpoint=odps_config['ENDPOINT'],
  38. )
  39. feature_data = []
  40. sql = f"select * from {project}.{table} where dt={dt} and apptype={app_type}"
  41. with odps.execute_sql(sql).open_reader() as reader:
  42. for record in reader:
  43. # print(record)
  44. item = {}
  45. for feature_name in features:
  46. item[feature_name] = record[feature_name]
  47. feature_data.append(item)
  48. feature_df = pd.DataFrame(feature_data)
  49. return feature_df
  50. def user_data_process(project, table, dt, app_type):
  51. """每日特征处理"""
  52. print('step 1: get user feature data')
  53. feature_initial_df = get_feature_data(project=project, table=table, dt=dt, app_type=app_type)
  54. print(f"feature_initial_df shape: {feature_initial_df.shape}")
  55. print('step 2: process')
  56. feature_initial_df['apptype'] = feature_initial_df['apptype'].astype(int)
  57. feature_df = feature_initial_df.copy()
  58. # 缺失值填充
  59. feature_df.fillna(0, inplace=True)
  60. # 数据类型校正
  61. type_int_columns = [
  62. 'mid_preview_count_30day',
  63. 'mid_view_count_30day',
  64. 'mid_view_count_pv_30day',
  65. 'mid_play_count_30day',
  66. 'mid_play_count_pv_30day',
  67. 'mid_share_count_30day',
  68. 'mid_share_count_pv_30day',
  69. 'mid_return_count_30day',
  70. ]
  71. for column_name in type_int_columns:
  72. feature_df[column_name] = feature_df[column_name].astype(int)
  73. type_float_columns = [
  74. 'mid_share_rate_30day',
  75. 'mid_return_rate_30day',
  76. ]
  77. for column_name in type_float_columns:
  78. feature_df[column_name] = feature_df[column_name].astype(float)
  79. print(f"feature_df shape: {feature_df.shape}")
  80. print('step 3: add new user feature')
  81. # 补充新用户默认数据(使用均值)
  82. new_user_feature = {
  83. 'apptype': app_type,
  84. 'mid': '-1',
  85. 'mid_preview_count_30day': int(feature_df['mid_preview_count_30day'].mean()),
  86. 'mid_view_count_30day': int(feature_df['mid_view_count_30day'].mean()),
  87. 'mid_view_count_pv_30day': int(feature_df['mid_view_count_pv_30day'].mean()),
  88. 'mid_play_count_30day': int(feature_df['mid_play_count_30day'].mean()),
  89. 'mid_play_count_pv_30day': int(feature_df['mid_play_count_pv_30day'].mean()),
  90. 'mid_share_count_30day': int(feature_df['mid_share_count_30day'].mean()),
  91. 'mid_share_count_pv_30day': int(feature_df['mid_share_count_pv_30day'].mean()),
  92. 'mid_return_count_30day': int(feature_df['mid_return_count_30day'].mean()),
  93. }
  94. new_user_feature['mid_share_rate_30day'] = float(
  95. new_user_feature['mid_share_count_pv_30day'] / new_user_feature['mid_play_count_pv_30day'] + 1)
  96. new_user_feature['mid_return_rate_30day'] = float(
  97. new_user_feature['mid_return_count_30day'] / new_user_feature['mid_view_count_pv_30day'] + 1)
  98. new_user_feature_df = pd.DataFrame([new_user_feature])
  99. user_df = pd.concat([feature_df, new_user_feature_df])
  100. print(f"user_df shape: {user_df.shape}")
  101. print(f"step 4: to csv")
  102. # 写入csv
  103. predict_data_dir = './data/predict_data'
  104. if not os.path.exists(predict_data_dir):
  105. os.makedirs(predict_data_dir)
  106. user_df.to_csv(f"{predict_data_dir}/user_feature.csv", index=False)
  107. # to redis
  108. xgb_config = config_.AD_MODEL_ABTEST_CONFIG['xgb']
  109. for ind, row in user_df.iterrows():
  110. app_type = row['apptype']
  111. mid = row['mid']
  112. # value = {
  113. # 'mid_preview_count_30day': row['mid_preview_count_30day'],
  114. # 'mid_view_count_30day': row['mid_view_count_30day'],
  115. # 'mid_view_count_pv_30day': row['mid_view_count_pv_30day'],
  116. # 'mid_play_count_30day': row['mid_play_count_30day'],
  117. # 'mid_play_count_pv_30day': row['mid_play_count_pv_30day'],
  118. # 'mid_share_count_30day': row['mid_share_count_30day'],
  119. # 'mid_share_count_pv_30day': row['mid_share_count_pv_30day'],
  120. # 'mid_return_count_30day': row['mid_return_count_30day'],
  121. # 'mid_share_rate_30day': row['mid_share_rate_30day'],
  122. # 'mid_return_rate_30day': row['mid_return_rate_30day']
  123. # }
  124. value = [
  125. row['mid_preview_count_30day'],
  126. row['mid_view_count_30day'],
  127. row['mid_view_count_pv_30day'],
  128. row['mid_play_count_30day'],
  129. row['mid_play_count_pv_30day'],
  130. row['mid_share_count_30day'],
  131. row['mid_share_count_pv_30day'],
  132. row['mid_return_count_30day'],
  133. row['mid_share_rate_30day'],
  134. row['mid_return_rate_30day']
  135. ]
  136. key = f"{xgb_config['predict_user_feature_key_prefix']}{app_type}:{mid}"
  137. redis_helper.set_data_to_redis(key_name=key, value=str(value), expire_time=48 * 3600)
  138. def timer_check():
  139. project = 'loghubods'
  140. table = 'admodel_testset_mid'
  141. # dt = '20230725'
  142. now_date = datetime.datetime.today()
  143. dt = datetime.datetime.strftime(now_date - datetime.timedelta(days=1), '%Y%m%d')
  144. # 查看当前更新的数据是否已准备好
  145. data_count = data_check(project=project, table=table, dt=dt)
  146. if data_count > 0:
  147. print(f"ad predict user data count = {data_count}")
  148. # 数据准备好,进行更新
  149. user_data_process(project=project, table=table, dt=dt, app_type=5)
  150. print(f"ad predict user data update end!")
  151. else:
  152. # 数据没准备好,1分钟后重新检查
  153. Timer(60, timer_check).start()
  154. if __name__ == '__main__':
  155. st_time = time.time()
  156. timer_check()
  157. print(f"execute time: {time.time() - st_time}s")