# rov_data_check.py (3.4 KB)
import datetime
import os
import time
import traceback
from datetime import datetime as dt
from threading import Timer

from odps import ODPS

from my_config import set_config
from log import Log
from my_utils import send_msg_to_feishu
  10. config_, env = set_config()
  11. log_ = Log()
  12. def rov_train_recall_pool_update():
  13. # 训练数据和预测数据都准备好时,更新模型,预测
  14. os.system('cd {} && sh {}/rov_train_recall_pool_update.sh'.format(config_.PROJECT_PATH, config_.PROJECT_PATH))
  15. # # 将日志上传到oss
  16. # log_cmd = "ossutil cp -r -f {} oss://{}/{}".format(log_.logname, config_.BUCKET_NAME,
  17. # config_.OSS_FOLDER_LOGS + 'rov_recall_pool/')
  18. # os.system(log_cmd)
  19. # 生产环境 - 将预测得到的ROV数据文件predict.csv上传到oss
  20. if env == 'pro':
  21. data_cmd = "ossutil cp -r -f {} oss://{}/{}dt={}/{}".format("{}/data/predict.csv".format(config_.PROJECT_PATH),
  22. config_.BUCKET_NAME,
  23. config_.OSS_FOLDER_DATA,
  24. datetime.datetime.today().strftime('%Y%m%d'),
  25. 'predict.csv')
  26. os.system(data_cmd)
  27. def data_check(project, table, date):
  28. odps = ODPS(
  29. access_id=config_.ODPS_CONFIG['ACCESSID'],
  30. secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
  31. project=project,
  32. endpoint=config_.ODPS_CONFIG['ENDPOINT'],
  33. connect_timeout=3000,
  34. read_timeout=500000,
  35. pool_maxsize=1000,
  36. pool_connections=1000
  37. )
  38. try:
  39. sql = "select * from {}.{} where dt = {}".format(project, table, date)
  40. with odps.execute_sql(sql=sql).open_reader() as reader:
  41. feature_count = reader.count
  42. except Exception as e:
  43. feature_count = 0
  44. return feature_count
  45. def timer_check():
  46. # 当前日期
  47. now_date = datetime.datetime.today()
  48. # 训练数据 最近日期分区
  49. train_dt = now_date - datetime.timedelta(days=config_.TRAIN_DIFF)
  50. train_date = dt.strftime(train_dt, '%Y%m%d')
  51. # 预测数据 最近日期分区
  52. predict_dt = now_date - datetime.timedelta(days=config_.PREDICT_DIFF)
  53. predict_date = dt.strftime(predict_dt, '%Y%m%d')
  54. # 查看训练数据特征是否准备好
  55. train_feature_count = data_check(config_.TRAIN_PROJECT, config_.TRAIN_TABLE, train_date)
  56. # 查看训练数据特征是否准备好
  57. predict_feature_count = data_check(config_.PREDICT_PROJECT, config_.PREDICT_TABLE, predict_date)
  58. # 数据未准备好,1分钟后重新检查
  59. if train_feature_count == 0 or predict_feature_count == 0:
  60. # 数据未准备好,1分钟后重新检查
  61. Timer(60, timer_check).start()
  62. else:
  63. # 数据准备好,更新模型,预测
  64. rov_train_recall_pool_update()
  65. if __name__ == '__main__':
  66. try:
  67. timer_check()
  68. except Exception as e:
  69. log_.error('ROV召回池更新失败, exception: {}, traceback: {}'.format(e, traceback.format_exc()))
  70. send_msg_to_feishu(
  71. webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
  72. key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
  73. msg_text='rov-offline{} - ROV召回池更新失败, exception: {}'.format(config_.ENV_TEXT, e)
  74. )