# coding:utf-8
import json
import os
import pickle
import traceback

import requests
from odps import ODPS

from config import set_config
from db_helper import HologresHelper, MysqlHelper, RedisHelper
from log import Log

config_, env = set_config()
log_ = Log()

# SECURITY NOTE(review): these credentials are hard-coded in source control.
# They are kept unchanged so existing deployments keep working, but they should
# be moved to configuration / a secret store and rotated.
_ODPS_ACCESS_ID = 'LTAI4FtW5ZzxMvdw35aNkmcp'
_ODPS_SECRET_ACCESS_KEY = '0VKnydcaHK3ITjylbgUsLubX6rwiwc'
_ODPS_ENDPOINT = 'http://service.cn.maxcompute.aliyun.com/api'


def _build_odps_client(project, connect_timeout, read_timeout, pool_maxsize, pool_connections):
    """Create the ODPS client shared by the ODPS helper functions below."""
    return ODPS(
        access_id=_ODPS_ACCESS_ID,
        secret_access_key=_ODPS_SECRET_ACCESS_KEY,
        project=project,
        endpoint=_ODPS_ENDPOINT,
        connect_timeout=connect_timeout,
        read_timeout=read_timeout,
        pool_maxsize=pool_maxsize,
        pool_connections=pool_connections
    )


def execute_sql_from_odps(project, sql, connect_timeout=3000, read_timeout=500000,
                          pool_maxsize=1000, pool_connections=1000):
    """
    Execute a SQL statement on ODPS.
    :param project: ODPS project name, type-string
    :param sql: SQL statement to execute, type-string
    :param connect_timeout: connection timeout passed to the ODPS client
    :param read_timeout: read timeout passed to the ODPS client
    :param pool_maxsize: passed through to the ODPS client
    :param pool_connections: passed through to the ODPS client
    :return: the value returned by ``ODPS.execute_sql``
    """
    odps = _build_odps_client(
        project=project,
        connect_timeout=connect_timeout,
        read_timeout=read_timeout,
        pool_maxsize=pool_maxsize,
        pool_connections=pool_connections
    )
    records = odps.execute_sql(sql=sql)
    return records


def get_data_from_odps(date, project, table, connect_timeout=3000, read_timeout=500000,
                       pool_maxsize=1000, pool_connections=1000):
    """
    Read one date partition of a table from ODPS.
    :param date: partition date, type-string '%Y%m%d' (read as partition ``dt=<date>``)
    :param project: ODPS project name, type-string
    :param table: table name, type-string
    :param connect_timeout: connection timeout passed to the ODPS client
    :param read_timeout: read timeout passed to the ODPS client
    :param pool_maxsize: passed through to the ODPS client
    :param pool_connections: passed through to the ODPS client
    :return: records, the value returned by ``ODPS.read_table``
    """
    odps = _build_odps_client(
        project=project,
        connect_timeout=connect_timeout,
        read_timeout=read_timeout,
        pool_maxsize=pool_maxsize,
        pool_connections=pool_connections
    )
    records = odps.read_table(name=table, partition='dt=%s' % date)
    return records


def write_to_pickle(data, filename, filepath=config_.DATA_DIR_PATH):
    """
    Serialize data into a pickle file.
    :param data: object to serialize
    :param filename: name of the file to write
    :param filepath: directory of the file, defaults to config_.DATA_DIR_PATH;
                     created if it does not exist
    :return: None
    """
    if not os.path.exists(filepath):
        os.makedirs(filepath)
    file = os.path.join(filepath, filename)
    with open(file, 'wb') as wf:
        pickle.dump(data, wf)


def read_from_pickle(filename, filepath=config_.DATA_DIR_PATH):
    """
    Load data from a pickle file.
    :param filename: name of the file to read
    :param filepath: directory of the file, defaults to config_.DATA_DIR_PATH
    :return: the unpickled data, or None when the file does not exist
    """
    file = os.path.join(filepath, filename)
    if not os.path.exists(file):
        return None
    # NOTE(review): pickle.load executes arbitrary code from the file; only
    # read files this project wrote itself.
    with open(file, 'rb') as rf:
        data = pickle.load(rf)
    return data


def send_msg_to_feishu(msg_text):
    """
    Send a text message to the Feishu webhook and print the response body.
    :param msg_text: message text; it is prefixed with the bot key word
    :return: None
    """
    # Webhook address
    webhook = 'https://open.feishu.cn/open-apis/bot/v2/hook/8de4de35-30ed-4692-8854-7a154e89b2f2'
    # Custom key word prepended to every message
    key_word = '服务报警'
    headers = {'Content-Type': 'application/json'}
    payload_message = {
        "msg_type": "text",
        "content": {
            "text": '{}: {}'.format(key_word, msg_text)
        }
    }
    response = requests.request('POST', url=webhook, headers=headers, data=json.dumps(payload_message))
    print(response.text)


def request_post(request_url, request_data):
    """
    Send a POST request with a JSON body to an HTTP interface.
    :param request_url: interface URL
    :param request_data: request parameters, sent as the JSON body
    :return: res_data, the decoded JSON response when the status code is 200;
             None for any other status code or when an exception occurs
             (the exception is logged and reported to Feishu)
    """
    try:
        response = requests.post(url=request_url, json=request_data)
        if response.status_code == 200:
            res_data = json.loads(response.text)
            return res_data
        else:
            return None
    except Exception as e:
        log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, e, traceback.format_exc()))
        send_msg_to_feishu('rov-offline{} - 接口请求失败:{}, exception: {}'.format(config_.ENV_TEXT, request_url, e))
        return None


def data_normalization(data):
    """
    Min-Max normalize the values so that the scores fall in [0, 100].
    :param data: type-list of numbers
    :return: normal_data, type-list of normalized values; an empty list for
             empty input, and all zeros when every value is identical
    """
    # Empty input has no min/max; return an empty result instead of raising.
    if len(data) == 0:
        return []
    x_max = max(data)
    x_min = min(data)
    span = x_max - x_min
    # All values equal: the formula would divide by zero, so map them to 0.
    if span == 0:
        return [0.0 for _ in data]
    normal_data = [(x - x_min) / span * 100 for x in data]
    return normal_data


def filter_video_status(video_ids):
    """
    Keep only the videos whose status fields pass the filter query.
    :param video_ids: video id list, type-list
    :return: filtered_videos, type-list of int video ids returned by the query;
             an empty list when video_ids is empty
    """
    # An empty id list would render "IN ()", which is invalid SQL.
    if len(video_ids) == 0:
        return []
    # A one-element tuple renders as "(x,)", so a single id is wrapped by hand.
    if len(video_ids) == 1:
        id_clause = '({})'.format(video_ids[0])
    else:
        id_clause = '{}'.format(tuple(video_ids))
    # NOTE(review): the ids are interpolated into the SQL text; callers must
    # pass trusted ids only.
    sql = (
        "set hg_experimental_enable_shard_pruning=off; "
        "SELECT video_id "
        "FROM {} "
        "WHERE audit_status = 5 "
        "AND applet_rec_status IN (1, -6) "
        "AND open_status = 1 "
        "AND payment_status = 0 "
        "AND encryption_status != 5 "
        "AND transcoding_status = 3 "
        "AND video_id IN {};"
    ).format(config_.VIDEO_STATUS, id_clause)
    hologres_helper = HologresHelper()
    data = hologres_helper.get_data(sql=sql)
    filtered_videos = [int(temp[0]) for temp in data]
    return filtered_videos


def update_video_w_h_rate(video_ids, key_name):
    """
    Compute the width/height ratio of landscape videos (ratio > 1) and store it in redis.
    :param video_ids: videoId list, type-list; when empty nothing is queried
                      and redis is left untouched
    :param key_name: redis key
    :return: None
    """
    # An empty id list would render "IN ()", which is invalid SQL.
    if len(video_ids) == 0:
        return
    # Fetch the video dimensions
    if len(video_ids) == 1:
        sql = "SELECT id, width, height, rotate FROM longvideo.wx_video WHERE id = {};".format(video_ids[0])
    else:
        sql = "SELECT id, width, height, rotate FROM longvideo.wx_video WHERE id IN {};".format(tuple(video_ids))
    mysql_helper = MysqlHelper()
    data = mysql_helper.get_data(sql=sql)

    # Build the data to write to redis
    info_data = {}
    for video_id, width, height, rotate in data:
        # Skip rows with a zero dimension (the ratio is undefined)
        width = int(width)
        if width == 0:
            continue
        height = int(height)
        if height == 0:
            continue
        # When rotate is 90 or 270, width and height are swapped
        if int(rotate) in (90, 270):
            w_h_rate = height / width
        else:
            w_h_rate = width / height
        if w_h_rate > 1:
            info_data[int(video_id)] = w_h_rate

    redis_helper = RedisHelper()
    # Delete the old data
    redis_helper.del_keys(key_name=key_name)
    # Write the new data
    if info_data:
        redis_helper.add_data_with_zset(key_name=key_name, data=info_data)


if __name__ == '__main__':
    # data_test = [9.20273281e+03, 7.00795065e+03, 5.54813112e+03, 9.97402494e-01, 9.96402495e-01, 9.96402494e-01]
    # data_normalization(data_test)
    request_post(request_url=config_.NOTIFY_BACKEND_UPDATE_ROV_SCORE_URL, request_data={'videos': []})