import datetime
import os
import traceback
from datetime import datetime as dt
from threading import Timer

from odps import ODPS

from config import set_config
from log import Log
from utils import send_msg_to_feishu

config_, env = set_config()
log_ = Log()


def rov_train_recall_pool_update():
    """Run the model update / prediction script and upload the result in production.

    Executes ``rov_train_recall_pool_update.sh`` from ``config_.PROJECT_PATH``.
    When ``env == 'pro'`` the resulting ``data/predict.csv`` is then copied to
    OSS under a ``dt=YYYYMMDD`` folder named after today's date.
    """
    # Training and prediction data are both ready: update the model and predict.
    status = os.system(
        'cd {} && sh {}/rov_train_recall_pool_update.sh'.format(config_.PROJECT_PATH, config_.PROJECT_PATH)
    )
    if status != 0:
        # The exit status used to be discarded; surface it so a failed run is
        # visible in the logs. Control flow is intentionally unchanged.
        log_.error('rov_train_recall_pool_update.sh exited with non-zero status: {}'.format(status))

    # # Upload the log to OSS (disabled)
    # log_cmd = "ossutil cp -r -f {} oss://{}/{}".format(log_.logname, config_.BUCKET_NAME,
    #                                                 config_.OSS_FOLDER_LOGS + 'rov_recall_pool/')
    # os.system(log_cmd)

    # Production only: upload the predicted ROV data file predict.csv to OSS.
    if env == 'pro':
        data_cmd = "ossutil cp -r -f {} oss://{}/{}dt={}/{}".format(
            "{}/data/predict.csv".format(config_.PROJECT_PATH),
            config_.BUCKET_NAME,
            config_.OSS_FOLDER_DATA,
            datetime.datetime.today().strftime('%Y%m%d'),
            'predict.csv',
        )
        os.system(data_cmd)


def data_check(project, table, date):
    """Return the number of records in ``project.table`` for partition ``dt = date``.

    :param project: ODPS project name; also used to qualify the table in the query.
    :param table: table name inside ``project``.
    :param date: value compared against the ``dt`` column (callers in this file
        pass a ``'%Y%m%d'`` string).
    :return: the record count reported by the query reader, or ``0`` when the
        query fails for any reason.
    """
    odps = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        connect_timeout=3000,
        read_timeout=500000,
        pool_maxsize=1000,
        pool_connections=1000
    )
    try:
        sql = "select * from {}.{} where dt = {}".format(project, table, date)
        with odps.execute_sql(sql=sql).open_reader() as reader:
            feature_count = reader.count
    except Exception as e:
        # Best effort: a failed query is treated as "data not ready" so the
        # caller retries, but the cause is logged instead of being silently
        # dropped (otherwise a permanent error looks like an endless wait).
        log_.error('data_check failed for {}.{} dt={}, exception: {}'.format(project, table, date, e))
        feature_count = 0
    return feature_count


def timer_check():
    """Check whether training and prediction data are ready, then update or retry.

    If either partition has no records, a re-check is scheduled 60 seconds
    later on a ``threading.Timer``; otherwise ``rov_train_recall_pool_update``
    is run immediately.
    """
    # Current date
    now_date = datetime.datetime.today()
    # Training data: most recent date partition
    train_dt = now_date - datetime.timedelta(days=config_.TRAIN_DIFF)
    train_date = dt.strftime(train_dt, '%Y%m%d')
    # Prediction data: most recent date partition
    predict_dt = now_date - datetime.timedelta(days=config_.PREDICT_DIFF)
    predict_date = dt.strftime(predict_dt, '%Y%m%d')
    # Check whether the training data features are ready
    train_feature_count = data_check(config_.TRAIN_PROJECT, config_.TRAIN_TABLE, train_date)
    # Check whether the prediction data features are ready
    predict_feature_count = data_check(config_.PREDICT_PROJECT, config_.PREDICT_TABLE, predict_date)
    if train_feature_count == 0 or predict_feature_count == 0:
        # Data not ready: check again in 1 minute. The retry runs in a new
        # thread, so it goes through the guarded wrapper; an exception raised
        # there would otherwise never reach the failure alert.
        Timer(60, _guarded_timer_check).start()
    else:
        # Data ready: update the model and predict
        rov_train_recall_pool_update()


def _report_failure(e):
    """Log the failure with its traceback and send a Feishu alert.

    Must be called from inside an ``except`` block so that
    ``traceback.format_exc()`` sees the active exception.
    """
    log_.error('ROV召回池更新失败, exception: {}, traceback: {}'.format(e, traceback.format_exc()))
    send_msg_to_feishu(
        webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
        key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
        msg_text='rov-offline{} - ROV召回池更新失败, exception: {}'.format(config_.ENV_TEXT, e)
    )


def _guarded_timer_check():
    """Run ``timer_check`` and report any exception instead of letting it escape."""
    try:
        timer_check()
    except Exception as e:
        _report_failure(e)


if __name__ == '__main__':
    _guarded_timer_check()