# user_group_update.py
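"""
Timer-driven job: check whether the day's user-group feature table is ready,
build a mid -> ad-group mapping from it, and write that mapping to Redis
(28h TTL) for online lookup. Sends a Feishu alert on failure.
"""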

import datetime
import multiprocessing
import time
import traceback
import gevent
import asyncio
from threading import Lock, Timer
from concurrent.futures import ThreadPoolExecutor
from utils import RedisHelper, data_check, get_feature_data, send_msg_to_feishu
from config import set_config
from log import Log

config_, _ = set_config()
log_ = Log()
redis_helper = RedisHelper()

# features = [
#     'apptype',
#     'return1mids',
#     'return2_3mids',
#     'return4_8mids',
#     'return9_24mids',
#     'return25_nmids',
#     'return0share1mids',
#     'return0share2_nmids'
# ]

mid_group_mapping_global = {}
mids_global = []
# Guards mapping_process's check-then-update on the two globals above,
# which runs concurrently on ThreadPoolExecutor threads
mapping_lock = Lock()


def to_redis(group, mid_list, class_key_list):
    """Write `group` under one Redis key per (class_key, mid) pair."""
    log_.info(f"group = {group} update redis start ...")
    start_time = time.time()
    log_.info(f"mid count = {len(mid_list)}")
    for class_key in class_key_list:
        # Write in batches of 100, one gevent greenlet per mid
        for i in range(len(mid_list) // 100 + 1):
            mid_temp_list = mid_list[i * 100:(i + 1) * 100]
            task_list = [
                gevent.spawn(redis_helper.set_data_to_redis,
                             f"{config_.KEY_NAME_PREFIX_MID_GROUP}{class_key}:{mid}", group, 28 * 3600)
                for mid in mid_temp_list
            ]
            gevent.joinall(task_list)
    log_.info(f"group = {group}, mid count = {len(mid_list)}, update redis finished! "
              f"execute time = {(time.time() - start_time) / 60}min")


def to_redis2(process_mid_list, mid_group_mapping, ad_mid_group_key_params):
    """Write each mid's {class_key: group} dict to Redis under a single key."""
    log_.info(f"mid count = {len(process_mid_list)}")
    start_time = time.time()
    # Write in batches of 100, one gevent greenlet per mid
    for i in range(len(process_mid_list) // 100 + 1):
        mid_temp_list = process_mid_list[i * 100:(i + 1) * 100]
        task_list = []
        for mid in mid_temp_list:
            group_list = mid_group_mapping.get(mid, [])
            mid_value = {}
            for group in group_list:
                for class_key in ad_mid_group_key_params.get(group, []):
                    mid_value[class_key] = group
            if len(mid_value) > 0:
                task_list.append(
                    gevent.spawn(redis_helper.set_data_to_redis,
                                 f"{config_.KEY_NAME_PREFIX_MID_GROUP}{mid}", str(mid_value), 28 * 3600)
                )
        gevent.joinall(task_list)
    log_.info(f"mid count = {len(process_mid_list)}, update redis finished! "
              f"execute time = {(time.time() - start_time) / 60}min")


def mapping_process(group, mid_list):
    """Merge one group's mids into the global mid -> group list mapping."""
    global mids_global, mid_group_mapping_global
    for mid in mid_list:
        if mid is None:
            continue
        # Runs on ThreadPoolExecutor threads: lock the check-then-update.
        # Membership is checked against the dict (O(1)) rather than the
        # mids_global list (O(n)); the two are kept in sync.
        with mapping_lock:
            if mid in mid_group_mapping_global:
                mid_group_mapping_global[mid].append(group)
            else:
                mid_group_mapping_global[mid] = [group]
                mids_global.append(mid)


async def get_mid_group_mapping(feature_df, group_list):
    """Build the mid -> group list mapping (written into the module globals)."""
    for group in group_list:
        start_time = time.time()
        mid_list = list(set(feature_df[group].tolist()))
        log_.info(f"group = {group}, mid_list_count = {len(mid_list)}")
        # Split the mids into 10 chunks and merge them on a thread pool
        # (ceil division so the chunks cover the whole list)
        step = max((len(mid_list) + 9) // 10, 1)
        loop = asyncio.get_running_loop()
        with ThreadPoolExecutor(max_workers=10) as executor:
            tasks = [
                loop.run_in_executor(executor, mapping_process, group,
                                     mid_list[i * step:(i + 1) * step])
                for i in range(10)
            ]
            await asyncio.gather(*tasks)
        log_.info(f"group = {group} mid mapping finished! "
                  f"mid_count = {len(mids_global)}, "
                  f"mid_group_mapping_count = {len(mid_group_mapping_global)}, "
                  f"execute time = {(time.time() - start_time) / 60}min")


def update_user_group_to_redis(project, table, dt, app_type_list, features, ad_mid_group_key_params):
    """Update each mid's group mapping in Redis."""
    # Fetch the user group data and keep only the configured app types
    feature_df = get_feature_data(project=project, table=table, features=features, dt=dt)
    feature_df['apptype'] = feature_df['apptype'].astype(int)
    feature_df = feature_df[feature_df['apptype'].isin(app_type_list)]
    group_list = list(ad_mid_group_key_params)
    asyncio.run(get_mid_group_mapping(feature_df=feature_df, group_list=group_list))
    mid_group_mapping, mids = mid_group_mapping_global, mids_global
    # Fan the mids out over worker processes (ceil division so the
    # chunks cover the whole list)
    process_count = len(ad_mid_group_key_params)
    step = max((len(mids) + process_count - 1) // process_count, 1)
    pool = multiprocessing.Pool(processes=process_count)
    for i in range(process_count):
        process_mid_list = mids[i * step:(i + 1) * step]
        pool.apply_async(func=to_redis2, args=(process_mid_list, mid_group_mapping, ad_mid_group_key_params))
    # for group, class_key_list in ad_mid_group_key_params.items():
    #     mid_list = feature_df[group].tolist()
    #     mid_list = list(set(mid_list))
    #     mid_list = [mid for mid in mid_list if mid is not None]
    #     pool.apply_async(func=to_redis, args=(group, mid_list, class_key_list))
    pool.close()
    pool.join()
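
# NOTE: the AsyncResult objects returned by apply_async are discarded above, so
# exceptions raised inside the to_redis2 workers are silently dropped; keep the
# results and call .get() on each one if failed writes need to surface here.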


def get_group_keys_mapping(ad_mid_group):
    """Invert the class_key -> group list config into a group -> class_key list
    mapping, and collect the feature columns to pull."""
    ad_mid_group_key_params = {}
    features = ['apptype']
    for class_key, group_list in ad_mid_group.items():
        for group in group_list:
            if group not in features:
                features.append(group)
                ad_mid_group_key_params[group] = [class_key]
            else:
                ad_mid_group_key_params[group].append(class_key)
    return features, ad_mid_group_key_params
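
# Illustration (hypothetical class_key names; group names from the feature list
# at the top of this file):
#   ad_mid_group = {'key1': ['return1mids', 'return2_3mids'], 'key2': ['return1mids']}
# yields
#   features = ['apptype', 'return1mids', 'return2_3mids']
#   ad_mid_group_key_params = {'return1mids': ['key1', 'key2'], 'return2_3mids': ['key1']}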


def timer_check():
    try:
        app_type_list = config_.AD_APP_TYPE_LIST
        ad_mid_group = config_.AD_MID_GROUP
        project = config_.ad_model_data['user_group'].get('project')
        table = config_.ad_model_data['user_group'].get('table')
        now_date = datetime.datetime.today()
        dt = datetime.datetime.strftime(now_date, '%Y%m%d')
        log_.info(f"now_date: {dt}")
        now_min = datetime.datetime.now().minute
        # Check whether today's data is ready
        data_count = data_check(project=project, table=table, dt=dt)
        if data_count > 0:
            log_.info(f"user group data count = {data_count}")
            # Collect the features and the group -> class_key mapping
            features, ad_mid_group_key_params = get_group_keys_mapping(ad_mid_group=ad_mid_group)
            log_.info(f"features = {features}, \nad_mid_group_key_params = {ad_mid_group_key_params}")
            # Data is ready, run the update
            update_user_group_to_redis(project=project, table=table, dt=dt, app_type_list=app_type_list,
                                       features=features, ad_mid_group_key_params=ad_mid_group_key_params)
            log_.info("user group data update end!")
        # elif now_min > 45:
        #     log_.info('user group data is None!')
        #     send_msg_to_feishu(
        #         webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
        #         key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
        #         msg_text=f"rov-offline{config_.ENV_TEXT} - user group data not ready!\n"
        #                  f"traceback: {traceback.format_exc()}"
        #     )
        else:
            # Data not ready, check again in 1 minute
            Timer(60, timer_check).start()
    except Exception as e:
        log_.error(f"user group data update failed, exception: {e}, traceback: {traceback.format_exc()}")
        send_msg_to_feishu(
            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
            msg_text=f"rov-offline{config_.ENV_TEXT} - user group data update failed\n"
                     f"exception: {e}\n"
                     f"traceback: {traceback.format_exc()}"
        )


if __name__ == '__main__':
    timer_check()