# rov_data_check.py

  1. import datetime
  2. import os
  3. from odps import ODPS
  4. from datetime import datetime as dt
  5. from threading import Timer
  6. from config import set_config
  7. from log import Log
  8. config_ = set_config()
  9. log_ = Log()
  10. def rov_train_recall_pool_update():
  11. # 训练数据和预测数据都准备好时,更新模型,预测
  12. os.system('sh /data2/rov-offline/rov_train_recall_pool_update.sh')
  13. # # 将日志上传到oss
  14. # log_cmd = "ossutil cp -r -f {} oss://{}/{}".format(log_.logname, config_.BUCKET_NAME,
  15. # config_.OSS_FOLDER_LOGS + 'rov_recall_pool/')
  16. # os.system(log_cmd)
  17. # # 将data上传到oss
  18. # data_cmd = "ossutil cp -r -f {} oss://{}/{}".format("/data/rov-offline/data", config_.BUCKET_NAME,
  19. # config_.OSS_FOLDER_DATA)
  20. # os.system(data_cmd)
  21. def data_check(project, table, date):
  22. odps = ODPS(
  23. access_id='LTAI4FtW5ZzxMvdw35aNkmcp',
  24. secret_access_key='0VKnydcaHK3ITjylbgUsLubX6rwiwc',
  25. project=project,
  26. endpoint='http://service.cn.maxcompute.aliyun.com/api',
  27. connect_timeout=3000,
  28. read_timeout=500000,
  29. pool_maxsize=1000,
  30. pool_connections=1000
  31. )
  32. try:
  33. sql = "select * from {}.{} where dt = {}".format(project, table, date)
  34. with odps.execute_sql(sql=sql).open_reader() as reader:
  35. feature_count = reader.count
  36. except Exception as e:
  37. feature_count = 0
  38. return feature_count
  39. def timer_check():
  40. # 当前日期
  41. now_date = datetime.datetime.today()
  42. # 训练数据 最近日期分区
  43. train_dt = now_date - datetime.timedelta(days=config_.TRAIN_DIFF)
  44. train_date = dt.strftime(train_dt, '%Y%m%d')
  45. # 预测数据 最近日期分区
  46. predict_dt = now_date - datetime.timedelta(days=config_.PREDICT_DIFF)
  47. predict_date = dt.strftime(predict_dt, '%Y%m%d')
  48. # 查看训练数据特征是否准备好
  49. train_feature_count = data_check(config_.TRAIN_PROJECT, config_.TRAIN_TABLE, train_date)
  50. # 查看训练数据特征是否准备好
  51. predict_feature_count = data_check(config_.PREDICT_PROJECT, config_.PREDICT_TABLE, predict_date)
  52. # 数据未准备好,1分钟后重新检查
  53. if train_feature_count == 0 or predict_feature_count == 0:
  54. # 数据未准备好,1分钟后重新检查
  55. Timer(60, timer_check).start()
  56. else:
  57. # 数据准备好,更新模型,预测
  58. rov_train_recall_pool_update()
  59. if __name__ == '__main__':
  60. timer_check()