utils.py

import pickle
import os
import requests
import json
from odps import ODPS
from config import set_config
from db_helper import HologresHelper, MysqlHelper, RedisHelper

config_ = set_config()

def execute_sql_from_odps(project, sql, connect_timeout=3000, read_timeout=500000,
                          pool_maxsize=1000, pool_connections=1000):
    """
    Execute a SQL statement on ODPS.
    :param project: project name, type-string
    :param sql: SQL statement, type-string
    :param connect_timeout: connection timeout setting
    :param read_timeout: read timeout setting
    :param pool_maxsize:
    :param pool_connections:
    :return: records
    """
    odps = ODPS(
        access_id='LTAI4FtW5ZzxMvdw35aNkmcp',
        secret_access_key='0VKnydcaHK3ITjylbgUsLubX6rwiwc',
        project=project,
        endpoint='http://service.cn.maxcompute.aliyun.com/api',
        connect_timeout=connect_timeout,
        read_timeout=read_timeout,
        pool_maxsize=pool_maxsize,
        pool_connections=pool_connections
    )
    records = odps.execute_sql(sql=sql)
    return records
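
# Minimal usage sketch for execute_sql_from_odps; the project name and the SQL
# statement below are hypothetical placeholders:
#   records = execute_sql_from_odps(project='my_project', sql='SELECT 1;')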

def get_data_from_odps(date, project, table, connect_timeout=3000, read_timeout=500000,
                       pool_maxsize=1000, pool_connections=1000):
    """
    Fetch data from ODPS.
    :param date: date, type-string '%Y%m%d'
    :param project: type-string
    :param table: table name, type-string
    :param connect_timeout: connection timeout setting
    :param read_timeout: read timeout setting
    :param pool_maxsize:
    :param pool_connections:
    :return: records
    """
    odps = ODPS(
        access_id='LTAI4FtW5ZzxMvdw35aNkmcp',
        secret_access_key='0VKnydcaHK3ITjylbgUsLubX6rwiwc',
        project=project,
        endpoint='http://service.cn.maxcompute.aliyun.com/api',
        connect_timeout=connect_timeout,
        read_timeout=read_timeout,
        pool_maxsize=pool_maxsize,
        pool_connections=pool_connections
    )
    records = odps.read_table(name=table, partition='dt=%s' % date)
    return records
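
# Usage sketch for get_data_from_odps; project and table names are hypothetical,
# while the date format '%Y%m%d' follows the docstring above:
#   records = get_data_from_odps(date='20240101', project='my_project', table='my_table')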

def write_to_pickle(data, filename, filepath=config_.DATA_DIR_PATH):
    """
    Write data to a pickle file.
    :param data: the data to write
    :param filename: name of the file to write
    :param filepath: directory for the file, defaults to config_.DATA_DIR_PATH
    :return: None
    """
    if not os.path.exists(filepath):
        os.makedirs(filepath)
    file = os.path.join(filepath, filename)
    with open(file, 'wb') as wf:
        pickle.dump(data, wf)

def read_from_pickle(filename, filepath=config_.DATA_DIR_PATH):
    """
    Read data from a pickle file.
    :param filename: file name
    :param filepath: directory of the file, defaults to config_.DATA_DIR_PATH
    :return: data, or None if the file does not exist
    """
    file = os.path.join(filepath, filename)
    if not os.path.exists(file):
        return None
    with open(file, 'rb') as rf:
        data = pickle.load(rf)
    return data
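
# Round-trip sketch for the two pickle helpers; the filename is hypothetical:
#   write_to_pickle(data={'a': 1}, filename='demo.pickle')
#   assert read_from_pickle(filename='demo.pickle') == {'a': 1}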

def request_post(request_url, request_data):
    """
    POST request to an HTTP interface.
    :param request_url: interface URL
    :param request_data: request parameters
    :return: res_data in json format, or None on a non-200 response
    """
    response = requests.post(url=request_url, json=request_data)
    if response.status_code == 200:
        res_data = json.loads(response.text)
        return res_data
    return None

def data_normalization(data):
    """
    Min-Max normalization of the results, scaling the scores into [0, 100].
    :param data: type-list
    :return: normal_data, type-list, the normalized data
    """
    x_max = max(data)
    x_min = min(data)
    if x_max == x_min:
        # All values are equal; avoid a division by zero.
        return [0.0 for _ in data]
    normal_data = [(x - x_min) / (x_max - x_min) * 100 for x in data]
    return normal_data
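
# Worked example: data_normalization([1, 2, 3]) returns [0.0, 50.0, 100.0],
# since (x - x_min) / (x_max - x_min) * 100 with x_min=1 and x_max=3 maps
# 1 -> 0.0, 2 -> 50.0 and 3 -> 100.0.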

def filter_video_status(video_ids):
    """
    Filter videos by status.
    :param video_ids: list of video ids, type-list
    :return: filtered_videos
    """
    # Build the IN clause by hand: str() of a one-element tuple would yield
    # '(110,)', whose trailing comma is invalid SQL.
    if len(video_ids) == 1:
        video_id_filter = '({})'.format(video_ids[0])
    else:
        video_id_filter = '{}'.format(tuple(video_ids))
    sql = "set hg_experimental_enable_shard_pruning=off; " \
          "SELECT video_id " \
          "FROM {} " \
          "WHERE audit_status = 5 " \
          "AND applet_rec_status IN (1, -6) " \
          "AND open_status = 1 " \
          "AND payment_status = 0 " \
          "AND encryption_status != 5 " \
          "AND transcoding_status = 3 " \
          "AND video_id IN {};".format(config_.VIDEO_STATUS, video_id_filter)
    hologres_helper = HologresHelper()
    data = hologres_helper.get_data(sql=sql)
    filtered_videos = [int(temp[0]) for temp in data]
    return filtered_videos
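
# Usage sketch for filter_video_status; the ids are illustrative and the query
# runs against the Hologres table named by config_.VIDEO_STATUS:
#   valid_ids = filter_video_status(video_ids=[110, 112, 113])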

def send_msg_to_feishu(msg_text):
    """Send a message to Feishu."""
    # webhook URL
    webhook = 'https://open.feishu.cn/open-apis/bot/v2/hook/8de4de35-30ed-4692-8854-7a154e89b2f2'
    # custom keyword required by the bot ('服务报警' means 'service alert')
    key_word = '服务报警'
    headers = {'Content-Type': 'application/json'}
    payload_message = {
        "msg_type": "text",
        "content": {
            "text": '{}: {}'.format(key_word, msg_text)
        }
    }
    response = requests.request('POST', url=webhook, headers=headers, data=json.dumps(payload_message))
    print(response.text)

def update_video_w_h_rate(video_ids, key_name):
    """
    Get the width/height ratio of landscape videos (width/height > 1) and store it in redis.
    :param video_ids: list of videoId, type-list
    :param key_name: redis key
    :return: None
    """
    # fetch the data; build the IN clause by hand to avoid the invalid
    # trailing comma of a one-element tuple
    if len(video_ids) == 1:
        video_id_filter = '({})'.format(video_ids[0])
    else:
        video_id_filter = '{}'.format(tuple(video_ids))
    sql = "SELECT id, width/height w_h_rate " \
          "FROM longvideo.wx_video " \
          "WHERE width/height > 1 " \
          "AND id IN {};".format(video_id_filter)
    mysql_helper = MysqlHelper()
    data = mysql_helper.get_data(sql=sql)
    # update redis
    info_data = {}
    for video_id, w_h_rate in data:
        info_data[int(video_id)] = float(w_h_rate)
    redis_helper = RedisHelper()
    # delete the old data
    redis_helper.del_keys(key_name=key_name)
    # write the new data
    if len(info_data) > 0:
        redis_helper.add_data_with_zset(key_name=key_name, data=info_data)

if __name__ == '__main__':
    # data_test = [9.20273281e+03, 7.00795065e+03, 5.54813112e+03, 9.97402494e-01, 9.96402495e-01, 9.96402494e-01]
    # data_normalization(data_test)
    # request_post(request_url=config_.NOTIFY_BACKEND_UPDATE_ROV_SCORE_URL, request_data={'videos': []})
    video_ids = [110, 112, 113, 115, 116, 117, 118]
    # update_video_w_h_rate(video_ids=video_ids)