"""Load religion-user mids from ODPS and shard them into per-day Redis sets.

Invoked as ``python <script> <religion_name>``. The religion name selects an entry
in ``config_.RELIGION_USERS`` that supplies the Redis key prefix and the ODPS
project/table for the 'day' data class (used at hour 3) or the 'hour' data class
(every other hour). If today's ODPS partition has no rows yet, the check is
re-scheduled every 5 minutes with no retry limit.
"""
import datetime
import hashlib
import sys
import traceback
from collections import defaultdict
from threading import Timer

import pandas as pd
from odps import ODPS

from get_data import get_data_from_odps
from my_utils import send_msg_to_feishu
from db_helper import RedisHelper
from my_config import set_config
from log import Log

config_, env = set_config()
log_ = Log()

# Record columns this job reads; only 'mid' is used.
features = ['mid']

# Seconds to wait before re-checking when today's ODPS data is not ready yet.
RETRY_INTERVAL_SECONDS = 5 * 60
# TTL, in seconds, of every Redis set written by get_religion_users (2 days).
REDIS_EXPIRE_SECONDS = 2 * 24 * 3600


def data_check(project, table, now_date):
    """Return the row count of ``project.table`` for ``now_date``'s dt partition.

    A return value of 0 means "not ready": either the partition has no rows, or
    the query raised. In the second case the error is logged so it is not
    silently mistaken for missing data.
    """
    odps = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        connect_timeout=3000,
        read_timeout=500000,
        pool_maxsize=1000,
        pool_connections=1000,
    )
    dt = now_date.strftime('%Y%m%d')
    # Quote dt so it is compared as a string literal instead of relying on an
    # implicit number/string conversion (presumably dt is a string partition
    # column -- confirm against the table schema).
    sql = f"select * from {project}.{table} where dt = '{dt}'"
    try:
        with odps.execute_sql(sql=sql).open_reader() as reader:
            return reader.count
    except Exception as e:
        # Best effort: the caller treats 0 as "not ready" and retries later.
        log_.error(f"data_check failed, project: {project}, table: {table}, dt: {dt}, exception: {e}")
        return 0


def _bucket_by_md5_suffix(mids):
    """Group mids by the last hex character of their MD5 digest (at most 16 buckets)."""
    buckets = defaultdict(list)
    for mid in mids:
        if mid is None:
            # A null id cannot be hashed; skip it rather than abort the whole load.
            continue
        mid = str(mid)
        buckets[hashlib.md5(mid.encode('utf-8')).hexdigest()[-1:]].append(mid)
    return buckets


def get_religion_users(now_date, project, table, key_name_prefix):
    """Fetch the day's religion-user mids from ODPS and write them to sharded Redis sets.

    Each mid is added to the set ``{key_name_prefix}{tag}:{YYYYMMDD}``. Here ``tag``
    is the last hex character of the mid's MD5 digest. Every set expires after
    REDIS_EXPIRE_SECONDS. Nothing is written when no mids are found.
    """
    dt = now_date.strftime('%Y%m%d')
    records = get_data_from_odps(date=dt, project=project, table=table)
    # Read 'mid' straight from the records. An empty result now yields an empty
    # list instead of failing on a column-less DataFrame.
    mid_list = [record['mid'] for record in records]

    # Shard mids by hash suffix; each shard goes to its own Redis key.
    hash_result = _bucket_by_md5_suffix(mid_list)
    log_.info(f"hash_result_count: {len(hash_result)}")
    if not hash_result:
        return

    redis_helper = RedisHelper()
    for key, val in hash_result.items():
        log_.info(f"key: {key}, val_count: {len(val)}")
        key_name = f"{key_name_prefix}{key}:{dt}"
        redis_helper.add_data_with_set(key_name=key_name, values=val, expire_time=REDIS_EXPIRE_SECONDS)


def timer_check(religion_name):
    """Update Redis for ``religion_name`` if today's ODPS data is ready, else retry later.

    At hour 3 the 'day' table config is used; at every other hour the 'hour' one.
    A retry is scheduled on a ``threading.Timer`` after RETRY_INTERVAL_SECONDS.
    """
    religion_config = config_.RELIGION_USERS[religion_name]
    key_name_prefix = religion_config['key_name_prefix']
    # Take date and hour from a single clock reading so they cannot straddle midnight.
    now_date = datetime.datetime.now()
    now_h = now_date.hour
    log_.info(f"now_date: {now_date.strftime('%Y%m%d')}, now_h: {now_h}")
    data_class = 'day' if now_h == 3 else 'hour'
    log_.info(f"data_class: {data_class}")
    project = religion_config[data_class]['project']
    table = religion_config[data_class]['table']

    # Check whether today's data has been produced yet.
    data_count = data_check(project=project, table=table, now_date=now_date)
    if data_count > 0:
        log_.info(f'religion_name = {religion_name}, religion_users_count = {data_count}')
        # Data is ready: run the update.
        get_religion_users(now_date=now_date, project=project, table=table, key_name_prefix=key_name_prefix)
        # send_msg_to_feishu(
        #     webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
        #     key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
        #     msg_text=f"rov-offline{config_.ENV_TEXT} - 宗教用户数据更新完成\n"
        #              f"religion_name: {religion_name}\n"
        #              f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d')}\n"
        #              f"now_h: {now_h}\n"
        # )
    else:
        # Data not ready: re-check in 5 minutes. The Timer callback runs on its own
        # thread, outside main()'s try/except, so it goes through the alerting
        # wrapper; otherwise failures during retries would never be reported.
        Timer(RETRY_INTERVAL_SECONDS, _timer_check_with_alert, args=[religion_name]).start()


def _report_failure(e, tb):
    """Log an update failure and alert the Feishu 'server_robot' webhook."""
    log_.error(f"宗教用户数据更新失败, exception: {e}, traceback: {tb}")
    send_msg_to_feishu(
        webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
        key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
        msg_text=f"rov-offline{config_.ENV_TEXT} - 宗教用户数据更新失败\n"
                 f"exception: {e}\n"
                 f"traceback: {tb}"
    )


def _timer_check_with_alert(religion_name):
    """Run timer_check and report any exception; used as the Timer retry target."""
    try:
        timer_check(religion_name)
    except Exception as e:
        _report_failure(e, traceback.format_exc())


def main():
    """Script entry point; ``sys.argv[1]`` is the religion name to update."""
    try:
        religion_name = sys.argv[1]
        timer_check(religion_name)
    except Exception as e:
        _report_failure(e, traceback.format_exc())


if __name__ == '__main__':
    main()