religion_class_user_update.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. import hashlib
  2. import datetime
  3. import sys
  4. import traceback
  5. import pandas as pd
  6. from odps import ODPS
  7. from threading import Timer
  8. from get_data import get_data_from_odps
  9. from my_utils import send_msg_to_feishu
  10. from db_helper import RedisHelper
  11. from my_config import set_config
  12. from log import Log
  13. config_, env = set_config()
  14. log_ = Log()
  15. features = ['mid']
  16. def data_check(project, table, now_date):
  17. """检查数据是否准备好"""
  18. odps = ODPS(
  19. access_id=config_.ODPS_CONFIG['ACCESSID'],
  20. secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
  21. project=project,
  22. endpoint=config_.ODPS_CONFIG['ENDPOINT'],
  23. connect_timeout=3000,
  24. read_timeout=500000,
  25. pool_maxsize=1000,
  26. pool_connections=1000
  27. )
  28. try:
  29. dt = datetime.datetime.strftime(now_date, '%Y%m%d')
  30. sql = f'select * from {project}.{table} where dt = {dt}'
  31. with odps.execute_sql(sql=sql).open_reader() as reader:
  32. data_count = reader.count
  33. except Exception as e:
  34. data_count = 0
  35. return data_count
  36. def get_religion_users(now_date, project, table, key_name_prefix):
  37. """获取宗教用户列表"""
  38. # 获取mid
  39. dt = datetime.datetime.strftime(now_date, '%Y%m%d')
  40. records = get_data_from_odps(date=dt, project=project, table=table)
  41. feature_data = []
  42. for record in records:
  43. item = {}
  44. for feature_name in features:
  45. item[feature_name] = record[feature_name]
  46. feature_data.append(item)
  47. feature_df = pd.DataFrame(feature_data)
  48. mid_list = feature_df['mid'].tolist()
  49. # 对mid哈希,写入对应的key中
  50. hash_result = {}
  51. hash_tag_list = []
  52. for mid in mid_list:
  53. hash_mid = hashlib.md5(mid.encode('utf-8')).hexdigest()
  54. hash_tag = hash_mid[-1:]
  55. if hash_tag in hash_tag_list:
  56. hash_result[hash_tag].append(mid)
  57. else:
  58. hash_result[hash_tag] = [mid]
  59. hash_tag_list.append(hash_tag)
  60. log_.info(f"hash_result_count: {len(hash_result)}")
  61. # 写入对应的redis
  62. if len(hash_result) == 0:
  63. return
  64. redis_helper = RedisHelper()
  65. for key, val in hash_result.items():
  66. log_.info(f"key: {key}, val_count: {len(val)}")
  67. if len(val) > 0:
  68. key_name = f"{key_name_prefix}{key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}"
  69. redis_helper.add_data_with_set(key_name=key_name, values=val, expire_time=2 * 24 * 3600)
  70. def timer_check(religion_name):
  71. key_name_prefix = config_.RELIGION_USERS[religion_name]['key_name_prefix']
  72. now_date = datetime.datetime.today()
  73. now_h = datetime.datetime.now().hour
  74. log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d')}, now_h: {now_h}")
  75. if now_h == 3:
  76. data_class = 'day'
  77. else:
  78. data_class = 'hour'
  79. log_.info(f"data_class: {data_class}")
  80. project = config_.RELIGION_USERS[religion_name][data_class]['project']
  81. table = config_.RELIGION_USERS[religion_name][data_class]['table']
  82. # 查看当天更新的数据是否已准备好
  83. data_count = data_check(project=project, table=table, now_date=now_date)
  84. if data_count > 0:
  85. log_.info(f'religion_name = {religion_name}, religion_users_count = {data_count}')
  86. # 数据准备好,进行更新
  87. get_religion_users(now_date=now_date, project=project, table=table, key_name_prefix=key_name_prefix)
  88. # send_msg_to_feishu(
  89. # webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
  90. # key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
  91. # msg_text=f"rov-offline{config_.ENV_TEXT} - 宗教用户数据更新完成\n"
  92. # f"religion_name: {religion_name}\n"
  93. # f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d')}\n"
  94. # f"now_h: {now_h}\n"
  95. # )
  96. else:
  97. # 数据没准备好,1分钟后重新检查
  98. Timer(5 * 60, timer_check, args=[religion_name]).start()
  99. def main():
  100. try:
  101. religion_name = sys.argv[1]
  102. timer_check(religion_name)
  103. except Exception as e:
  104. log_.error(f"宗教用户数据更新失败, exception: {e}, traceback: {traceback.format_exc()}")
  105. send_msg_to_feishu(
  106. webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
  107. key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
  108. msg_text=f"rov-offline{config_.ENV_TEXT} - 宗教用户数据更新失败\n"
  109. f"exception: {e}\n"
  110. f"traceback: {traceback.format_exc()}"
  111. )
  112. if __name__ == '__main__':
  113. main()