utils.py

import pickle
import os
from odps import ODPS
from config import set_config

config_ = set_config()


def get_data_from_odps(date, project, table, connect_timeout=3000, read_timeout=500000,
                       pool_maxsize=1000, pool_connections=1000):
    """
    Fetch data from ODPS (MaxCompute).
    :param date: partition date, string in '%Y%m%d' format
    :param project: ODPS project name, string
    :param table: table name, string
    :param connect_timeout: connection timeout
    :param read_timeout: read timeout
    :param pool_maxsize: maximum size of the connection pool
    :param pool_connections: number of connection pools to cache
    :return: records read from the table partition
    """
    odps = ODPS(
        access_id='LTAI4FtW5ZzxMvdw35aNkmcp',
        secret_access_key='0VKnydcaHK3ITjylbgUsLubX6rwiwc',
        project=project,
        endpoint='http://service.cn.maxcompute.aliyun.com/api',
        connect_timeout=connect_timeout,
        read_timeout=read_timeout,
        pool_maxsize=pool_maxsize,
        pool_connections=pool_connections
    )
    records = odps.read_table(name=table, partition='dt=%s' % date)
    return records
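
# Usage sketch (a non-authoritative example; 'my_project' and 'my_table' are
# placeholder names, and the table is assumed to be partitioned by dt).
# The returned pyodps reader can be iterated lazily:
#     records = get_data_from_odps('20240101', 'my_project', 'my_table')
#     for record in records:
#         print(record)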


def write_to_pickle(data, filename, filepath=config_.DATA_DIR_PATH):
    """
    Write data to a pickle file.
    :param data: data to serialize
    :param filename: name of the file to write
    :param filepath: directory to store the file, defaults to config_.DATA_DIR_PATH
    :return: None
    """
    if not os.path.exists(filepath):
        os.makedirs(filepath)
    file = os.path.join(filepath, filename)
    with open(file, 'wb') as wf:
        pickle.dump(data, wf)


def read_from_pickle(filename, filepath=config_.DATA_DIR_PATH):
    """
    Read data from a pickle file.
    :param filename: file name
    :param filepath: directory where the file is stored, defaults to config_.DATA_DIR_PATH
    :return: the deserialized data, or None if the file does not exist
    """
    file = os.path.join(filepath, filename)
    if not os.path.exists(file):
        return None
    with open(file, 'rb') as rf:
        data = pickle.load(rf)
    return data
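

# Minimal local demo (a sketch, not part of the original module's behavior):
# round-trips a small object through the pickle helpers above. Assumes
# config_.DATA_DIR_PATH is writable; 'demo.pickle' is a hypothetical file name.
if __name__ == '__main__':
    sample = {'dt': '20240101', 'values': [1, 2, 3]}
    write_to_pickle(sample, 'demo.pickle')
    restored = read_from_pickle('demo.pickle')
    print(restored)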