# user_group_update.py

import datetime
import multiprocessing
import time
import traceback
import gevent
import asyncio
from threading import Timer
from concurrent.futures import ThreadPoolExecutor
from utils import RedisHelper, data_check, get_feature_data, send_msg_to_feishu
from config import set_config
from log import Log

config_, _ = set_config()
log_ = Log()
redis_helper = RedisHelper()

# features = [
#     'apptype',
#     'return1mids',
#     'return2_3mids',
#     'return4_8mids',
#     'return9_24mids',
#     'return25_nmids',
#     'return0share1mids',
#     'return0share2_nmids'
# ]

mid_group_mapping_global = {}
mids_global = []
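
# Illustrative shape of the shared state populated below (example values only,
# not real data):
#   mid_group_mapping_global = {'mid_001': ['return1mids', 'return0share1mids']}
#   mids_global = ['mid_001', ...]
# The mapping workers run in a ThreadPoolExecutor and mutate these in place;
# under CPython the GIL keeps each individual dict/list write atomic, which is
# what this design relies on instead of an explicit lock.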


def to_redis(group, mid_list, class_key_list):
    """Write group membership to Redis, one key per (class_key, mid)."""
    log_.info(f"group = {group} update redis start ...")
    start_time = time.time()
    log_.info(f"mid count = {len(mid_list)}")
    for class_key in class_key_list:
        for i in range(len(mid_list) // 100 + 1):
            # log_.info(f"i = {i}")
            mid_temp_list = mid_list[i * 100:(i + 1) * 100]
            # print(mid_temp_list)
            # write each batch of 100 mids concurrently via gevent
            task_list = [
                gevent.spawn(redis_helper.set_data_to_redis,
                             f"{config_.KEY_NAME_PREFIX_MID_GROUP}{class_key}:{mid}", group, 28 * 3600)
                for mid in mid_temp_list
            ]
            gevent.joinall(task_list)
    log_.info(f"group = {group}, mid count = {len(mid_list)}, update redis finished! "
              f"execute time = {(time.time() - start_time) / 60}min")


def to_redis2(process_mid_list, mid_group_mapping, ad_mid_group_key_params):
    """Write all class_key -> group pairs for each mid into a single Redis key."""
    log_.info(f"mid count = {len(process_mid_list)}")
    start_time = time.time()
    for i in range(len(process_mid_list) // 100 + 1):
        mid_temp_list = process_mid_list[i * 100:(i + 1) * 100]
        task_list = []
        for mid in mid_temp_list:
            group_list = mid_group_mapping.get(mid)
            if not group_list:
                # mid may be None or absent from the mapping; skip it
                continue
            mid_value = {}
            for group in group_list:
                for class_key in ad_mid_group_key_params.get(group, []):
                    mid_value[class_key] = group
            # print(f"mid={mid}, mid_value={mid_value}")
            if len(mid_value) > 0:
                task_list.append(
                    gevent.spawn(redis_helper.set_data_to_redis,
                                 f"{config_.KEY_NAME_PREFIX_MID_GROUP}{mid}", str(mid_value), 28 * 3600)
                )
        gevent.joinall(task_list)
    log_.info(f"mid count = {len(process_mid_list)}, update redis finished! "
              f"execute time = {(time.time() - start_time) / 60}min")


def mapping_process(group, mid_list):
    """Merge one group's mids into the global mid -> group-list mapping."""
    global mids_global, mid_group_mapping_global
    for mid in mid_list:
        if mid is None:
            continue
        if mid in mids_global:
            mid_group_mapping_global[mid].append(group)
        else:
            mid_group_mapping_global[mid] = [group]
            mids_global.append(mid)
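
# Note: `mid in mids_global` scans a list, so building the mapping this way is
# O(n^2) over all mids. If this legacy path is revived, keeping a parallel set
# of seen mids (an assumption, not in the original) would make the test O(1):
#   mids_seen = set()
#   ... if mid in mids_seen: ... else: mids_seen.add(mid); mids_global.append(mid)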


def mapping_process2(mid_list, group_list, group_mid_list):
    """For each mid, record every group whose mid set contains it."""
    global mid_group_mapping_global
    for mid in mid_list:
        if mid is None:
            continue
        mid_group = [group for group in group_list if mid in group_mid_list.get(group, set())]
        mid_group_mapping_global[mid] = mid_group
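
# Example (illustrative values): with
#   group_mid_list = {'return1mids': {'m1'}, 'return2_3mids': {'m1', 'm2'}}
# mapping_process2(['m1', 'm2'], list(group_mid_list), group_mid_list) yields
#   mid_group_mapping_global == {'m1': ['return1mids', 'return2_3mids'],
#                                'm2': ['return2_3mids']}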


async def get_mid_group_mapping2(feature_df, group_list):
    """Build the mid -> group-list mapping for every group column in feature_df."""
    start_time = time.time()
    group_mid_list = {}
    mid_list_all = []
    for group in group_list:
        mid_list = feature_df[group].tolist()
        mid_list = list(set(mid_list))
        # store as a set so the membership test in mapping_process2 is O(1)
        group_mid_list[group] = set(mid_list)
        mid_list_all.extend(mid_list)
        log_.info(f"group = {group}, mid_list_count = {len(mid_list)}")
    mid_list_all = list(set(mid_list_all))
    log_.info(f"mid_list_all count = {len(mid_list_all)}")
    global mids_global, mid_group_mapping_global
    mids_global = mid_list_all
    step = 10000
    loop = asyncio.get_running_loop()
    executor = ThreadPoolExecutor(max_workers=20)
    tasks = []
    for i in range(len(mid_list_all) // step + 1):
        log_.info(f"i = {i}")
        process_mid_list = mid_list_all[i * step:(i + 1) * step]
        tasks.append(loop.run_in_executor(executor, mapping_process2, process_mid_list, group_list, group_mid_list))
    if tasks:
        await asyncio.wait(tasks)
    # sequential version kept for reference:
    # mid_group_mapping = {}
    # for mid in mid_list_all:
    #     if mid is None:
    #         continue
    #     mid_group = [group for group in group_list if mid in group_mid_list.get(group, [])]
    #     mid_group_mapping[mid] = mid_group
    # return mid_group_mapping, mid_list_all
    log_.info(f"group mid mapping finished! "
              f"mid_count = {len(mids_global)}, mid_group_mapping_count = {len(mid_group_mapping_global)}, "
              f"execute time = {(time.time() - start_time) / 60}min")


async def get_mid_group_mapping(feature_df, group_list):
    """Build the mid -> group-list mapping, one group column at a time."""
    for group in group_list:
        start_time = time.time()
        mid_list = feature_df[group].tolist()
        mid_list = list(set(mid_list))
        log_.info(f"group = {group}, mid_list_count = {len(mid_list)}")
        # multiprocessing version kept for reference:
        # pool = multiprocessing.Pool(processes=10)
        # step = len(mid_list) // (10 - 1)
        # for i in range(10 + 1):
        #     process_mid_list = mid_list[i * step:(i + 1) * step]
        #     pool.apply_async(func=mapping_process, args=(group, process_mid_list))
        # pool.close()
        # pool.join()
        # step = len(mid_list) // (100 - 1)
        step = 100
        loop = asyncio.get_running_loop()
        executor = ThreadPoolExecutor(max_workers=20)
        tasks = []
        for i in range(len(mid_list) // step + 1):
            log_.info(f"i = {i}")
            process_mid_list = mid_list[i * step:(i + 1) * step]
            tasks.append(loop.run_in_executor(executor, mapping_process, group, process_mid_list))
        if tasks:
            await asyncio.wait(tasks)
        global mids_global, mid_group_mapping_global
        log_.info(f"group = {group} mid mapping finished! "
                  f"mid_count = {len(mids_global)}, mid_group_mapping_count = {len(mid_group_mapping_global)}, "
                  f"execute time = {(time.time() - start_time) / 60}min")
    # sequential version kept for reference:
    # for mid in mid_list:
    #     if mid is None:
    #         continue
    #     if mid in mids:
    #         mid_group_mapping[mid].append(group)
    #     else:
    #         mid_group_mapping[mid] = [group]
    #         mids.append(mid)
    # mid_group_mapping, mids = mid_group_mapping_global, mids_global
    # return mid_group_mapping, mids


def update_user_group_to_redis(project, table, dt, app_type_list, features, ad_mid_group_key_params):
    """Update the mid -> group mapping in Redis."""
    # fetch the user grouping data
    feature_df = get_feature_data(project=project, table=table, features=features, dt=dt)
    feature_df['apptype'] = feature_df['apptype'].astype(int)
    feature_df = feature_df[feature_df['apptype'].isin(app_type_list)]
    # print(len(feature_df))
    group_list = list(ad_mid_group_key_params)
    # mid_group_mapping, mids = get_mid_group_mapping(feature_df=feature_df, group_list=group_list)
    asyncio.run(get_mid_group_mapping2(feature_df=feature_df, group_list=group_list))
    global mid_group_mapping_global, mids_global
    mid_group_mapping, mids = mid_group_mapping_global, mids_global
    process_count = len(ad_mid_group_key_params)
    pool = multiprocessing.Pool(processes=process_count)
    # split mids evenly across the pool; ceil division puts every mid in exactly
    # one slice and avoids a ZeroDivisionError when there is only one group
    step = max(1, (len(mids) + process_count - 1) // process_count)
    for i in range(process_count):
        process_mid_list = mids[i * step:(i + 1) * step]
        pool.apply_async(func=to_redis2, args=(process_mid_list, mid_group_mapping, ad_mid_group_key_params))
    # per-group version kept for reference:
    # for group, class_key_list in ad_mid_group_key_params.items():
    #     mid_list = feature_df[group].tolist()
    #     mid_list = list(set(mid_list))
    #     mid_list = [mid for mid in mid_list if mid is not None]
    #     # class_key_list = ad_mid_group_key_params.get(group)
    #     pool.apply_async(func=to_redis, args=(group, mid_list, class_key_list))
    pool.close()
    pool.join()
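
# Partitioning example for the pool above: with 8 entries in
# ad_mid_group_key_params and 1000 mids, step = ceil(1000 / 8) = 125, so each
# worker process handles one contiguous slice of 125 mids.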


def get_group_keys_mapping(ad_mid_group):
    """Invert ad_mid_group (class_key -> groups) into group -> class_keys."""
    ad_mid_group_key_params = {}
    features = ['apptype']
    for class_key, group_list in ad_mid_group.items():
        for group in group_list:
            if group not in features:
                features.append(group)
                ad_mid_group_key_params[group] = [class_key]
            else:
                ad_mid_group_key_params[group].append(class_key)
    return features, ad_mid_group_key_params
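
# Worked example of the inversion (config values are illustrative):
#   ad_mid_group = {'class1': ['return1mids'],
#                   'class2': ['return1mids', 'return2_3mids']}
# returns
#   features = ['apptype', 'return1mids', 'return2_3mids']
#   ad_mid_group_key_params = {'return1mids': ['class1', 'class2'],
#                              'return2_3mids': ['class2']}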


def timer_check():
    try:
        app_type_list = config_.AD_APP_TYPE_LIST
        ad_mid_group = config_.AD_MID_GROUP
        project = config_.ad_model_data['user_group'].get('project')
        table = config_.ad_model_data['user_group'].get('table')
        now_date = datetime.datetime.today()
        dt = datetime.datetime.strftime(now_date, '%Y%m%d')
        log_.info(f"now_date: {dt}")
        now_min = datetime.datetime.now().minute
        # check whether today's partition is ready
        data_count = data_check(project=project, table=table, dt=dt)
        if data_count > 0:
            log_.info(f"user group data count = {data_count}")
            # build features & the group -> class_key mapping
            features, ad_mid_group_key_params = get_group_keys_mapping(ad_mid_group=ad_mid_group)
            log_.info(f"features = {features}, \nad_mid_group_key_params = {ad_mid_group_key_params}")
            # data is ready, run the update
            update_user_group_to_redis(project=project, table=table, dt=dt, app_type_list=app_type_list,
                                       features=features, ad_mid_group_key_params=ad_mid_group_key_params)
            log_.info("user group data update end!")
        # elif now_min > 45:
        #     log_.info('user group data is None!')
        #     send_msg_to_feishu(
        #         webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
        #         key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
        #         msg_text=f"rov-offline{config_.ENV_TEXT} - user group data not ready!\n"
        #                  f"traceback: {traceback.format_exc()}"
        #     )
        else:
            # data not ready, check again in 1 minute
            Timer(60, timer_check).start()
    except Exception as e:
        log_.error(f"user group data update failed, exception: {e}, traceback: {traceback.format_exc()}")
        send_msg_to_feishu(
            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
            msg_text=f"rov-offline{config_.ENV_TEXT} - user group data update failed\n"
                     f"exception: {e}\n"
                     f"traceback: {traceback.format_exc()}"
        )
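
# Scheduling note: timer_check re-arms itself via threading.Timer every 60s
# until data_check reports rows for today's dt partition, then performs the
# update exactly once; failures alert Feishu with the full traceback.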


if __name__ == '__main__':
    timer_check()