# ad_predict_user_data_process.py
  1. import os.path
  2. import time
  3. import datetime
  4. import pandas as pd
  5. from odps import ODPS
  6. from utils import data_check
  7. from threading import Timer
  8. # ODPS服务配置
  9. odps_config = {
  10. 'ENDPOINT': 'http://service.cn.maxcompute.aliyun.com/api',
  11. 'ACCESSID': 'LTAIWYUujJAm7CbH',
  12. 'ACCESSKEY': 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P',
  13. }
  14. features = [
  15. 'apptype',
  16. 'mid',
  17. 'mid_preview_count_30day',
  18. 'mid_view_count_30day',
  19. 'mid_view_count_pv_30day',
  20. 'mid_play_count_30day',
  21. 'mid_play_count_pv_30day',
  22. 'mid_share_count_30day',
  23. 'mid_share_count_pv_30day',
  24. 'mid_return_count_30day',
  25. 'mid_share_rate_30day',
  26. 'mid_return_rate_30day',
  27. ]
  28. def get_feature_data(project, table, dt, app_type):
  29. """获取特征数据"""
  30. odps = ODPS(
  31. access_id=odps_config['ACCESSID'],
  32. secret_access_key=odps_config['ACCESSKEY'],
  33. project=project,
  34. endpoint=odps_config['ENDPOINT'],
  35. )
  36. feature_data = []
  37. sql = f"select * from {project}.{table} where dt={dt} and apptype={app_type} limit 10000"
  38. with odps.execute_sql(sql).open_reader() as reader:
  39. for record in reader:
  40. # print(record)
  41. item = {}
  42. for feature_name in features:
  43. item[feature_name] = record[feature_name]
  44. feature_data.append(item)
  45. feature_df = pd.DataFrame(feature_data)
  46. return feature_df
  47. def user_data_process(project, table, dt, app_type):
  48. """每日特征处理"""
  49. print('step 1: get user feature data')
  50. feature_initial_df = get_feature_data(project=project, table=table, dt=dt, app_type=app_type)
  51. print(f"feature_initial_df shape: {feature_initial_df.shape}")
  52. print('step 2: process')
  53. feature_initial_df['apptype'] = feature_initial_df['apptype'].astype(int)
  54. feature_df = feature_initial_df.copy()
  55. # 缺失值填充
  56. feature_df.fillna(0, inplace=True)
  57. # 数据类型校正
  58. type_int_columns = [
  59. 'mid_preview_count_30day',
  60. 'mid_view_count_30day',
  61. 'mid_view_count_pv_30day',
  62. 'mid_play_count_30day',
  63. 'mid_play_count_pv_30day',
  64. 'mid_share_count_30day',
  65. 'mid_share_count_pv_30day',
  66. 'mid_return_count_30day',
  67. ]
  68. for column_name in type_int_columns:
  69. feature_df[column_name] = feature_df[column_name].astype(int)
  70. type_float_columns = [
  71. 'mid_share_rate_30day',
  72. 'mid_return_rate_30day',
  73. ]
  74. for column_name in type_float_columns:
  75. feature_df[column_name] = feature_df[column_name].astype(float)
  76. print(f"feature_df shape: {feature_df.shape}")
  77. print('step 3: add new user feature')
  78. # 补充新用户默认数据(使用均值)
  79. new_user_feature = {
  80. 'apptype': app_type,
  81. 'mid': '-1',
  82. 'mid_preview_count_30day': int(feature_df['mid_preview_count_30day'].mean()),
  83. 'mid_view_count_30day': int(feature_df['mid_view_count_30day'].mean()),
  84. 'mid_view_count_pv_30day': int(feature_df['mid_view_count_pv_30day'].mean()),
  85. 'mid_play_count_30day': int(feature_df['mid_play_count_30day'].mean()),
  86. 'mid_play_count_pv_30day': int(feature_df['mid_play_count_pv_30day'].mean()),
  87. 'mid_share_count_30day': int(feature_df['mid_share_count_30day'].mean()),
  88. 'mid_share_count_pv_30day': int(feature_df['mid_share_count_pv_30day'].mean()),
  89. 'mid_return_count_30day': int(feature_df['mid_return_count_30day'].mean()),
  90. }
  91. new_user_feature['mid_share_rate_30day'] = float(
  92. new_user_feature['mid_share_count_pv_30day'] / new_user_feature['mid_play_count_pv_30day'] + 1)
  93. new_user_feature['mid_return_rate_30day'] = float(
  94. new_user_feature['mid_return_count_30day'] / new_user_feature['mid_view_count_pv_30day'] + 1)
  95. new_user_feature_df = pd.DataFrame([new_user_feature])
  96. user_df = pd.concat([feature_df, new_user_feature_df])
  97. print(f"user_df shape: {user_df.shape}")
  98. print(f"step 4: to csv")
  99. # 写入csv
  100. predict_data_dir = './data/predict_data'
  101. if not os.path.exists(predict_data_dir):
  102. os.makedirs(predict_data_dir)
  103. user_df.to_csv(f"{predict_data_dir}/user_feature.csv", index=False)
  104. def timer_check():
  105. project = 'loghubods'
  106. table = 'admodel_testset_mid'
  107. # dt = '20230725'
  108. now_date = datetime.datetime.today()
  109. dt = datetime.datetime.strftime(now_date - datetime.timedelta(days=1), '%Y%m%d')
  110. # 查看当前更新的数据是否已准备好
  111. data_count = data_check(project=project, table=table, dt=dt)
  112. if data_count > 0:
  113. print(f"ad predict user data count = {data_count}")
  114. # 数据准备好,进行更新
  115. user_data_process(project=project, table=table, dt=dt, app_type=0)
  116. print(f"ad predict user data update end!")
  117. else:
  118. # 数据没准备好,1分钟后重新检查
  119. Timer(60, timer_check).start()
  120. if __name__ == '__main__':
  121. st_time = time.time()
  122. timer_check()
  123. print(f"{time.time() - st_time}s")