# rov_data_check.py (2.0 KB)
  1. import datetime
  2. import os
  3. from odps import ODPS
  4. from datetime import datetime as dt
  5. from threading import Timer
  6. from config import set_config
  7. config_ = set_config()
  def rov_train_recall_pool_update():
      """Launch the offline shell script that updates the model and predicts.

      Meant to be called once both the training data and the prediction data
      are ready. The script's exit status is discarded and nothing is returned.
      """
      update_command = 'sh /data/rov-offline/rov_train_recall_pool_update.sh'
      os.system(update_command)
  def data_check(project, table, date):
      """Return the row count of the ``dt = date`` partition of ``project.table``.

      Args:
          project: ODPS project name; used both to open the connection and to
              qualify the table in the query.
          table: Table name inside ``project``.
          date: Partition value substituted verbatim (unquoted) into the
              ``dt = ...`` predicate.

      Returns:
          ``reader.count`` for the query result, or ``0`` when building or
          running the query raises any exception.
      """
      import logging  # local import so this block does not rely on file-level changes

      # NOTE(review): credentials are hard-coded in source control. They should
      # be rotated and loaded from configuration or the environment instead.
      odps = ODPS(
          access_id='LTAI4FtW5ZzxMvdw35aNkmcp',
          secret_access_key='0VKnydcaHK3ITjylbgUsLubX6rwiwc',
          project=project,
          endpoint='http://service.cn.maxcompute.aliyun.com/api',
          connect_timeout=3000,
          read_timeout=500000,
          pool_maxsize=1000,
          pool_connections=1000,
      )
      try:
          # NOTE(review): the partition value is interpolated without quotes;
          # confirm that matches the type of the ``dt`` column.
          sql = "select * from {}.{} where dt = {}".format(project, table, date)
          with odps.execute_sql(sql=sql).open_reader() as reader:
              feature_count = reader.count
      except Exception:
          # Deliberately best-effort: a failed query is reported as 0 rows, the
          # same as an empty partition. Log it so real failures (bad
          # credentials, missing table, network errors) are not hidden.
          logging.getLogger(__name__).exception(
              "data_check failed for %s.%s dt=%s", project, table, date
          )
          feature_count = 0
      return feature_count
  def timer_check():
      """Run the model update once training and prediction data both exist.

      The training and prediction partition dates are derived from today's
      date minus ``config_.TRAIN_DIFF`` / ``config_.PREDICT_DIFF`` days and
      formatted as ``YYYYMMDD``. Both partitions are always queried through
      ``data_check``. If either count is 0 the check is rescheduled on a
      ``Timer`` to run again in 60 seconds; otherwise
      ``rov_train_recall_pool_update`` is called.
      """
      today = datetime.datetime.today()

      def partition_for(days_back):
          # Most recent partition date, `days_back` days before today.
          return (today - datetime.timedelta(days=days_back)).strftime('%Y%m%d')

      train_date = partition_for(config_.TRAIN_DIFF)
      predict_date = partition_for(config_.PREDICT_DIFF)

      # Are the training features ready?
      train_rows = data_check(config_.TRAIN_PROJECT, config_.TRAIN_TABLE, train_date)
      # Are the prediction features ready?
      predict_rows = data_check(config_.PREDICT_PROJECT, config_.PREDICT_TABLE, predict_date)

      if train_rows != 0 and predict_rows != 0:
          # Data is ready: update the model and predict.
          rov_train_recall_pool_update()
          return

      # Data is not ready yet: check again in one minute.
      Timer(60, timer_check).start()
  if __name__ == "__main__":
      # Script entry point: run the first readiness check.
      timer_check()