# ad_feature_process.py

import datetime
import os
import os.path
import time

import pandas as pd
from odps import ODPS
  6. # ODPS服务配置
  7. odps_config = {
  8. 'ENDPOINT': 'http://service.cn.maxcompute.aliyun.com/api',
  9. 'ACCESSID': 'LTAIWYUujJAm7CbH',
  10. 'ACCESSKEY': 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P',
  11. }
  12. features = [
  13. 'apptype',
  14. 'subsessionid',
  15. 'mid',
  16. 'videoid',
  17. 'ad_mid',
  18. 'share_videoid',
  19. 'mid_preview_count_30day',
  20. 'mid_view_count_30day',
  21. 'mid_view_count_pv_30day',
  22. 'mid_play_count_30day',
  23. 'mid_play_count_pv_30day',
  24. 'mid_share_count_30day',
  25. 'mid_share_count_pv_30day',
  26. 'mid_return_count_30day',
  27. 'mid_share_rate_30day',
  28. 'mid_return_rate_30day',
  29. 'video_preview_count_uv_30day',
  30. 'video_preview_count_pv_30day',
  31. 'video_view_count_uv_30day',
  32. 'video_view_count_pv_30day',
  33. 'video_play_count_uv_30day',
  34. 'video_play_count_pv_30day',
  35. 'video_share_count_uv_30day',
  36. 'video_share_count_pv_30day',
  37. 'video_return_count_30day',
  38. 'video_ctr_uv_30day',
  39. 'video_ctr_pv_30day',
  40. 'video_share_rate_uv_30day',
  41. 'video_share_rate_pv_30day',
  42. 'video_return_rate_30day',
  43. ]
  44. train_feature = [
  45. 'mid_preview_count_30day',
  46. 'mid_view_count_30day',
  47. 'mid_view_count_pv_30day',
  48. 'mid_play_count_30day',
  49. 'mid_play_count_pv_30day',
  50. 'mid_share_count_30day',
  51. 'mid_share_count_pv_30day',
  52. 'mid_return_count_30day',
  53. 'mid_share_rate_30day',
  54. 'mid_return_rate_30day',
  55. 'video_preview_count_uv_30day',
  56. 'video_preview_count_pv_30day',
  57. 'video_view_count_uv_30day',
  58. 'video_view_count_pv_30day',
  59. 'video_play_count_uv_30day',
  60. 'video_play_count_pv_30day',
  61. 'video_share_count_uv_30day',
  62. 'video_share_count_pv_30day',
  63. 'video_return_count_30day',
  64. 'video_ctr_uv_30day',
  65. 'video_ctr_pv_30day',
  66. 'video_share_rate_uv_30day',
  67. 'video_share_rate_pv_30day',
  68. 'video_return_rate_30day',
  69. 'ad_status',
  70. 'share_status',
  71. ]
  72. def get_feature_data(project, table, features, dt, app_type):
  73. """获取特征数据"""
  74. odps = ODPS(
  75. access_id=odps_config['ACCESSID'],
  76. secret_access_key=odps_config['ACCESSKEY'],
  77. project=project,
  78. endpoint=odps_config['ENDPOINT'],
  79. )
  80. feature_data = []
  81. sql = f"select * from {project}.{table} where dt={dt} and apptype={app_type}"
  82. with odps.execute_sql(sql).open_reader() as reader:
  83. for record in reader:
  84. # print(record)
  85. item = {}
  86. for feature_name in features:
  87. item[feature_name] = record[feature_name]
  88. feature_data.append(item)
  89. feature_df = pd.DataFrame(feature_data)
  90. return feature_df
  91. def daily_data_process(project, table, features, dt, app_type):
  92. """每日特征处理"""
  93. print('step 1: get feature data')
  94. feature_initial_df = get_feature_data(project=project, table=table, features=features, dt=dt, app_type=app_type)
  95. print(f"feature_initial_df shape: {feature_initial_df.shape}")
  96. print('step 2: process')
  97. feature_initial_df['apptype'] = feature_initial_df['apptype'].astype(int)
  98. # feature_df = feature_initial_df[feature_initial_df['apptype'] == app_type].copy()
  99. feature_df = feature_initial_df.copy()
  100. # 增加此次是否有广告字段 'ad_status' 1: 有广告, 0: 无广告
  101. feature_df['ad_status'] = feature_df.apply(func=lambda x: 1 if x['ad_mid'] == x['mid'] else 0, axis=1)
  102. feature_df['share_videoid'].fillna(0, inplace=True)
  103. feature_df['share_videoid'] = feature_df['share_videoid'].astype(int)
  104. feature_df['videoid'] = feature_df['videoid'].astype(int)
  105. # 增加此次是否分享了该视频 'share_status' 1: 分享, 0: 为分享
  106. feature_df['share_status'] = feature_df.apply(func=lambda x: 1 if x['share_videoid'] == x['videoid'] else 0, axis=1)
  107. # 缺失值填充
  108. feature_df.fillna(0, inplace=True)
  109. # 数据类型校正
  110. type_int_columns = [
  111. 'mid_preview_count_30day',
  112. 'mid_view_count_30day',
  113. 'mid_view_count_pv_30day',
  114. 'mid_play_count_30day',
  115. 'mid_play_count_pv_30day',
  116. 'mid_share_count_30day',
  117. 'mid_share_count_pv_30day',
  118. 'mid_return_count_30day',
  119. 'video_preview_count_uv_30day',
  120. 'video_preview_count_pv_30day',
  121. 'video_view_count_uv_30day',
  122. 'video_view_count_pv_30day',
  123. 'video_play_count_uv_30day',
  124. 'video_play_count_pv_30day',
  125. 'video_share_count_uv_30day',
  126. 'video_share_count_pv_30day',
  127. 'video_return_count_30day',
  128. ]
  129. for column_name in type_int_columns:
  130. feature_df[column_name] = feature_df[column_name].astype(int)
  131. type_float_columns = [
  132. 'mid_share_rate_30day',
  133. 'mid_return_rate_30day',
  134. 'video_ctr_uv_30day',
  135. 'video_ctr_pv_30day',
  136. 'video_share_rate_uv_30day',
  137. 'video_share_rate_pv_30day',
  138. 'video_return_rate_30day',
  139. ]
  140. for column_name in type_float_columns:
  141. feature_df[column_name] = feature_df[column_name].astype(float)
  142. print(f"feature_df shape: {feature_df.shape}")
  143. # 获取所需的字段
  144. print('step 3: get train_df')
  145. train_df = feature_df[train_feature]
  146. print(f"train_df shape: {train_df.shape}")
  147. # 写入csv
  148. train_data_dir = './data/train_data'
  149. if not os.path.exists(train_data_dir):
  150. os.makedirs(train_data_dir)
  151. train_df.to_csv(f"{train_data_dir}/{dt}.csv", index=False)
  152. return train_df
  153. if __name__ == '__main__':
  154. st_time = time.time()
  155. project = 'loghubods'
  156. table = 'admodel_data_train'
  157. # dt = '20230725'
  158. now_date = datetime.datetime.today()
  159. # dt = datetime.datetime.strftime(now_date - datetime.timedelta(days=1), '%Y%m%d')
  160. # df = daily_data_process(project=project, table=table, features=features, dt=dt, app_type=0)
  161. # print(time.time() - st_time)
  162. for days in range(10, 19):
  163. cur_dt = datetime.datetime.strftime(now_date - datetime.timedelta(days=days), '%Y%m%d')
  164. print(f"cur_dt = {cur_dt}")
  165. df = daily_data_process(project=project, table=table, features=features, dt=cur_dt, app_type=0)