# utils.py
  1. # coding:utf-8
  2. import pickle
  3. import os
  4. import requests
  5. import json
  6. import traceback
  7. from odps import ODPS
  8. from config import set_config
  9. from db_helper import HologresHelper, MysqlHelper, RedisHelper
  10. from log import Log
  11. config_, env = set_config()
  12. log_ = Log()
  13. def execute_sql_from_odps(project, sql, connect_timeout=3000, read_timeout=500000,
  14. pool_maxsize=1000, pool_connections=1000):
  15. odps = ODPS(
  16. access_id=config_.ODPS_CONFIG['ACCESSID'],
  17. secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
  18. project=project,
  19. endpoint=config_.ODPS_CONFIG['ENDPOINT'],
  20. connect_timeout=connect_timeout,
  21. read_timeout=read_timeout,
  22. pool_maxsize=pool_maxsize,
  23. pool_connections=pool_connections
  24. )
  25. records = odps.execute_sql(sql=sql)
  26. return records
  27. def get_data_from_odps(date, project, table, connect_timeout=3000, read_timeout=500000,
  28. pool_maxsize=1000, pool_connections=1000):
  29. """
  30. 从odps获取数据
  31. :param date: 日期 type-string '%Y%m%d'
  32. :param project: type-string
  33. :param table: 表名 type-string
  34. :param connect_timeout: 连接超时设置
  35. :param read_timeout: 读取超时设置
  36. :param pool_maxsize:
  37. :param pool_connections:
  38. :return: records
  39. """
  40. odps = ODPS(
  41. access_id=config_.ODPS_CONFIG['ACCESSID'],
  42. secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
  43. project=project,
  44. endpoint=config_.ODPS_CONFIG['ENDPOINT'],
  45. connect_timeout=connect_timeout,
  46. read_timeout=read_timeout,
  47. pool_maxsize=pool_maxsize,
  48. pool_connections=pool_connections
  49. )
  50. records = odps.read_table(name=table, partition='dt=%s' % date)
  51. return records
  52. def write_to_pickle(data, filename, filepath=config_.DATA_DIR_PATH):
  53. """
  54. 将数据写入pickle文件中
  55. :param data: 数据
  56. :param filename: 写入的文件名
  57. :param filepath: 文件存放路径,默认为config_.DATA_DIR_PATH
  58. :return: None
  59. """
  60. if not os.path.exists(filepath):
  61. os.makedirs(filepath)
  62. file = os.path.join(filepath, filename)
  63. with open(file, 'wb') as wf:
  64. pickle.dump(data, wf)
  65. def read_from_pickle(filename, filepath=config_.DATA_DIR_PATH):
  66. """
  67. 从pickle文件读取数据
  68. :param filename: 文件名
  69. :param filepath: 文件存放路径,默认为config_.DATA_DIR_PATH
  70. :return: data
  71. """
  72. file = os.path.join(filepath, filename)
  73. if not os.path.exists(file):
  74. return None
  75. with open(file, 'rb') as rf:
  76. data = pickle.load(rf)
  77. return data
  78. def send_msg_to_feishu(webhook, key_word, msg_text):
  79. """发送消息到飞书"""
  80. headers = {'Content-Type': 'application/json'}
  81. payload_message = {
  82. "msg_type": "text",
  83. "content": {
  84. "text": '{}: {}'.format(key_word, msg_text)
  85. }
  86. }
  87. response = requests.request('POST', url=webhook, headers=headers, data=json.dumps(payload_message))
  88. print(response.text)
  89. def request_post(request_url, request_data):
  90. """
  91. post 请求 HTTP接口
  92. :param request_url: 接口URL
  93. :param request_data: 请求参数
  94. :return: res_data json格式
  95. """
  96. try:
  97. response = requests.post(url=request_url, json=request_data)
  98. if response.status_code == 200:
  99. res_data = json.loads(response.text)
  100. return res_data
  101. else:
  102. return None
  103. except Exception as e:
  104. log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, e, traceback.format_exc()))
  105. send_msg_to_feishu(
  106. webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
  107. key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
  108. msg_text='rov-offline{} - 接口请求失败:{}, exception: {}'.format(config_.ENV_TEXT, request_url, e)
  109. )
  110. return None
  111. def data_normalization(data):
  112. """
  113. 对结果做归一化处理(Min-Max Normalization),将分数控制在[0, 100]
  114. :param data: type-list
  115. :return: normal_data, type-list 归一化后的数据
  116. """
  117. x_max = max(data)
  118. x_min = min(data)
  119. normal_data = [(x-x_min)/(x_max-x_min)*100 for x in data]
  120. return normal_data
  121. def filter_video_status(video_ids):
  122. """
  123. 对视频状态进行过滤
  124. :param video_ids: 视频id列表 type-list
  125. :return: filtered_videos
  126. """
  127. if len(video_ids) == 1:
  128. sql = "set hg_experimental_enable_shard_pruning=off; " \
  129. "SELECT video_id " \
  130. "FROM {} " \
  131. "WHERE audit_status = 5 " \
  132. "AND applet_rec_status IN (1, -6) " \
  133. "AND open_status = 1 " \
  134. "AND payment_status = 0 " \
  135. "AND encryption_status != 5 " \
  136. "AND transcoding_status = 3 " \
  137. "AND video_id IN ({});".format(config_.VIDEO_STATUS, video_ids[0])
  138. else:
  139. sql = "set hg_experimental_enable_shard_pruning=off; " \
  140. "SELECT video_id " \
  141. "FROM {} " \
  142. "WHERE audit_status = 5 " \
  143. "AND applet_rec_status IN (1, -6) " \
  144. "AND open_status = 1 " \
  145. "AND payment_status = 0 " \
  146. "AND encryption_status != 5 " \
  147. "AND transcoding_status = 3 " \
  148. "AND video_id IN {};".format(config_.VIDEO_STATUS, tuple(video_ids))
  149. hologres_helper = HologresHelper()
  150. data = hologres_helper.get_data(sql=sql)
  151. filtered_videos = [int(temp[0]) for temp in data]
  152. return filtered_videos
  153. def filter_video_status_app(video_ids):
  154. """
  155. 对视频状态进行过滤 - app
  156. :param video_ids: 视频id列表 type-list
  157. :return: filtered_videos
  158. """
  159. if len(video_ids) == 1:
  160. sql = "set hg_experimental_enable_shard_pruning=off; " \
  161. "SELECT video_id " \
  162. "FROM {} " \
  163. "WHERE audit_status = 5 " \
  164. "AND app_rec_status IN (1, -6, 10) " \
  165. "AND open_status = 1 " \
  166. "AND payment_status = 0 " \
  167. "AND encryption_status != 5 " \
  168. "AND transcoding_status = 3 " \
  169. "AND video_id IN ({});".format(config_.VIDEO_STATUS, video_ids[0])
  170. else:
  171. sql = "set hg_experimental_enable_shard_pruning=off; " \
  172. "SELECT video_id " \
  173. "FROM {} " \
  174. "WHERE audit_status = 5 " \
  175. "AND app_rec_status IN (1, -6, 10) " \
  176. "AND open_status = 1 " \
  177. "AND payment_status = 0 " \
  178. "AND encryption_status != 5 " \
  179. "AND transcoding_status = 3 " \
  180. "AND video_id IN {};".format(config_.VIDEO_STATUS, tuple(video_ids))
  181. hologres_helper = HologresHelper()
  182. data = hologres_helper.get_data(sql=sql)
  183. filtered_videos = [int(temp[0]) for temp in data]
  184. return filtered_videos
  185. def update_video_w_h_rate(video_ids, key_name):
  186. """
  187. 获取横屏视频的宽高比,并存入redis中 (width/height>1)
  188. :param video_ids: videoId列表 type-list
  189. :param key_name: redis key
  190. :return: None
  191. """
  192. # 获取数据
  193. if len(video_ids) == 1:
  194. sql = "SELECT id, width, height, rotate FROM longvideo.wx_video WHERE id = {};".format(video_ids[0])
  195. else:
  196. sql = "SELECT id, width, height, rotate FROM longvideo.wx_video WHERE id IN {};".format(tuple(video_ids))
  197. mysql_helper = MysqlHelper()
  198. data = mysql_helper.get_data(sql=sql)
  199. # 更新到redis
  200. info_data = {}
  201. for video_id, width, height, rotate in data:
  202. if int(width) == 0 or int(height) == 0:
  203. continue
  204. # rotate 字段值为 90或270时,width和height的值相反
  205. if int(rotate) in (90, 270):
  206. w_h_rate = int(height) / int(width)
  207. else:
  208. w_h_rate = int(width) / int(height)
  209. if w_h_rate > 1:
  210. info_data[int(video_id)] = w_h_rate
  211. redis_helper = RedisHelper()
  212. # 删除旧数据
  213. redis_helper.del_keys(key_name=key_name)
  214. # 写入新数据
  215. if len(info_data) > 0:
  216. redis_helper.add_data_with_zset(key_name=key_name, data=info_data)
  217. if __name__ == '__main__':
  218. # data_test = [9.20273281e+03, 7.00795065e+03, 5.54813112e+03, 9.97402494e-01, 9.96402495e-01, 9.96402494e-01]
  219. # data_normalization(data_test)
  220. # request_post(request_url=config_.NOTIFY_BACKEND_UPDATE_ROV_SCORE_URL, request_data={'videos': []})
  221. video_ids = [110, 112, 113, 115, 116, 117, 8289883]
  222. update_video_w_h_rate(video_ids=video_ids, key_name='')