user_group_update.py

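"""
Daily job: once the user-group table partition for today is ready, build a
mid -> [group, ...] mapping and write each mid's group membership to Redis
with a 28-hour TTL. Re-checks every minute until the data is ready and
alerts via Feishu on failure.
"""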
import datetime
import multiprocessing
import time
import traceback
import gevent
import asyncio
from threading import Timer
from concurrent.futures import ThreadPoolExecutor
from utils import RedisHelper, data_check, get_feature_data, send_msg_to_feishu
from config import set_config
from log import Log

config_, _ = set_config()
log_ = Log()
redis_helper = RedisHelper()

# features = [
#     'apptype',
#     'return1mids',
#     'return2_3mids',
#     'return4_8mids',
#     'return9_24mids',
#     'return25_nmids',
#     'return0share1mids',
#     'return0share2_nmids'
# ]

# Module-level accumulators shared by mapping_process and the update entrypoint
mid_group_mapping_global = {}
mids_global = []


def to_redis(group, mid_list, class_key_list):
    """Write one Redis key per (class_key, mid), valued with the group name."""
    log_.info(f"group = {group} update redis start ...")
    start_time = time.time()
    log_.info(f"mid count = {len(mid_list)}")
    for class_key in class_key_list:
        # Write in chunks of 100, one gevent greenlet per key
        for i in range(len(mid_list) // 100 + 1):
            # log_.info(f"i = {i}")
            mid_temp_list = mid_list[i * 100:(i + 1) * 100]
            # print(mid_temp_list)
            task_list = [
                gevent.spawn(redis_helper.set_data_to_redis,
                             f"{config_.KEY_NAME_PREFIX_MID_GROUP}{class_key}:{mid}", group, 28 * 3600)
                for mid in mid_temp_list
            ]
            gevent.joinall(task_list)
    log_.info(f"group = {group}, mid count = {len(mid_list)}, update redis finished! "
              f"execute time = {(time.time() - start_time) / 60}min")


def to_redis2(process_mid_list, mid_group_mapping, ad_mid_group_key_params):
    """Write one Redis key per mid, valued with its {class_key: group} dict."""
    log_.info(f"mid count = {len(process_mid_list)}")
    start_time = time.time()
    for i in range(len(process_mid_list) // 100 + 1):
        mid_temp_list = process_mid_list[i * 100:(i + 1) * 100]
        task_list = []
        for mid in mid_temp_list:
            # Default to an empty list so a mid missing from the mapping is
            # skipped instead of raising a TypeError on iteration
            group_list = mid_group_mapping.get(mid, [])
            mid_value = {}
            for group in group_list:
                for class_key in ad_mid_group_key_params.get(group, []):
                    mid_value[class_key] = group
            # print(f"mid={mid}, mid_value={mid_value}")
            if len(mid_value) > 0:
                task_list.append(
                    gevent.spawn(redis_helper.set_data_to_redis,
                                 f"{config_.KEY_NAME_PREFIX_MID_GROUP}{mid}", str(mid_value), 28 * 3600)
                )
        gevent.joinall(task_list)
    log_.info(f"mid count = {len(process_mid_list)}, update redis finished! "
              f"execute time = {(time.time() - start_time) / 60}min")


def mapping_process(group, mid_list):
    """Merge one group's mids into the module-level mapping accumulators."""
    global mids_global, mid_group_mapping_global
    for mid in mid_list:
        if mid is None:
            continue
        # Membership test against the dict is O(1); the original tested against
        # the mids_global list, which made the whole pass quadratic
        if mid in mid_group_mapping_global:
            mid_group_mapping_global[mid].append(group)
        else:
            mid_group_mapping_global[mid] = [group]
            mids_global.append(mid)
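
# Thread-safety note: mapping_process runs on a thread pool, but groups are
# processed one at a time and each group's mid_list is deduplicated first, so
# no two concurrent chunks touch the same mid; the plain dict/list appends
# rely on that (and on CPython's GIL) rather than on explicit locking.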


async def get_mid_group_mapping(feature_df, group_list):
    """Build each mid's group list; results accumulate in the module globals."""
    for group in group_list:
        start_time = time.time()
        mid_list = feature_df[group].tolist()
        mid_list = list(set(mid_list))
        log_.info(f"group = {group}, mid_list_count = {len(mid_list)}")
        # pool = multiprocessing.Pool(processes=10)
        # step = len(mid_list) // (10 - 1)
        # for i in range(10 + 1):
        #     process_mid_list = mid_list[i * step:(i + 1) * step]
        #     pool.apply_async(func=mapping_process, args=(group, process_mid_list))
        # pool.close()
        # pool.join()
        # step = len(mid_list) // (100 - 1)
        step = 100
        loop = asyncio.get_running_loop()
        # Context-manage the executor so its worker threads are released after
        # each group (the original leaked a 20-thread pool per group)
        with ThreadPoolExecutor(max_workers=20) as executor:
            tasks = []
            for i in range(len(mid_list) // step + 1):
                log_.info(f"i = {i}")
                process_mid_list = mid_list[i * step:(i + 1) * step]
                tasks.append(loop.run_in_executor(executor, mapping_process, group, process_mid_list))
            # gather tolerates an empty task list and re-raises worker exceptions;
            # asyncio.wait raises ValueError when given no tasks
            await asyncio.gather(*tasks)
        log_.info(f"group = {group} mid mapping finished! "
                  f"mid_count = {len(mids_global)}, mid_group_mapping_count = {len(mid_group_mapping_global)}, "
                  f"execute time = {(time.time() - start_time) / 60}min")
        # for mid in mid_list:
        #     if mid is None:
        #         continue
        #     if mid in mids:
        #         mid_group_mapping[mid].append(group)
        #     else:
        #         mid_group_mapping[mid] = [group]
        #         mids.append(mid)
    # mid_group_mapping, mids = mid_group_mapping_global, mids_global
    # return mid_group_mapping, mids
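
# Illustrative state after get_mid_group_mapping (hypothetical mids):
#   mid_group_mapping_global = {'mid_a': ['return1mids'],
#                               'mid_b': ['return1mids', 'return0share1mids']}
#   mids_global = ['mid_a', 'mid_b']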


def update_user_group_to_redis(project, table, dt, app_type_list, features, ad_mid_group_key_params):
    """Update each mid's group membership in Redis."""
    # Fetch the user-group feature data and keep only the configured app types
    feature_df = get_feature_data(project=project, table=table, features=features, dt=dt)
    feature_df['apptype'] = feature_df['apptype'].astype(int)
    feature_df = feature_df[feature_df['apptype'].isin(app_type_list)]
    # print(len(feature_df))
    group_list = list(ad_mid_group_key_params)
    # mid_group_mapping, mids = get_mid_group_mapping(feature_df=feature_df, group_list=group_list)
    asyncio.run(get_mid_group_mapping(feature_df=feature_df, group_list=group_list))
    mid_group_mapping, mids = mid_group_mapping_global, mids_global
    process_num = len(ad_mid_group_key_params)
    pool = multiprocessing.Pool(processes=process_num)
    # Ceiling-style chunking keeps every mid in exactly one slice; the original
    # `len(mids) // (process_num - 1)` divided by zero when only one group was
    # configured and could produce empty slices for small inputs
    step = len(mids) // process_num + 1
    for i in range(process_num):
        process_mid_list = mids[i * step:(i + 1) * step]
        pool.apply_async(func=to_redis2, args=(process_mid_list, mid_group_mapping, ad_mid_group_key_params))
    # for group, class_key_list in ad_mid_group_key_params.items():
    #     mid_list = feature_df[group].tolist()
    #     mid_list = list(set(mid_list))
    #     mid_list = [mid for mid in mid_list if mid is not None]
    #     # class_key_list = ad_mid_group_key_params.get(group)
    #     pool.apply_async(func=to_redis, args=(group, mid_list, class_key_list))
    pool.close()
    pool.join()


def get_group_keys_mapping(ad_mid_group):
    """Invert class_key -> groups into group -> class_keys and collect the feature columns."""
    ad_mid_group_key_params = {}
    features = ['apptype']
    for class_key, group_list in ad_mid_group.items():
        for group in group_list:
            if group not in features:
                features.append(group)
                ad_mid_group_key_params[group] = [class_key]
            else:
                ad_mid_group_key_params[group].append(class_key)
    return features, ad_mid_group_key_params
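
# Illustrative example (hypothetical class-key names, not taken from config):
#   ad_mid_group = {'ad_key1': ['return1mids', 'return2_3mids'],
#                   'ad_key2': ['return1mids']}
# yields
#   features = ['apptype', 'return1mids', 'return2_3mids']
#   ad_mid_group_key_params = {'return1mids': ['ad_key1', 'ad_key2'],
#                              'return2_3mids': ['ad_key1']}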


def timer_check():
    try:
        app_type_list = config_.AD_APP_TYPE_LIST
        ad_mid_group = config_.AD_MID_GROUP
        project = config_.ad_model_data['user_group'].get('project')
        table = config_.ad_model_data['user_group'].get('table')
        now_date = datetime.datetime.today()
        dt = datetime.datetime.strftime(now_date, '%Y%m%d')
        log_.info(f"now_date: {dt}")
        now_min = datetime.datetime.now().minute
        # Check whether today's partition is ready
        data_count = data_check(project=project, table=table, dt=dt)
        if data_count > 0:
            log_.info(f"user group data count = {data_count}")
            # Get the feature columns & the class keys for each user group
            features, ad_mid_group_key_params = get_group_keys_mapping(ad_mid_group=ad_mid_group)
            log_.info(f"features = {features}, \nad_mid_group_key_params = {ad_mid_group_key_params}")
            # Data is ready; run the update
            update_user_group_to_redis(project=project, table=table, dt=dt, app_type_list=app_type_list,
                                       features=features, ad_mid_group_key_params=ad_mid_group_key_params)
            log_.info("user group data update end!")
        # elif now_min > 45:
        #     log_.info('user group data is None!')
        #     send_msg_to_feishu(
        #         webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
        #         key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
        #         msg_text=f"rov-offline{config_.ENV_TEXT} - user group data is not ready!\n"
        #                  f"traceback: {traceback.format_exc()}"
        #     )
        else:
            # Data not ready; re-check in one minute
            Timer(60, timer_check).start()
    except Exception as e:
        log_.error(f"user group data update failed, exception: {e}, traceback: {traceback.format_exc()}")
        send_msg_to_feishu(
            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
            msg_text=f"rov-offline{config_.ENV_TEXT} - user group data update failed\n"
                     f"exception: {e}\n"
                     f"traceback: {traceback.format_exc()}"
        )


if __name__ == '__main__':
    timer_check()