ad_predict_user_data_process.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. import os.path
  2. import time
  3. import datetime
  4. import pandas as pd
  5. from odps import ODPS
  6. from utils import data_check, RedisHelper
  7. from threading import Timer
  8. from config import set_config
  # Project configuration; only the first value returned by set_config() is used in this script.
  config_, _ = set_config()
  # Shared Redis client; user_data_process writes per-user feature values through it.
  redis_helper = RedisHelper()
  # ODPS (MaxCompute) service configuration.
  # SECURITY: an access key pair is committed in source below. Each value can now be
  # supplied via the ODPS_ENDPOINT / ODPS_ACCESSID / ODPS_ACCESSKEY environment
  # variables; once deployments set them, rotate these keys and delete the literal
  # fallbacks.
  odps_config = {
      'ENDPOINT': os.environ.get('ODPS_ENDPOINT', 'http://service.cn.maxcompute.aliyun.com/api'),
      'ACCESSID': os.environ.get('ODPS_ACCESSID', 'LTAIWYUujJAm7CbH'),
      'ACCESSKEY': os.environ.get('ODPS_ACCESSKEY', 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P'),
  }
  # Columns read from the ODPS feature table, in output order: the two key
  # columns followed by the 30-day per-user statistics, all named mid_<stat>_30day.
  features = [
      'apptype',
      'mid',
      *[
          f'mid_{stat}_30day'
          for stat in (
              'preview_count',
              'view_count',
              'view_count_pv',
              'play_count',
              'play_count_pv',
              'share_count',
              'share_count_pv',
              'return_count',
              'share_rate',
              'return_rate',
          )
      ],
  ]
  def get_feature_data(project, table, dt, app_type, limit=10000):
      """Fetch per-user feature rows from ODPS into a DataFrame.

      Runs ``select * from {project}.{table} where dt={dt} and apptype={app_type}``
      (plus an optional ``limit``) and keeps only the columns named in the
      module-level ``features`` list.

      Args:
          project: ODPS project name; also used to qualify the table name.
          table: table name inside ``project``.
          dt: partition value, e.g. ``'20230725'``; interpolated into the SQL unquoted.
          app_type: app type value to filter on.
          limit: maximum number of rows to fetch (default 10000). Pass ``None``
              to fetch every matching row.

      Returns:
          DataFrame whose columns are exactly ``features``, in that order. If the
          query returns no rows, the frame is empty but still has those columns.
      """
      odps = ODPS(
          access_id=odps_config['ACCESSID'],
          secret_access_key=odps_config['ACCESSKEY'],
          project=project,
          endpoint=odps_config['ENDPOINT'],
      )
      # NOTE: values are formatted straight into the SQL text; only pass trusted,
      # internally generated project/table/dt/app_type values.
      sql = f"select * from {project}.{table} where dt={dt} and apptype={app_type}"
      if limit is not None:
          sql += f" limit {int(limit)}"
      with odps.execute_sql(sql).open_reader() as reader:
          # Consume the reader fully before the context manager closes it.
          feature_data = [{name: record[name] for name in features} for record in reader]
      # columns= keeps the schema even when nothing came back, so callers can still
      # index e.g. df['apptype'] instead of failing with a KeyError.
      return pd.DataFrame(feature_data, columns=features)
  # Count-style feature columns: NaN -> 0, then cast to int.
  _USER_INT_COLUMNS = [
      'mid_preview_count_30day',
      'mid_view_count_30day',
      'mid_view_count_pv_30day',
      'mid_play_count_30day',
      'mid_play_count_pv_30day',
      'mid_share_count_30day',
      'mid_share_count_pv_30day',
      'mid_return_count_30day',
  ]
  # Ratio-style feature columns: NaN -> 0, then cast to float.
  _USER_FLOAT_COLUMNS = [
      'mid_share_rate_30day',
      'mid_return_rate_30day',
  ]


  def _clean_user_features(feature_initial_df):
      """Return a copy of the raw feature frame with NaNs filled and dtypes fixed.

      ``apptype`` and the count columns become int, and the rate columns become float.
      Every missing value in the frame (including ``mid``) is replaced with 0.
      """
      feature_df = feature_initial_df.copy()
      feature_df['apptype'] = feature_df['apptype'].astype(int)
      feature_df.fillna(0, inplace=True)
      for column_name in _USER_INT_COLUMNS:
          feature_df[column_name] = feature_df[column_name].astype(int)
      for column_name in _USER_FLOAT_COLUMNS:
          feature_df[column_name] = feature_df[column_name].astype(float)
      return feature_df


  def _new_user_feature(feature_df, app_type):
      """Build the default feature row for unseen users (``mid`` = ``'-1'``).

      Each count feature is the column mean of ``feature_df``, truncated to int.
      The two rates are derived from those mean counts as ``a / (b + 1)``.
      """
      new_user_feature = {'apptype': app_type, 'mid': '-1'}
      for column_name in _USER_INT_COLUMNS:
          new_user_feature[column_name] = int(feature_df[column_name].mean())
      # The +1 belongs to the denominator: it avoids division by zero when a mean
      # count is 0. Writing `a / b + 1` would instead yield ratio + 1 (always >= 1).
      new_user_feature['mid_share_rate_30day'] = float(
          new_user_feature['mid_share_count_pv_30day']
          / (new_user_feature['mid_play_count_pv_30day'] + 1))
      new_user_feature['mid_return_rate_30day'] = float(
          new_user_feature['mid_return_count_30day']
          / (new_user_feature['mid_view_count_pv_30day'] + 1))
      return new_user_feature


  def _write_user_csv(user_df):
      """Write ``user_df`` to ./data/predict_data/user_feature.csv, creating the directory if needed."""
      predict_data_dir = './data/predict_data'
      os.makedirs(predict_data_dir, exist_ok=True)
      user_df.to_csv(f"{predict_data_dir}/user_feature.csv", index=False)


  def _push_user_features_to_redis(user_df):
      """Store each row's feature values in Redis under ``<prefix><apptype>:<mid>``.

      The stored value is ``str()`` of a dict of the feature columns (a Python
      repr, not JSON), with a 48-hour expiry.
      """
      xgb_config = config_.AD_MODEL_ABTEST_CONFIG['xgb']
      key_prefix = xgb_config['predict_user_feature_key_prefix']
      value_columns = _USER_INT_COLUMNS + _USER_FLOAT_COLUMNS
      for _, row in user_df.iterrows():
          value = {column_name: row[column_name] for column_name in value_columns}
          key = f"{key_prefix}{row['apptype']}:{row['mid']}"
          redis_helper.set_data_to_redis(key_name=key, value=str(value), expire_time=48 * 3600)


  def user_data_process(project, table, dt, app_type):
      """Run the daily user-feature job for one partition and app type.

      Steps: fetch features from ODPS, clean them, append a default row for new
      users (``mid`` = ``'-1'``), write everything to CSV, and push each row to Redis.

      Args:
          project: ODPS project name.
          table: ODPS feature table name.
          dt: partition value (``%Y%m%d`` string as built by ``timer_check``).
          app_type: app type to process.

      Raises:
          ValueError: if the query returned no rows. Column means would be NaN,
              so no default new-user row can be built.
      """
      print('step 1: get user feature data')
      feature_initial_df = get_feature_data(project=project, table=table, dt=dt, app_type=app_type)
      print(f"feature_initial_df shape: {feature_initial_df.shape}")
      if feature_initial_df.empty:
          raise ValueError(
              f"no user feature data in {project}.{table} for dt={dt}, apptype={app_type}")

      print('step 2: process')
      feature_df = _clean_user_features(feature_initial_df)
      print(f"feature_df shape: {feature_df.shape}")

      print('step 3: add new user feature')
      new_user_feature_df = pd.DataFrame([_new_user_feature(feature_df, app_type)])
      user_df = pd.concat([feature_df, new_user_feature_df])
      print(f"user_df shape: {user_df.shape}")

      print("step 4: to csv")
      _write_user_csv(user_df)
      _push_user_features_to_redis(user_df)
  def timer_check(project='loghubods', table='admodel_testset_mid', app_type=0):
      """Run the user-feature update once yesterday's partition has data.

      Computes ``dt`` as yesterday's local date (``%Y%m%d``) and asks
      ``data_check`` how many rows ``project.table`` has for it. If there are
      any, runs ``user_data_process``. Otherwise it schedules itself again in
      60 seconds on a ``threading.Timer`` with the same arguments and returns
      immediately. There is no retry limit, and ``dt`` is recomputed on each attempt.

      Args:
          project: ODPS project to check/process (default ``'loghubods'``).
          table: ODPS table to check/process (default ``'admodel_testset_mid'``).
          app_type: app type passed to ``user_data_process`` (default 0).
      """
      dt = (datetime.datetime.today() - datetime.timedelta(days=1)).strftime('%Y%m%d')
      # Check whether the partition for dt has been populated yet.
      data_count = data_check(project=project, table=table, dt=dt)
      if data_count > 0:
          print(f"ad predict user data count = {data_count}")
          # Data is ready: run the update.
          user_data_process(project=project, table=table, dt=dt, app_type=app_type)
          print("ad predict user data update end!")
      else:
          # Data not ready yet: check again in 1 minute with the same arguments.
          Timer(
              60,
              timer_check,
              kwargs={'project': project, 'table': table, 'app_type': app_type},
          ).start()
  if __name__ == '__main__':
      # NOTE: when the data is not ready, timer_check only schedules a retry on a
      # background Timer thread and returns, so this elapsed time then covers just
      # the first check, not the actual update.
      started_at = time.time()
      timer_check()
      elapsed = time.time() - started_at
      print(f"{elapsed}s")