# rov_data_check.py
  1. import datetime
  2. import os
  3. import traceback
  4. from odps import ODPS
  5. from datetime import datetime as dt
  6. from threading import Timer
  7. from config import set_config
  8. from log import Log
  9. from my_utils import send_msg_to_feishu
  10. config_, env = set_config()
  11. log_ = Log()
  12. def rov_train_recall_pool_update():
  13. # 训练数据和预测数据都准备好时,更新模型,预测
  14. os.system('cd {} && sh {}/rov_train_recall_pool_update.sh'.format(config_.PROJECT_PATH, config_.PROJECT_PATH))
  15. # # 将日志上传到oss
  16. # log_cmd = "ossutil cp -r -f {} oss://{}/{}".format(log_.logname, config_.BUCKET_NAME,
  17. # config_.OSS_FOLDER_LOGS + 'rov_recall_pool/')
  18. # os.system(log_cmd)
  19. # 生产环境 - 将预测得到的ROV数据文件predict.csv上传到oss
  20. if env == 'pro':
  21. data_cmd = "ossutil cp -r -f {} oss://{}/{}dt={}/{}".format("{}/data/predict.csv".format(config_.PROJECT_PATH),
  22. config_.BUCKET_NAME,
  23. config_.OSS_FOLDER_DATA,
  24. datetime.datetime.today().strftime('%Y%m%d'),
  25. 'predict.csv')
  26. os.system(data_cmd)
  27. def data_check(project, table, date):
  28. odps = ODPS(
  29. access_id=config_.ODPS_CONFIG['ACCESSID'],
  30. secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
  31. project=project,
  32. endpoint=config_.ODPS_CONFIG['ENDPOINT'],
  33. connect_timeout=3000,
  34. read_timeout=500000,
  35. pool_maxsize=1000,
  36. pool_connections=1000
  37. )
  38. try:
  39. sql = "select * from {}.{} where dt = {}".format(project, table, date)
  40. with odps.execute_sql(sql=sql).open_reader() as reader:
  41. feature_count = reader.count
  42. except Exception as e:
  43. feature_count = 0
  44. return feature_count
  45. def timer_check():
  46. # 当前日期
  47. now_date = datetime.datetime.today()
  48. # 训练数据 最近日期分区
  49. train_dt = now_date - datetime.timedelta(days=config_.TRAIN_DIFF)
  50. train_date = dt.strftime(train_dt, '%Y%m%d')
  51. # 预测数据 最近日期分区
  52. predict_dt = now_date - datetime.timedelta(days=config_.PREDICT_DIFF)
  53. predict_date = dt.strftime(predict_dt, '%Y%m%d')
  54. # 查看训练数据特征是否准备好
  55. train_feature_count = data_check(config_.TRAIN_PROJECT, config_.TRAIN_TABLE, train_date)
  56. # 查看训练数据特征是否准备好
  57. predict_feature_count = data_check(config_.PREDICT_PROJECT, config_.PREDICT_TABLE, predict_date)
  58. # 数据未准备好,1分钟后重新检查
  59. if train_feature_count == 0 or predict_feature_count == 0:
  60. # 数据未准备好,1分钟后重新检查
  61. Timer(60, timer_check).start()
  62. else:
  63. # 数据准备好,更新模型,预测
  64. rov_train_recall_pool_update()
  65. if __name__ == '__main__':
  66. try:
  67. timer_check()
  68. except Exception as e:
  69. log_.error('ROV召回池更新失败, exception: {}, traceback: {}'.format(e, traceback.format_exc()))
  70. send_msg_to_feishu(
  71. webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
  72. key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
  73. msg_text='rov-offline{} - ROV召回池更新失败, exception: {}'.format(config_.ENV_TEXT, e)
  74. )