rov_data_check.py 3.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. import datetime
  2. import os
  3. import traceback
  4. from odps import ODPS
  5. from datetime import datetime as dt
  6. from threading import Timer
  7. from config import set_config
  8. from log import Log
  9. from utils import send_msg_to_feishu
  10. config_ = set_config()
  11. log_ = Log()
  12. def rov_train_recall_pool_update():
  13. # 训练数据和预测数据都准备好时,更新模型,预测
  14. os.system('sh /data/rov-offline/rov_train_recall_pool_update.sh')
  15. # # 将日志上传到oss
  16. # log_cmd = "ossutil cp -r -f {} oss://{}/{}".format(log_.logname, config_.BUCKET_NAME,
  17. # config_.OSS_FOLDER_LOGS + 'rov_recall_pool/')
  18. # os.system(log_cmd)
  19. # 将预测得到的ROV数据文件predict.csv上传到oss
  20. data_cmd = "ossutil cp -r -f {} oss://{}/{}dt={}/{}".format("/data/rov-offline/data/predict.csv",
  21. config_.BUCKET_NAME,
  22. config_.OSS_FOLDER_DATA,
  23. datetime.datetime.today().strftime('%Y%m%d'),
  24. 'predict.csv')
  25. os.system(data_cmd)
  26. def data_check(project, table, date):
  27. odps = ODPS(
  28. access_id='LTAI4FtW5ZzxMvdw35aNkmcp',
  29. secret_access_key='0VKnydcaHK3ITjylbgUsLubX6rwiwc',
  30. project=project,
  31. endpoint='http://service.cn.maxcompute.aliyun.com/api',
  32. connect_timeout=3000,
  33. read_timeout=500000,
  34. pool_maxsize=1000,
  35. pool_connections=1000
  36. )
  37. try:
  38. sql = "select * from {}.{} where dt = {}".format(project, table, date)
  39. with odps.execute_sql(sql=sql).open_reader() as reader:
  40. feature_count = reader.count
  41. except Exception as e:
  42. feature_count = 0
  43. return feature_count
  44. def timer_check():
  45. # 当前日期
  46. now_date = datetime.datetime.today()
  47. # 训练数据 最近日期分区
  48. train_dt = now_date - datetime.timedelta(days=config_.TRAIN_DIFF)
  49. train_date = dt.strftime(train_dt, '%Y%m%d')
  50. # 预测数据 最近日期分区
  51. predict_dt = now_date - datetime.timedelta(days=config_.PREDICT_DIFF)
  52. predict_date = dt.strftime(predict_dt, '%Y%m%d')
  53. # 查看训练数据特征是否准备好
  54. train_feature_count = data_check(config_.TRAIN_PROJECT, config_.TRAIN_TABLE, train_date)
  55. # 查看训练数据特征是否准备好
  56. predict_feature_count = data_check(config_.PREDICT_PROJECT, config_.PREDICT_TABLE, predict_date)
  57. # 数据未准备好,1分钟后重新检查
  58. if train_feature_count == 0 or predict_feature_count == 0:
  59. # 数据未准备好,1分钟后重新检查
  60. Timer(60, timer_check).start()
  61. else:
  62. # 数据准备好,更新模型,预测
  63. rov_train_recall_pool_update()
  64. if __name__ == '__main__':
  65. try:
  66. timer_check()
  67. except Exception as e:
  68. log_.error('ROV召回池更新失败, exception: {}, traceback: {}'.format(e, traceback.format_exc()))
  69. send_msg_to_feishu('rov-offline生产环境 - ROV召回池更新失败, exception: {}'.format(e))