ad_users_data_update.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. import datetime
  2. import pandas as pd
  3. from odps import ODPS
  4. from utils import get_data_from_odps, RedisHelper, check_table_partition_exits
  5. from config import set_config
  6. from log import Log
# Project configuration object; only the first element of set_config()'s
# result is kept (the second is discarded).
config_, _ = set_config()
# Project logger instance (from the local `log` module).
log_ = Log()
  9. def h_data_check(project, table, now_date):
  10. """检查数据是否准备好"""
  11. odps = ODPS(
  12. access_id=config_.ODPS_CONFIG['ACCESSID'],
  13. secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
  14. project=project,
  15. endpoint=config_.ODPS_CONFIG['ENDPOINT'],
  16. connect_timeout=3000,
  17. read_timeout=500000,
  18. pool_maxsize=1000,
  19. pool_connections=1000
  20. )
  21. try:
  22. dt = datetime.datetime.strftime(now_date, '%Y%m%d')
  23. check_res = check_table_partition_exits(date=dt, project=project, table=table)
  24. if check_res:
  25. sql = f'select * from {project}.{table} where dt = {dt}'
  26. with odps.execute_sql(sql=sql).open_reader() as reader:
  27. data_count = reader.count
  28. else:
  29. data_count = 0
  30. except Exception as e:
  31. data_count = 0
  32. return data_count
  33. def get_feature_data(project, table, features, now_date):
  34. """获取特征数据"""
  35. dt = datetime.datetime.strftime(now_date, '%Y%m%d')
  36. records = get_data_from_odps(date=dt, project=project, table=table)
  37. feature_data = []
  38. for record in records:
  39. item = {}
  40. for feature_name in features:
  41. item[feature_name] = record[feature_name]
  42. feature_data.append(item)
  43. feature_df = pd.DataFrame(feature_data)
  44. return feature_df
  45. def predict_user_group_share_rate(now_date):
  46. """预估用户组对应的有广告时分享率"""
  47. # 获取用户组特征
  48. project = config_.ad_model_data['users_share_rate'].get('project')
  49. table = config_.ad_model_data['users_share_rate'].get('table')
  50. features = [
  51. 'apptype',
  52. 'group',
  53. 'sharerate_all',
  54. 'sharerate_ad'
  55. ]
  56. user_group_df = get_feature_data(project=project, table=table, features=features, now_date=now_date)
  57. user_group_df['sharerate_all'] = user_group_df['sharerate_all'].astype(float)
  58. user_group_df['sharerate_ad'] = user_group_df['sharerate_ad'].astype(float)
  59. # 获取有广告时所有用户组近30天的分享率
  60. ad_all_group_share_rate = user_group_df[user_group_df['group'] == 'allmids']['sharerate_ad']
  61. user_group_df = user_group_df[user_group_df['group'] != 'allmids']
  62. # 计算用户组有广告时分享率
  63. user_group_df['group_ad_share_rate'] = \
  64. user_group_df['sharerate_ad'] * float(ad_all_group_share_rate) / user_group_df['sharerate_all']
  65. # 结果写入redis
  66. # key_name = f"{config_.KEY_NAME_PREFIX_AD_GROUP}{datetime.datetime.strftime(now_date, '%Y%m%d')}"
  67. # redis_data = {}
  68. # for item in user_group_df:
  69. # redis_data[item['group']] = item['group_ad_share_rate']
  70. # if len(redis_data) > 0:
  71. # redis_helper = RedisHelper()
  72. # redis_helper.add_data_with_zset(key_name=key_name, data=redis_data, expire_time=2 * 24 * 3600)
  73. return user_group_df
  74. def predict_video_share_rate(now_date):
  75. """预估视频有广告时分享率"""
  76. # 获取视频特征
  77. project = config_.ad_model_data['videos_share_rate'].get('project')
  78. table = config_.ad_model_data['videos_share_rate'].get('table')
  79. features = [
  80. 'apptype',
  81. 'videoid',
  82. 'sharerate_all',
  83. 'sharerate_ad'
  84. ]
  85. video_df = get_feature_data(project=project, table=table, features=features, now_date=now_date)
  86. video_df['sharerate_all'] = video_df['sharerate_all'].astype(float)
  87. video_df['sharerate_ad'] = video_df['sharerate_ad'].astype(float)
  88. # 获取有广告时所有视频近30天的分享率
  89. ad_all_videos_share_rate = video_df[video_df['videoid'] == 'allvideos']['sharerate_ad']
  90. video_df = video_df[video_df['videoid'] != 'allvideos']
  91. # 计算视频有广告时分享率
  92. video_df['video_ad_share_rate'] = \
  93. video_df['sharerate_ad'] * float(ad_all_videos_share_rate) / video_df['sharerate_all']
  94. # 结果写入redis
  95. # key_name = f"{config_.KEY_NAME_PREFIX_AD_VIDEO}{datetime.datetime.strftime(now_date, '%Y%m%d')}"
  96. # redis_data = {}
  97. # for item in video_df:
  98. # redis_data[item['videoid']] = item['video_ad_share_rate']
  99. # if len(redis_data) > 0:
  100. # redis_helper = RedisHelper()
  101. # redis_helper.add_data_with_zset(key_name=key_name, data=redis_data, expire_time=2 * 24 * 3600)
  102. return video_df
  103. def predict_ad_group_video(now_date):
  104. user_group_df = predict_user_group_share_rate(now_date)
  105. video_df = predict_video_share_rate(now_date)
  106. predict_df = video_df
  107. for item in user_group_df:
  108. predict_df[item['group']] = predict_df['videoid'] * item['group_ad_share_rate']
  109. return predict_df
  110. if __name__ == '__main__':
  111. now_date = datetime.datetime.today()
  112. predict_df = predict_ad_group_video(now_date)