# coding:utf-8
import pickle
import os
import requests
import json
import traceback
from odps import ODPS
from config import set_config
from db_helper import HologresHelper, MysqlHelper, RedisHelper
from log import Log
config_ = set_config()
log_ = Log()
def execute_sql_from_odps(project, sql, connect_timeout=3000, read_timeout=500000,
pool_maxsize=1000, pool_connections=1000):
odps = ODPS(
access_id='LTAI4FtW5ZzxMvdw35aNkmcp',
secret_access_key='0VKnydcaHK3ITjylbgUsLubX6rwiwc',
project=project,
endpoint='http://service.cn.maxcompute.aliyun.com/api',
connect_timeout=connect_timeout,
read_timeout=read_timeout,
pool_maxsize=pool_maxsize,
pool_connections=pool_connections
)
records = odps.execute_sql(sql=sql)
return records
def get_data_from_odps(date, project, table, connect_timeout=3000, read_timeout=500000,
pool_maxsize=1000, pool_connections=1000):
"""
从odps获取数据
:param date: 日期 type-string '%Y%m%d'
:param project: type-string
:param table: 表名 type-string
:param connect_timeout: 连接超时设置
:param read_timeout: 读取超时设置
:param pool_maxsize:
:param pool_connections:
:return: records
"""
odps = ODPS(
access_id='LTAI4FtW5ZzxMvdw35aNkmcp',
secret_access_key='0VKnydcaHK3ITjylbgUsLubX6rwiwc',
project=project,
endpoint='http://service.cn.maxcompute.aliyun.com/api',
connect_timeout=connect_timeout,
read_timeout=read_timeout,
pool_maxsize=pool_maxsize,
pool_connections=pool_connections
)
records = odps.read_table(name=table, partition='dt=%s' % date)
return records
def write_to_pickle(data, filename, filepath=config_.DATA_DIR_PATH):
"""
将数据写入pickle文件中
:param data: 数据
:param filename: 写入的文件名
:param filepath: 文件存放路径,默认为config_.DATA_DIR_PATH
:return: None
"""
if not os.path.exists(filepath):
os.makedirs(filepath)
file = os.path.join(filepath, filename)
with open(file, 'wb') as wf:
pickle.dump(data, wf)
def read_from_pickle(filename, filepath=config_.DATA_DIR_PATH):
"""
从pickle文件读取数据
:param filename: 文件名
:param filepath: 文件存放路径,默认为config_.DATA_DIR_PATH
:return: data
"""
file = os.path.join(filepath, filename)
if not os.path.exists(file):
return None
with open(file, 'rb') as rf:
data = pickle.load(rf)
return data
def send_msg_to_feishu(msg_text):
"""发送消息到飞书"""
# webhook地址
webhook = 'https://open.feishu.cn/open-apis/bot/v2/hook/8de4de35-30ed-4692-8854-7a154e89b2f2'
# 自定义关键词key_word
key_word = '服务报警'
headers = {'Content-Type': 'application/json'}
payload_message = {
"msg_type": "text",
"content": {
"text": '{}: {}'.format(key_word, msg_text)
}
}
response = requests.request('POST', url=webhook, headers=headers, data=json.dumps(payload_message))
print(response.text)
def request_post(request_url, request_data):
"""
post 请求 HTTP接口
:param request_url: 接口URL
:param request_data: 请求参数
:return: res_data json格式
"""
try:
response = requests.post(url=request_url, json=request_data)
if response.status_code == 200:
res_data = json.loads(response.text)
return res_data
else:
return None
except Exception as e:
log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, e, traceback.format_exc()))
send_msg_to_feishu('rov-offline生产环境 - 接口请求失败:{}, exception: {}'.format(request_url, e))
return None
def data_normalization(data):
"""
对结果做归一化处理(Min-Max Normalization),将分数控制在[0, 100]
:param data: type-list
:return: normal_data, type-list 归一化后的数据
"""
x_max = max(data)
x_min = min(data)
normal_data = [(x-x_min)/(x_max-x_min)*100 for x in data]
return normal_data
def filter_video_status(video_ids):
"""
对视频状态进行过滤
:param video_ids: 视频id列表 type-list
:return: filtered_videos
"""
if len(video_ids) == 1:
sql = "set hg_experimental_enable_shard_pruning=off; " \
"SELECT video_id " \
"FROM {} " \
"WHERE audit_status = 5 " \
"AND applet_rec_status IN (1, -6) " \
"AND open_status = 1 " \
"AND payment_status = 0 " \
"AND encryption_status != 5 " \
"AND transcoding_status = 3 " \
"AND video_id IN ({});".format(config_.VIDEO_STATUS, video_ids[0])
else:
sql = "set hg_experimental_enable_shard_pruning=off; " \
"SELECT video_id " \
"FROM {} " \
"WHERE audit_status = 5 " \
"AND applet_rec_status IN (1, -6) " \
"AND open_status = 1 " \
"AND payment_status = 0 " \
"AND encryption_status != 5 " \
"AND transcoding_status = 3 " \
"AND video_id IN {};".format(config_.VIDEO_STATUS, tuple(video_ids))
hologres_helper = HologresHelper()
data = hologres_helper.get_data(sql=sql)
filtered_videos = [int(temp[0]) for temp in data]
return filtered_videos
def update_video_w_h_rate(video_ids, key_name):
"""
获取横屏视频的宽高比,并存入redis中 (width/height>1)
:param video_ids: videoId列表 type-list
:param key_name: redis key
:return: None
"""
# 获取数据
if len(video_ids) == 1:
sql = "SELECT id, width, height, rotate FROM longvideo.wx_video WHERE id = {};".format(video_ids[0])
else:
sql = "SELECT id, width, height, rotate FROM longvideo.wx_video WHERE id IN {};".format(tuple(video_ids))
mysql_helper = MysqlHelper()
data = mysql_helper.get_data(sql=sql)
# 更新到redis
info_data = {}
for video_id, width, height, rotate in data:
# rotate 字段值为 90或270时,width和height的值相反
if int(rotate) in (90, 270):
w_h_rate = int(height) / int(width)
else:
w_h_rate = int(width) / int(height)
if w_h_rate > 1:
info_data[int(video_id)] = w_h_rate
redis_helper = RedisHelper()
# 删除旧数据
redis_helper.del_keys(key_name=key_name)
# 写入新数据
if len(info_data) > 0:
redis_helper.add_data_with_zset(key_name=key_name, data=info_data)
if __name__ == '__main__':
# data_test = [9.20273281e+03, 7.00795065e+03, 5.54813112e+03, 9.97402494e-01, 9.96402495e-01, 9.96402494e-01]
# data_normalization(data_test)
request_post(request_url=config_.NOTIFY_BACKEND_UPDATE_ROV_SCORE_URL, request_data={'videos': []})