# coding:utf-8
import pickle
import os
import requests
import json
import traceback
import pandas as pd
from odps import ODPS
from odps.df import DataFrame
from my_config import set_config
from db_helper import HologresHelper, MysqlHelper, RedisHelper
from log import Log
from collections import defaultdict

config_, env = set_config()
log_ = Log()


def get_odps_instance(project):
    odps = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
    )
    return odps


def execute_sql_from_odps(project, sql, connect_timeout=3000, read_timeout=500000,
                          pool_maxsize=1000, pool_connections=1000):
    odps = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        connect_timeout=connect_timeout,
        read_timeout=read_timeout,
        pool_maxsize=pool_maxsize,
        pool_connections=pool_connections
    )
    records = odps.execute_sql(sql=sql)
    return records


def exe_sql(project, sql, connect_timeout=3000, read_timeout=500000,
            pool_maxsize=1000, pool_connections=1000):
    odps = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        connect_timeout=connect_timeout,
        read_timeout=read_timeout,
        pool_maxsize=pool_maxsize,
        pool_connections=pool_connections
    )
    records = odps.execute_sql(sql)
    return records


def get_data_from_odps(date, project, table, connect_timeout=3000, read_timeout=500000,
                       pool_maxsize=1000, pool_connections=1000):
    """
    Read data from an ODPS table partition.
    :param date: partition date, type-string '%Y%m%d'
    :param project: type-string
    :param table: table name, type-string
    :param connect_timeout: connection timeout
    :param read_timeout: read timeout
    :param pool_maxsize:
    :param pool_connections:
    :return: records
    """
    odps = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        connect_timeout=connect_timeout,
        read_timeout=read_timeout,
        pool_maxsize=pool_maxsize,
        pool_connections=pool_connections
    )
    records = odps.read_table(name=table, partition='dt=%s' % date)
    return records


def get_dataframe_from_odps(project, table, partition_spec_dict=None):
    """
    Read an ODPS table as a DataFrame.
    :param partition_spec_dict: partition spec, type-dict
    :param project: type-string
    :param table: table name, type-string
    :return: odps.DataFrame
    """
    odps = get_odps_instance(project)
    if partition_spec_dict:
        spec = ','.join(['{}={}'.format(k, partition_spec_dict[k])
                         for k in partition_spec_dict.keys()])
        return DataFrame(odps.get_table(name=table)).filter_parts(spec)
    else:
        return DataFrame(odps.get_table(name=table))


def get_odps_df_of_max_partition(project, table, rb_spec=None):
    """
    rb_spec: spec for the right bound of partition names, type-dict
    return: odps.DataFrame
    """
    odps = get_odps_instance(project)
    t = odps.get_table(table)
    df = DataFrame(odps.get_table(table))
    if rb_spec is None:
        return df.filter_parts(t.get_max_partition().partition_spec)
    else:
        spec = ','.join(['{}<{}'.format(k, rb_spec[k]) for k in rb_spec.keys()])
        part_iter = t.iterate_partitions(spec=spec, reverse=True)
        try:
            partition = next(part_iter)
            return df.filter_parts(partition)
        except StopIteration:
            return None
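

# Illustrative usage of the partition helper above (a sketch, not part of the offline
# pipeline). "my_project" and "my_table" are placeholder names, not tables that exist
# in this repo; the function is only defined here, never called.
def _example_read_max_partition():
    """Sketch: load the newest partition of a table and pull it into pandas."""
    df = get_odps_df_of_max_partition(project='my_project', table='my_table')
    if df is None:
        return None
    # to_pandas() materializes the ODPS DataFrame locally for inspection.
    return df.to_pandas()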


def get_odps_df_of_recent_partitions(project, table, n=1, rb_spec=None):
    """
    rb_spec: spec for the right bound of partition names, type-dict
    return: odps.DataFrame
    """
    odps = get_odps_instance(project)
    t = odps.get_table(table)
    df = DataFrame(odps.get_table(table))
    spec = None
    if rb_spec:
        spec = ','.join(['{}<{}'.format(k, rb_spec[k]) for k in rb_spec.keys()])
    part_iter = t.iterate_partitions(spec=spec, reverse=True)
    selected_parts = []
    try:
        for i in range(0, n):
            partition = next(part_iter)
            selected_parts.append(partition)
            log_.info(f"table: {table}, selected part: {partition.name}")
    except StopIteration:
        log_.info(f"table: {table}, no more parts to iterate")
    return df.filter_parts(selected_parts)


def check_table_partition_exits(date, project, table, connect_timeout=3000, read_timeout=500000,
                                pool_maxsize=1000, pool_connections=1000):
    """
    Check whether the dt=<date> partition exists in the table.
    :param date: partition date, type-string '%Y%m%d'
    :param project: type-string
    :param table: table name, type-string
    :param connect_timeout: connection timeout
    :param read_timeout: read timeout
    :param pool_maxsize:
    :param pool_connections:
    :return: True if the partition exists, otherwise False
    """
    odps = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        connect_timeout=connect_timeout,
        read_timeout=read_timeout,
        pool_maxsize=pool_maxsize,
        pool_connections=pool_connections
    )
    t = odps.get_table(name=table)
    return t.exist_partition(partition_spec=f'dt={date}')


def check_table_partition_exits_v2(project, table, partition_spec_dict):
    """
    Check whether the given partition exists and return its record count.
    Note: newer ODPS SDK versions removed the timeout-related parameters.
    :param project: project (database) name, type-string
    :param table: table name, type-string
    :param partition_spec_dict: partition spec, type-dict
    :return: if_exist, num_records
    """
    odps = get_odps_instance(project)
    t = odps.get_table(name=table)
    spec = ','.join(['{}={}'.format(k, partition_spec_dict[k])
                     for k in partition_spec_dict.keys()])
    if t.exist_partition(partition_spec=spec):
        with t.open_reader(partition=spec) as reader:
            count = reader.count
        return True, count
    else:
        return False, 0


def write_to_pickle(data, filename, filepath=config_.DATA_DIR_PATH):
    """
    Write data to a pickle file.
    :param data: data to write
    :param filename: output file name
    :param filepath: output directory, defaults to config_.DATA_DIR_PATH
    :return: None
    """
    if not os.path.exists(filepath):
        os.makedirs(filepath)
    file = os.path.join(filepath, filename)
    with open(file, 'wb') as wf:
        pickle.dump(data, wf)


def read_from_pickle(filename, filepath=config_.DATA_DIR_PATH):
    """
    Read data from a pickle file.
    :param filename: file name
    :param filepath: directory of the file, defaults to config_.DATA_DIR_PATH
    :return: data, or None if the file does not exist
    """
    file = os.path.join(filepath, filename)
    if not os.path.exists(file):
        return None
    with open(file, 'rb') as rf:
        data = pickle.load(rf)
    return data


def send_msg_to_feishu(webhook, key_word, msg_text):
    """Send a plain-text message to a Feishu robot."""
    headers = {'Content-Type': 'application/json'}
    payload_message = {
        "msg_type": "text",
        "content": {
            "text": '{}: {}'.format(key_word, msg_text)
        }
    }
    response = requests.request('POST', url=webhook, headers=headers, data=json.dumps(payload_message))
    print(response.text)


def send_msg_to_feishu_new(webhook, key_word, title, msg_list):
    """Send a rich-text (post) message to a Feishu robot."""
    headers = {'Content-Type': 'application/json'}
    content_list = [
        [
            {
                "tag": "text",
                "text": msg
            }
        ]
        for msg in msg_list
    ]
    payload_message = {
        "msg_type": "post",
        "content": {
            "post": {
                "zh_cn": {
                    "title": f"{key_word}: {title}",
                    "content": content_list,
                }
            }
        }
    }
    response = requests.request('POST', url=webhook, headers=headers, data=json.dumps(payload_message))
    print(response.text)
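

# Illustrative call of the Feishu helper above (a sketch, not part of the pipeline).
# The webhook URL is a placeholder; msg_list is a list of plain strings, each of which
# becomes one text line of the "post" message. The function is only defined, never called.
def _example_send_feishu_alert():
    """Sketch: send a two-line rich-text alert to a placeholder Feishu robot."""
    send_msg_to_feishu_new(
        webhook='https://open.feishu.cn/open-apis/bot/v2/hook/xxx',  # placeholder webhook
        key_word='rov-offline',
        title='demo alert',
        msg_list=['first line of the alert', 'second line of the alert']
    )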


def request_post(request_url, request_data=None, **kwargs):
    """
    POST request to an HTTP interface.
    :param request_url: interface URL
    :param request_data: request parameters
    :return: res_data in json format, or None on failure
    """
    try:
        response = requests.post(url=request_url, json=request_data, **kwargs)
        if response.status_code == 200:
            res_data = json.loads(response.text)
            return res_data
        else:
            log_.info(f"response.status_code: {response.status_code}")
            return None
    except Exception as e:
        log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, e, traceback.format_exc()))
        send_msg_to_feishu(
            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
            msg_text='rov-offline{} - interface request failed: {}, exception: {}'.format(config_.ENV_TEXT, request_url, e)
        )
        return None


def request_get(request_url):
    """
    GET request to an HTTP interface.
    :param request_url: interface URL
    :return: res_data in json format, or None on failure
    """
    try:
        response = requests.get(url=request_url)
        if response.status_code == 200:
            res_data = json.loads(response.text)
            return res_data
        else:
            log_.info(f"response.status_code: {response.status_code}")
            return None
    except Exception as e:
        log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, e, traceback.format_exc()))
        send_msg_to_feishu(
            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
            msg_text='rov-offline{} - interface request failed: {}, exception: {}'.format(config_.ENV_TEXT, request_url, e)
        )
        return None


def data_normalization(data):
    """
    Min-Max normalization of the scores, scaling them into [0, 100].
    :param data: type-list
    :return: normal_data, type-list, the normalized data
    """
    x_max = max(data)
    x_min = min(data)
    normal_data = [(x - x_min) / (x_max - x_min) * 100 for x in data]
    return normal_data
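

# Worked example for data_normalization (illustrative numbers): the minimum maps to 0 and
# the maximum to 100, e.g. data_normalization([2.0, 4.0, 6.0]) -> [0.0, 50.0, 100.0] via
# (x - x_min) / (x_max - x_min) * 100. The input needs at least two distinct values,
# otherwise x_max == x_min and the division raises ZeroDivisionError.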


def filter_video_status(video_ids):
    """
    Filter videos by status.
    :param video_ids: list of video ids, type-list
    :return: filtered_videos
    """
    i = 0
    while i < 3:
        try:
            mysql_helper = MysqlHelper(mysql_info=config_.FILTER_MYSQL_INFO)
            video_status_sql = "SELECT t1.id AS 'video_id', " \
                               "t1.transcode_status AS 'transcoding_status', " \
                               "t2.audit_status AS 'audit_status', " \
                               "t2.video_status AS 'open_status', " \
                               "t2.recommend_status AS 'applet_rec_status', " \
                               "t2.app_recommend_status AS 'app_rec_status', " \
                               "t3.charge AS 'payment_status', " \
                               "case when t4.max_validate_count is null then 0 else t4.max_validate_count end AS 'encryption_status' " \
                               "FROM longvideo.wx_video t1 " \
                               "LEFT JOIN longvideo.wx_video_status t2 ON t1.id= t2.video_id " \
                               "LEFT JOIN longvideo.wx_video_detail t3 ON t1.id= t3.video_id " \
                               "LEFT JOIN longvideo.wx_video_pwd t4 ON t1.id= t4.video_id"
            if len(video_ids) == 1:
                sql = "SELECT video_id " \
                      "FROM ({}) " \
                      "WHERE audit_status = 5 " \
                      "AND applet_rec_status IN (1, -6) " \
                      "AND open_status = 1 " \
                      "AND payment_status = 0 " \
                      "AND encryption_status != 5 " \
                      "AND transcoding_status = 3 " \
                      "AND video_id IN ({});".format(video_status_sql, video_ids[0])
                data = mysql_helper.get_data(sql=sql)
            else:
                data = []
                # Query in batches of 200 ids; use a separate loop variable so the
                # retry counter `i` is not overwritten.
                for batch_start in range(0, len(video_ids), 200):
                    batch = video_ids[batch_start:batch_start + 200]
                    in_clause = ', '.join(str(video_id) for video_id in batch)
                    sql = "SELECT video_id " \
                          "FROM ({}) " \
                          "WHERE audit_status = 5 " \
                          "AND applet_rec_status IN (1, -6) " \
                          "AND open_status = 1 " \
                          "AND payment_status = 0 " \
                          "AND encryption_status != 5 " \
                          "AND transcoding_status = 3 " \
                          "AND video_id IN ({});".format(video_status_sql, in_clause)
                    select_res = mysql_helper.get_data(sql=sql)
                    if select_res is not None:
                        data += select_res
            filtered_videos = [int(temp[0]) for temp in data]
            return filtered_videos

        except Exception as e:
            log_.error(f"video status filtering failed, exception: {e}, traceback: {traceback.format_exc()}")
            send_msg_to_feishu(
                webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
                key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
                msg_text=f"rov-offline{config_.ENV_TEXT} - video status filtering failed\n"
                         f"retry count: {i}\n"
                         f"exception: {e}\n"
                         f"traceback: {traceback.format_exc()}"
            )
            i += 1
            if i == 3:
                # After three failed attempts, fall back to the unfiltered list.
                return video_ids


def filter_video_status_with_applet_rec(video_ids, applet_rec_status):
    """
    Filter videos by status.
    :param video_ids: list of video ids, type-list
    :param applet_rec_status: applet recommend status, -6: pending recommendation, 1: normal recommendation
    :return: filtered_videos
    """
    i = 0
    while i < 3:
        try:
            mysql_helper = MysqlHelper(mysql_info=config_.FILTER_MYSQL_INFO)
            video_status_sql = "SELECT t1.id AS 'video_id', " \
                               "t1.transcode_status AS 'transcoding_status', " \
                               "t2.audit_status AS 'audit_status', " \
                               "t2.video_status AS 'open_status', " \
                               "t2.recommend_status AS 'applet_rec_status', " \
                               "t2.app_recommend_status AS 'app_rec_status', " \
                               "t3.charge AS 'payment_status', " \
                               "case when t4.max_validate_count is null then 0 else t4.max_validate_count end AS 'encryption_status' " \
                               "FROM longvideo.wx_video t1 " \
                               "LEFT JOIN longvideo.wx_video_status t2 ON t1.id= t2.video_id " \
                               "LEFT JOIN longvideo.wx_video_detail t3 ON t1.id= t3.video_id " \
                               "LEFT JOIN longvideo.wx_video_pwd t4 ON t1.id= t4.video_id"
            if len(video_ids) == 1:
                sql = "SELECT video_id " \
                      "FROM ({}) " \
                      "WHERE audit_status = 5 " \
                      "AND applet_rec_status = {} " \
                      "AND open_status = 1 " \
                      "AND payment_status = 0 " \
                      "AND encryption_status != 5 " \
                      "AND transcoding_status = 3 " \
                      "AND video_id IN ({});".format(video_status_sql, applet_rec_status, video_ids[0])
                data = mysql_helper.get_data(sql=sql)
            else:
                data = []
                # Query in batches of 200 ids; use a separate loop variable so the
                # retry counter `i` is not overwritten.
                for batch_start in range(0, len(video_ids), 200):
                    batch = video_ids[batch_start:batch_start + 200]
                    in_clause = ', '.join(str(video_id) for video_id in batch)
                    sql = "SELECT video_id " \
                          "FROM ({}) " \
                          "WHERE audit_status = 5 " \
                          "AND applet_rec_status = {} " \
                          "AND open_status = 1 " \
                          "AND payment_status = 0 " \
                          "AND encryption_status != 5 " \
                          "AND transcoding_status = 3 " \
                          "AND video_id IN ({});".format(video_status_sql, applet_rec_status, in_clause)
                    select_res = mysql_helper.get_data(sql=sql)
                    if select_res is not None:
                        data += select_res
            filtered_videos = [int(temp[0]) for temp in data]
            return filtered_videos

        except Exception as e:
            log_.error(f"video status filtering failed, exception: {e}, traceback: {traceback.format_exc()}")
            send_msg_to_feishu(
                webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
                key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
                msg_text=f"rov-offline{config_.ENV_TEXT} - video status filtering failed\n"
                         f"retry count: {i}\n"
                         f"exception: {e}\n"
                         f"traceback: {traceback.format_exc()}"
            )
            i += 1
            if i == 3:
                # After three failed attempts, fall back to the unfiltered list.
                return video_ids


def filter_video_status_app(video_ids):
    """
    Filter videos by status - app.
    :param video_ids: list of video ids, type-list
    :return: filtered_videos
    """
    i = 0
    while i < 3:
        try:
            mysql_helper = MysqlHelper(mysql_info=config_.FILTER_MYSQL_INFO)
            video_status_sql = "SELECT t1.id AS 'video_id', " \
                               "t1.transcode_status AS 'transcoding_status', " \
                               "t2.app_audit_status AS 'app_audit_status', " \
                               "t2.original_status AS 'open_status', " \
                               "t2.recommend_status AS 'applet_rec_status', " \
                               "t2.app_recommend_status AS 'app_rec_status', " \
                               "t3.charge AS 'payment_status', " \
                               "case when t4.max_validate_count is null then 0 else t4.max_validate_count end AS 'encryption_status' " \
                               "FROM longvideo.wx_video t1 " \
                               "LEFT JOIN longvideo.wx_video_status t2 ON t1.id= t2.video_id " \
                               "LEFT JOIN longvideo.wx_video_detail t3 ON t1.id= t3.video_id " \
                               "LEFT JOIN longvideo.wx_video_pwd t4 ON t1.id= t4.video_id"
            if len(video_ids) == 1:
                sql = "SELECT video_id " \
                      "FROM ({}) " \
                      "WHERE app_audit_status = 5 " \
                      "AND app_rec_status IN (1, -6, 10) " \
                      "AND open_status = 1 " \
                      "AND payment_status = 0 " \
                      "AND encryption_status != 5 " \
                      "AND transcoding_status = 3 " \
                      "AND video_id IN ({});".format(video_status_sql, video_ids[0])
                data = mysql_helper.get_data(sql=sql)
            else:
                data = []
                # Query in batches of 200 ids; use a separate loop variable so the
                # retry counter `i` is not overwritten.
                for batch_start in range(0, len(video_ids), 200):
                    batch = video_ids[batch_start:batch_start + 200]
                    in_clause = ', '.join(str(video_id) for video_id in batch)
                    sql = "SELECT video_id " \
                          "FROM ({}) " \
                          "WHERE app_audit_status = 5 " \
                          "AND app_rec_status IN (1, -6, 10) " \
                          "AND open_status = 1 " \
                          "AND payment_status = 0 " \
                          "AND encryption_status != 5 " \
                          "AND transcoding_status = 3 " \
                          "AND video_id IN ({});".format(video_status_sql, in_clause)
                    select_res = mysql_helper.get_data(sql=sql)
                    if select_res is not None:
                        data += select_res
            filtered_videos = [int(temp[0]) for temp in data]
            return filtered_videos

        except Exception as e:
            log_.error(f"video status filtering failed, exception: {e}, traceback: {traceback.format_exc()}")
            send_msg_to_feishu(
                webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
                key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
                msg_text=f"rov-offline{config_.ENV_TEXT} - video status filtering failed\n"
                         f"retry count: {i}\n"
                         f"exception: {e}\n"
                         f"traceback: {traceback.format_exc()}"
            )
            i += 1
            if i == 3:
                # After three failed attempts, fall back to the unfiltered list.
                return video_ids


def filter_shield_video(video_ids, shield_key_name_list):
    """
    Filter out shielded videos.
    :param video_ids: list of video ids to filter, type-list
    :param shield_key_name_list: redis keys of the shielded video sets
    :return: filtered_videos, the filtered list, type-list
    """
    if len(video_ids) == 0:
        return video_ids
    # Filter according to the data cached in Redis.
    redis_helper = RedisHelper()
    for shield_key_name in shield_key_name_list:
        shield_videos_list = redis_helper.get_data_from_set(key_name=shield_key_name)
        if not shield_videos_list:
            continue
        shield_videos = [int(video) for video in shield_videos_list]
        video_ids = [int(video_id) for video_id in video_ids if int(video_id) not in shield_videos]
    return video_ids


def filter_political_videos(video_ids):
    """
    Filter out politically sensitive videos.
    :param video_ids: list of video ids to filter, type-list
    :return: filtered_video_ids, the filtered list, type-list
    """
    if len(video_ids) == 0:
        return video_ids
    # Filter according to the data cached in Redis.
    redis_helper = RedisHelper()
    political_key_name = config_.POLITICAL_VIDEOS_KEY_NAME
    political_videos_list = redis_helper.get_data_from_set(key_name=political_key_name)
    if not political_videos_list:
        return video_ids
    political_videos = [int(video) for video in political_videos_list]
    filtered_video_ids = [int(video_id) for video_id in video_ids if int(video_id) not in political_videos]
    return filtered_video_ids


def update_video_w_h_rate(video_ids, key_name):
    """
    Compute the width/height ratio of landscape videos (width/height > 1) and store it in Redis.
    :param video_ids: list of video ids, type-list
    :param key_name: redis key
    :return: None
    """
    # Fetch the data
    if len(video_ids) == 1:
        sql = "SELECT id, width, height, rotate FROM longvideo.wx_video WHERE id = {};".format(video_ids[0])
    else:
        sql = "SELECT id, width, height, rotate FROM longvideo.wx_video WHERE id IN {};".format(tuple(video_ids))
    mysql_helper = MysqlHelper(mysql_info=config_.MYSQL_INFO)
    data = mysql_helper.get_data(sql=sql)

    # Update Redis
    info_data = {}
    for video_id, width, height, rotate in data:
        if int(width) == 0 or int(height) == 0:
            continue
        # When rotate is 90 or 270, width and height are swapped.
        if int(rotate) in (90, 270):
            w_h_rate = int(height) / int(width)
        else:
            w_h_rate = int(width) / int(height)
        if w_h_rate > 1:
            info_data[int(video_id)] = w_h_rate
    redis_helper = RedisHelper()
    # Delete the old data
    redis_helper.del_keys(key_name=key_name)
    # Write the new data
    if len(info_data) > 0:
        redis_helper.add_data_with_zset(key_name=key_name, data=info_data)
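

# Worked example for update_video_w_h_rate (illustrative numbers): a record with
# width=720, height=1280, rotate=90 is landscape after rotation, so
# w_h_rate = 1280 / 720 ≈ 1.78 > 1 and the video is written to the zset; the same
# dimensions with rotate=0 give 720 / 1280 ≈ 0.56 and the video is skipped.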


def data_check(project, table, dt):
    """Check whether the data for the given partition is ready; return its record count (0 if not)."""
    odps = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        connect_timeout=3000,
        read_timeout=500000,
        pool_maxsize=1000,
        pool_connections=1000
    )
    try:
        check_res = check_table_partition_exits(date=dt, project=project, table=table)
        if check_res:
            sql = f'select * from {project}.{table} where dt = {dt}'
            with odps.execute_sql(sql=sql).open_reader() as reader:
                data_count = reader.count
        else:
            data_count = 0
    except Exception as e:
        data_count = 0
    return data_count


def get_feature_data(project, table, features, dt):
    """Read feature data of the given partition into a pandas DataFrame."""
    records = get_data_from_odps(date=dt, project=project, table=table)
    feature_data = []
    for record in records:
        item = {}
        for feature_name in features:
            item[feature_name] = record[feature_name]
        feature_data.append(item)
    feature_df = pd.DataFrame(feature_data)
    return feature_df


if __name__ == '__main__':
    # data_test = [9.20273281e+03, 7.00795065e+03, 5.54813112e+03, 9.97402494e-01, 9.96402495e-01, 9.96402494e-01]
    # data_normalization(data_test)
    # request_post(request_url=config_.NOTIFY_BACKEND_UPDATE_ROV_SCORE_URL, request_data={'videos': []})
    # video_ids = [110, 112, 113, 115, 116, 117, 8289883]
    # update_video_w_h_rate(video_ids=video_ids, key_name='')
    project = config_.PROJECT_24H_APP_TYPE
    table = config_.TABLE_24H_APP_TYPE
    dt = '2022080115'
    check_res = check_table_partition_exits(date=dt, project=project, table=table)
    print(check_res)
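    # Hypothetical follow-up (commented out, not executed by default): data_check() returns
    # the partition's record count, or 0 if the partition is missing or the query fails.
    # data_count = data_check(project=project, table=table, dt=dt)
    # print(f"data_count: {data_count}")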