utils.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. #!/usr/bin/python
  2. # coding:utf-8
  3. import pickle
  4. import os
  5. import requests
  6. import json
  7. from odps import ODPS
  8. from config import set_config
  9. from db_helper import HologresHelper
  10. config_ = set_config()
  11. def execute_sql_from_odps(project, sql, connect_timeout=3000, read_timeout=500000,
  12. pool_maxsize=1000, pool_connections=1000):
  13. odps = ODPS(
  14. access_id='LTAI4FtW5ZzxMvdw35aNkmcp',
  15. secret_access_key='0VKnydcaHK3ITjylbgUsLubX6rwiwc',
  16. project=project,
  17. endpoint='http://service.cn.maxcompute.aliyun.com/api',
  18. connect_timeout=connect_timeout,
  19. read_timeout=read_timeout,
  20. pool_maxsize=pool_maxsize,
  21. pool_connections=pool_connections
  22. )
  23. records = odps.execute_sql(sql=sql)
  24. return records
  25. def get_data_from_odps(date, project, table, connect_timeout=3000, read_timeout=500000,
  26. pool_maxsize=1000, pool_connections=1000):
  27. """
  28. 从odps获取数据
  29. :param date: 日期 type-string '%Y%m%d'
  30. :param project: type-string
  31. :param table: 表名 type-string
  32. :param connect_timeout: 连接超时设置
  33. :param read_timeout: 读取超时设置
  34. :param pool_maxsize:
  35. :param pool_connections:
  36. :return: records
  37. """
  38. odps = ODPS(
  39. access_id='LTAI4FtW5ZzxMvdw35aNkmcp',
  40. secret_access_key='0VKnydcaHK3ITjylbgUsLubX6rwiwc',
  41. project=project,
  42. endpoint='http://service.cn.maxcompute.aliyun.com/api',
  43. connect_timeout=connect_timeout,
  44. read_timeout=read_timeout,
  45. pool_maxsize=pool_maxsize,
  46. pool_connections=pool_connections
  47. )
  48. records = odps.read_table(name=table, partition='dt=%s' % date)
  49. return records
  50. def write_to_pickle(data, filename, filepath=config_.DATA_DIR_PATH):
  51. """
  52. 将数据写入pickle文件中
  53. :param data: 数据
  54. :param filename: 写入的文件名
  55. :param filepath: 文件存放路径,默认为config_.DATA_DIR_PATH
  56. :return: None
  57. """
  58. if not os.path.exists(filepath):
  59. os.makedirs(filepath)
  60. file = os.path.join(filepath, filename)
  61. with open(file, 'wb') as wf:
  62. pickle.dump(data, wf)
  63. def read_from_pickle(filename, filepath=config_.DATA_DIR_PATH):
  64. """
  65. 从pickle文件读取数据
  66. :param filename: 文件名
  67. :param filepath: 文件存放路径,默认为config_.DATA_DIR_PATH
  68. :return: data
  69. """
  70. file = os.path.join(filepath, filename)
  71. if not os.path.exists(file):
  72. return None
  73. with open(file, 'rb') as rf:
  74. data = pickle.load(rf)
  75. return data
  76. def request_post(request_url, request_data):
  77. """
  78. post 请求 HTTP接口
  79. :param request_url: 接口URL
  80. :param request_data: 请求参数
  81. :return: res_data json格式
  82. """
  83. response = requests.post(url=request_url, json=request_data)
  84. if response.status_code == 200:
  85. res_data = json.loads(response.text)
  86. return res_data
  87. def data_normalization(data):
  88. """
  89. 对结果做归一化处理(Min-Max Normalization),将分数控制在[0, 100]
  90. :param data: type-list
  91. :return: normal_data, type-list 归一化后的数据
  92. """
  93. x_max = max(data)
  94. x_min = min(data)
  95. normal_data = [(x-x_min)/(x_max-x_min)*100 for x in data]
  96. return normal_data
  97. def filter_video_status(video_ids):
  98. """
  99. 对视频状态进行过滤
  100. :param video_ids: 视频id列表 type-list
  101. :return: filtered_videos
  102. """
  103. if len(video_ids) == 1:
  104. sql = "set hg_experimental_enable_shard_pruning=off; " \
  105. "SELECT video_id " \
  106. "FROM {} " \
  107. "WHERE audit_status = 5 " \
  108. "AND applet_rec_status IN (1, -6) " \
  109. "AND open_status = 1 " \
  110. "AND payment_status = 0 " \
  111. "AND encryption_status IS NULL " \
  112. "AND transcoding_status = 3 " \
  113. "AND video_id IN ({});".format(config_.VIDEO_STATUS, video_ids[0])
  114. else:
  115. sql = "set hg_experimental_enable_shard_pruning=off; " \
  116. "SELECT video_id " \
  117. "FROM {} " \
  118. "WHERE audit_status = 5 " \
  119. "AND applet_rec_status IN (1, -6) " \
  120. "AND open_status = 1 " \
  121. "AND payment_status = 0 " \
  122. "AND encryption_status IS NULL " \
  123. "AND transcoding_status = 3 " \
  124. "AND video_id IN {};".format(config_.VIDEO_STATUS, tuple(video_ids))
  125. hologres_helper = HologresHelper()
  126. data = hologres_helper.get_data(sql=sql)
  127. filtered_videos = [temp[0] for temp in data]
  128. return filtered_videos
  129. if __name__ == '__main__':
  130. # data_test = [9.20273281e+03, 7.00795065e+03, 5.54813112e+03, 9.97402494e-01, 9.96402495e-01, 9.96402494e-01]
  131. # data_normalization(data_test)
  132. request_post(request_url=config_.NOTIFY_BACKEND_UPDATE_ROV_SCORE_URL, request_data={'videos': []})