| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655 | 
							
- import pickle
 
- import os
 
- import requests
 
- import json
 
- import traceback
 
- import pandas as pd
 
- from odps import ODPS
 
- from odps.df import DataFrame
 
- from my_config import set_config
 
- from db_helper import HologresHelper, MysqlHelper, RedisHelper
 
- from log import Log
 
- from collections import defaultdict
 
- config_, env = set_config()
 
- log_ = Log()
 
def get_odps_instance(project):
    """Build an ODPS client for *project* using the globally configured credentials."""
    cfg = config_.ODPS_CONFIG
    return ODPS(
        access_id=cfg['ACCESSID'],
        secret_access_key=cfg['ACCESSKEY'],
        project=project,
        endpoint=cfg['ENDPOINT'],
    )
 
def execute_sql_from_odps(project, sql, connect_timeout=3000, read_timeout=500000,
                       pool_maxsize=1000, pool_connections=1000):
    """
    Execute *sql* against the given ODPS project and return the result instance.

    :param project: ODPS project name, type-string
    :param sql: SQL statement to run
    :param connect_timeout: connection timeout
    :param read_timeout: read timeout
    :param pool_maxsize: HTTP pool max size
    :param pool_connections: HTTP pool connection count
    :return: records (ODPS instance returned by execute_sql)
    """
    cfg = config_.ODPS_CONFIG
    client = ODPS(
        access_id=cfg['ACCESSID'],
        secret_access_key=cfg['ACCESSKEY'],
        project=project,
        endpoint=cfg['ENDPOINT'],
        connect_timeout=connect_timeout,
        read_timeout=read_timeout,
        pool_maxsize=pool_maxsize,
        pool_connections=pool_connections,
    )
    return client.execute_sql(sql=sql)
 
def exe_sql(project, sql, connect_timeout=3000, read_timeout=500000,
                          pool_maxsize=1000, pool_connections=1000):
    """
    Execute *sql* against the given ODPS project.

    Kept as a backward-compatible alias: the body was a byte-for-byte copy of
    execute_sql_from_odps, so it now delegates to it instead of duplicating
    the client construction.

    :param project: ODPS project name, type-string
    :param sql: SQL statement to run
    :param connect_timeout: connection timeout
    :param read_timeout: read timeout
    :param pool_maxsize: HTTP pool max size
    :param pool_connections: HTTP pool connection count
    :return: records (ODPS instance returned by execute_sql)
    """
    return execute_sql_from_odps(project, sql,
                                 connect_timeout=connect_timeout,
                                 read_timeout=read_timeout,
                                 pool_maxsize=pool_maxsize,
                                 pool_connections=pool_connections)
 
def get_data_from_odps(date, project, table, connect_timeout=3000, read_timeout=500000,
                       pool_maxsize=1000, pool_connections=1000):
    """
    Read one day's partition (dt=<date>) of an ODPS table.

    :param date: date, type-string '%Y%m%d'
    :param project: type-string
    :param table: table name, type-string
    :param connect_timeout: connection timeout
    :param read_timeout: read timeout
    :param pool_maxsize:
    :param pool_connections:
    :return: records
    """
    cfg = config_.ODPS_CONFIG
    client = ODPS(
        access_id=cfg['ACCESSID'],
        secret_access_key=cfg['ACCESSKEY'],
        project=project,
        endpoint=cfg['ENDPOINT'],
        connect_timeout=connect_timeout,
        read_timeout=read_timeout,
        pool_maxsize=pool_maxsize,
        pool_connections=pool_connections,
    )
    return client.read_table(name=table, partition='dt=%s' % date)
 
def get_dataframe_from_odps(project, table, partition_spec_dict=None):
    """
    Load an ODPS table as an odps DataFrame, optionally restricted to one partition.

    :param partition_spec_dict: partition spec, type-dict (falsy = whole table)
    :param project: type-string
    :param table: table name, type-string
    :return: odps.DataFrame
    """
    df = DataFrame(get_odps_instance(project).get_table(name=table))
    if not partition_spec_dict:
        return df
    spec = ','.join('{}={}'.format(k, v) for k, v in partition_spec_dict.items())
    return df.filter_parts(spec)
 
def get_odps_df_of_max_partition(project, table, rb_spec=None):
    """
    Return an odps DataFrame filtered to the newest partition of *table*.

    rb_spec: spec for right bound (exclusive) of partition names. type-dict.
    Returns None when rb_spec is given but no partition lies below the bound.
    """
    t = get_odps_instance(project).get_table(table)
    df = DataFrame(t)
    if rb_spec is None:
        return df.filter_parts(t.get_max_partition().partition_spec)
    spec = ','.join('{}<{}'.format(k, v) for k, v in rb_spec.items())
    # take only the newest matching partition; fall through to None if empty
    for partition in t.iterate_partitions(spec=spec, reverse=True):
        return df.filter_parts(partition)
    return None
 
def get_odps_df_of_recent_partitions(project, table, n=1, rb_spec=None):
    """
    Return an odps DataFrame over the *n* most recent partitions of *table*.

    rb_spec: spec for right bound (exclusive) of partition names. type-dict.
    """
    t = get_odps_instance(project).get_table(table)
    df = DataFrame(t)
    spec = None
    if rb_spec:
        spec = ','.join('{}<{}'.format(k, v) for k, v in rb_spec.items())
    part_iter = t.iterate_partitions(spec=spec, reverse=True)
    selected_parts = []
    while len(selected_parts) < n:
        partition = next(part_iter, None)
        if partition is None:
            # iterator exhausted before n partitions were collected
            log_.info(f"table: {table}, no more parts to iterate")
            break
        selected_parts.append(partition)
        log_.info(f"table: {table}, selected part: {partition.name}")
    return df.filter_parts(selected_parts)
 
def check_table_partition_exits(date, project, table, connect_timeout=3000, read_timeout=500000,
                                pool_maxsize=1000, pool_connections=1000):
    """
    Check whether partition dt=<date> exists in the table.

    :param date: date, type-string '%Y%m%d'
    :param project: type-string
    :param table: table name, type-string
    :param connect_timeout: connection timeout
    :param read_timeout: read timeout
    :param pool_maxsize:
    :param pool_connections:
    :return: bool, True when the partition exists
    """
    cfg = config_.ODPS_CONFIG
    client = ODPS(
        access_id=cfg['ACCESSID'],
        secret_access_key=cfg['ACCESSKEY'],
        project=project,
        endpoint=cfg['ENDPOINT'],
        connect_timeout=connect_timeout,
        read_timeout=read_timeout,
        pool_maxsize=pool_maxsize,
        pool_connections=pool_connections,
    )
    return client.get_table(name=table).exist_partition(partition_spec=f'dt={date}')
 
def check_table_partition_exits_v2(project, table, partition_spec_dict):
    """
    Check whether the given partition exists and return its record count.

    Note: newer ODPS versions removed the timeout-style parameters, hence the v2.
    :param project: database name, type-string
    :param table: table name, type-string
    :param partition_spec_dict: partition spec, type-dict
    :return: (if_exist, num_records)
    """
    t = get_odps_instance(project).get_table(name=table)
    spec = ','.join('{}={}'.format(k, v) for k, v in partition_spec_dict.items())
    if not t.exist_partition(partition_spec=spec):
        return False, 0
    with t.open_reader(partition=spec) as reader:
        return True, reader.count
 
def write_to_pickle(data, filename, filepath=config_.DATA_DIR_PATH):
    """
    Serialize *data* to a pickle file.

    :param data: object to serialize
    :param filename: target file name
    :param filepath: target directory, defaults to config_.DATA_DIR_PATH
    :return: None
    """
    # exist_ok=True replaces the original check-then-create sequence,
    # which could race with a concurrent process creating the directory
    os.makedirs(filepath, exist_ok=True)
    file = os.path.join(filepath, filename)
    with open(file, 'wb') as wf:
        pickle.dump(data, wf)
 
def read_from_pickle(filename, filepath=config_.DATA_DIR_PATH):
    """
    Load data from a pickle file.

    :param filename: file name
    :param filepath: directory the file lives in, defaults to config_.DATA_DIR_PATH
    :return: the unpickled data, or None when the file does not exist
    """
    file = os.path.join(filepath, filename)
    if not os.path.exists(file):
        return None
    with open(file, 'rb') as rf:
        return pickle.load(rf)
 
def send_msg_to_feishu(webhook, key_word, msg_text):
    """Send a plain-text message to a Feishu webhook and print the response body."""
    payload = {
        "msg_type": "text",
        "content": {
            "text": '{}: {}'.format(key_word, msg_text)
        }
    }
    response = requests.request(
        'POST',
        url=webhook,
        headers={'Content-Type': 'application/json'},
        data=json.dumps(payload),
    )
    print(response.text)
 
def send_msg_to_feishu_new(webhook, key_word, title, msg_list):
    """Send a rich-text (post) message to a Feishu webhook, one paragraph per entry of msg_list."""
    paragraphs = [[{"tag": "text", "text": msg}] for msg in msg_list]
    payload = {
        "msg_type": "post",
        "content": {
            "post": {
                "zh_cn": {
                    "title": f"{key_word}: {title}",
                    "content": paragraphs,
                }
            }
        }
    }
    response = requests.request(
        'POST',
        url=webhook,
        headers={'Content-Type': 'application/json'},
        data=json.dumps(payload),
    )
    print(response.text)
 
def request_post(request_url, request_data=None, **kwargs):
    """
    POST to an HTTP API.

    :param request_url: API URL
    :param request_data: request payload (sent as JSON)
    :return: parsed JSON response (dict), or None on non-200 status / exception
    """
    try:
        response = requests.post(url=request_url, json=request_data, **kwargs)
        if response.status_code != 200:
            log_.info(f"response.status_code: {response.status_code}")
            return None
        return json.loads(response.text)
    except Exception as e:
        # failures are reported to Feishu and swallowed; callers see None
        log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, e, traceback.format_exc()))
        send_msg_to_feishu(
            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
            msg_text='rov-offline{} - 接口请求失败:{}, exception: {}'.format(config_.ENV_TEXT, request_url, e)
        )
        return None
 
def request_get(request_url):
    """
    GET an HTTP API.

    :param request_url: API URL
    :return: parsed JSON response (dict), or None on non-200 status / exception
    """
    try:
        response = requests.get(url=request_url)
        if response.status_code != 200:
            log_.info(f"response.status_code: {response.status_code}")
            return None
        return json.loads(response.text)
    except Exception as e:
        # failures are reported to Feishu and swallowed; callers see None
        log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, e, traceback.format_exc()))
        send_msg_to_feishu(
            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
            msg_text='rov-offline{} - 接口请求失败:{}, exception: {}'.format(config_.ENV_TEXT, request_url, e)
        )
        return None
 
def data_normalization(data):
    """
    Min-Max normalize *data* into the range [0, 100].

    :param data: type-list of numbers
    :return: normal_data, type-list of normalized values

    Edge cases the original crashed on:
    - empty input returns an empty list instead of raising on max()/min()
    - constant input (max == min) maps every value to 0 instead of
      raising ZeroDivisionError
    """
    if not data:
        return []
    x_max = max(data)
    x_min = min(data)
    span = x_max - x_min
    if span == 0:
        # normalization is undefined for constant data; pin to the low end
        return [0 for _ in data]
    return [(x - x_min) / span * 100 for x in data]
 
def filter_video_status(video_ids):
    """
    Filter video ids down to those visible in the applet.

    A video passes when: audit passed (5), applet recommend status in (1, -6),
    open (1), free of charge (0), not encrypted (!= 5) and transcoded (3).
    On repeated DB failure the unfiltered input is returned so callers never
    crash on a filtering outage.

    :param video_ids: video id list, type-list
    :return: filtered_videos, list of int
    """
    video_status_sql = "SELECT t1.id AS 'video_id', " \
                       "t1.transcode_status AS 'transcoding_status', " \
                       "t2.audit_status AS 'audit_status', " \
                       "t2.video_status AS 'open_status', " \
                       "t2.recommend_status AS 'applet_rec_status', " \
                       "t2.app_recommend_status AS 'app_rec_status', " \
                       "t3.charge AS 'payment_status', " \
                       "case when t4.max_validate_count is null then 0 else t4.max_validate_count end AS 'encryption_status' " \
                       "FROM longvideo.wx_video t1 " \
                       "LEFT JOIN longvideo.wx_video_status t2 ON t1.id= t2.video_id " \
                       "LEFT JOIN longvideo.wx_video_detail t3 ON t1.id= t3.video_id " \
                       "LEFT JOIN longvideo.wx_video_pwd t4 ON t1.id= t4.video_id"
    # BUGFIX: the original reused `i` both as the retry counter and as the
    # inner batch-loop variable, so the retry/fallback logic misbehaved.
    retry = 0
    while retry < 3:
        try:
            mysql_helper = MysqlHelper(mysql_info=config_.FILTER_MYSQL_INFO)
            data = []
            # query in batches of 200 to keep the IN-clause bounded
            for start in range(0, len(video_ids), 200):
                batch = video_ids[start:start + 200]
                # build the IN-list by hand: str(tuple(batch)) emits invalid
                # SQL for one-element batches ("(123,)") and empty batches
                id_list = ','.join(str(vid) for vid in batch)
                # derived tables require an alias in MySQL, hence "t"
                sql = "SELECT video_id " \
                      "FROM ({}) t " \
                      "WHERE audit_status = 5 " \
                      "AND applet_rec_status IN (1, -6) " \
                      "AND open_status = 1 " \
                      "AND payment_status = 0 " \
                      "AND encryption_status != 5 " \
                      "AND transcoding_status = 3 " \
                      "AND video_id IN ({});".format(video_status_sql, id_list)
                select_res = mysql_helper.get_data(sql=sql)
                if select_res is not None:
                    data += select_res
            filtered_videos = [int(temp[0]) for temp in data]
            return filtered_videos
        except Exception as e:
            log_.error(f"过滤失败, exception: {e}, traceback: {traceback.format_exc()}")
            send_msg_to_feishu(
                webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
                key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
                msg_text=f"rov-offline{config_.ENV_TEXT} - 过滤失败\n"
                         f"retry count: {retry}\n"
                         f"exception: {e}\n"
                         f"traceback: {traceback.format_exc()}"
            )
            retry += 1
    # all retries exhausted: fail open and return the input unfiltered
    return video_ids
 
def filter_video_status_with_applet_rec(video_ids, applet_rec_status):
    """
    Filter video ids down to those visible in the applet with a specific
    recommend status.

    :param video_ids: video id list, type-list
    :param applet_rec_status: applet recommend status, -6: pending, 1: normal
    :return: filtered_videos, list of int
    """
    video_status_sql = "SELECT t1.id AS 'video_id', " \
                       "t1.transcode_status AS 'transcoding_status', " \
                       "t2.audit_status AS 'audit_status', " \
                       "t2.video_status AS 'open_status', " \
                       "t2.recommend_status AS 'applet_rec_status', " \
                       "t2.app_recommend_status AS 'app_rec_status', " \
                       "t3.charge AS 'payment_status', " \
                       "case when t4.max_validate_count is null then 0 else t4.max_validate_count end AS 'encryption_status' " \
                       "FROM longvideo.wx_video t1 " \
                       "LEFT JOIN longvideo.wx_video_status t2 ON t1.id= t2.video_id " \
                       "LEFT JOIN longvideo.wx_video_detail t3 ON t1.id= t3.video_id " \
                       "LEFT JOIN longvideo.wx_video_pwd t4 ON t1.id= t4.video_id"
    # BUGFIX: the original reused `i` both as the retry counter and as the
    # inner batch-loop variable, so the retry/fallback logic misbehaved.
    retry = 0
    while retry < 3:
        try:
            mysql_helper = MysqlHelper(mysql_info=config_.FILTER_MYSQL_INFO)
            data = []
            # query in batches of 200 to keep the IN-clause bounded
            for start in range(0, len(video_ids), 200):
                batch = video_ids[start:start + 200]
                # build the IN-list by hand: str(tuple(batch)) emits invalid
                # SQL for one-element batches ("(123,)") and empty batches
                id_list = ','.join(str(vid) for vid in batch)
                # derived tables require an alias in MySQL, hence "t"
                sql = "SELECT video_id " \
                      "FROM ({}) t " \
                      "WHERE audit_status = 5 " \
                      "AND applet_rec_status = {} " \
                      "AND open_status = 1 " \
                      "AND payment_status = 0 " \
                      "AND encryption_status != 5 " \
                      "AND transcoding_status = 3 " \
                      "AND video_id IN ({});".format(video_status_sql, applet_rec_status, id_list)
                select_res = mysql_helper.get_data(sql=sql)
                if select_res is not None:
                    data += select_res
            filtered_videos = [int(temp[0]) for temp in data]
            return filtered_videos
        except Exception as e:
            log_.error(f"过滤失败, exception: {e}, traceback: {traceback.format_exc()}")
            send_msg_to_feishu(
                webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
                key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
                msg_text=f"rov-offline{config_.ENV_TEXT} - 过滤失败\n"
                         f"retry count: {retry}\n"
                         f"exception: {e}\n"
                         f"traceback: {traceback.format_exc()}"
            )
            retry += 1
    # all retries exhausted: fail open and return the input unfiltered
    return video_ids
 
def filter_video_status_app(video_ids):
    """
    Filter video ids down to those visible in the app.

    A video passes when: app audit passed (5), app recommend status in
    (1, -6, 10), open (1), free of charge (0), not encrypted (!= 5) and
    transcoded (3). On repeated DB failure the unfiltered input is returned
    so callers never crash on a filtering outage.

    :param video_ids: video id list, type-list
    :return: filtered_videos, list of int
    """
    video_status_sql = "SELECT t1.id AS 'video_id', " \
                       "t1.transcode_status AS 'transcoding_status', " \
                       "t2.app_audit_status AS 'app_audit_status', " \
                       "t2.original_status AS 'open_status', " \
                       "t2.recommend_status AS 'applet_rec_status', " \
                       "t2.app_recommend_status AS 'app_rec_status', " \
                       "t3.charge AS 'payment_status', " \
                       "case when t4.max_validate_count is null then 0 else t4.max_validate_count end AS 'encryption_status' " \
                       "FROM longvideo.wx_video t1 " \
                       "LEFT JOIN longvideo.wx_video_status t2 ON t1.id= t2.video_id " \
                       "LEFT JOIN longvideo.wx_video_detail t3 ON t1.id= t3.video_id " \
                       "LEFT JOIN longvideo.wx_video_pwd t4 ON t1.id= t4.video_id"
    # BUGFIX: the original reused `i` both as the retry counter and as the
    # inner batch-loop variable, so the retry/fallback logic misbehaved.
    retry = 0
    while retry < 3:
        try:
            mysql_helper = MysqlHelper(mysql_info=config_.FILTER_MYSQL_INFO)
            data = []
            # query in batches of 200 to keep the IN-clause bounded
            for start in range(0, len(video_ids), 200):
                batch = video_ids[start:start + 200]
                # build the IN-list by hand: str(tuple(batch)) emits invalid
                # SQL for one-element batches ("(123,)") and empty batches
                id_list = ','.join(str(vid) for vid in batch)
                # derived tables require an alias in MySQL, hence "t"
                sql = "SELECT video_id " \
                      "FROM ({}) t " \
                      "WHERE app_audit_status = 5 " \
                      "AND app_rec_status IN (1, -6, 10) " \
                      "AND open_status = 1 " \
                      "AND payment_status = 0 " \
                      "AND encryption_status != 5 " \
                      "AND transcoding_status = 3 " \
                      "AND video_id IN ({});".format(video_status_sql, id_list)
                select_res = mysql_helper.get_data(sql=sql)
                if select_res is not None:
                    data += select_res
            filtered_videos = [int(temp[0]) for temp in data]
            return filtered_videos
        except Exception as e:
            log_.error(f"过滤失败, exception: {e}, traceback: {traceback.format_exc()}")
            send_msg_to_feishu(
                webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
                key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
                msg_text=f"rov-offline{config_.ENV_TEXT} - 过滤失败\n"
                         f"retry count: {retry}\n"
                         f"exception: {e}\n"
                         f"traceback: {traceback.format_exc()}"
            )
            retry += 1
    # all retries exhausted: fail open and return the input unfiltered
    return video_ids
 
def filter_shield_video(video_ids, shield_key_name_list):
    """
    Remove shielded videos from *video_ids*.

    :param video_ids: candidate video ids, type-list
    :param shield_key_name_list: redis set keys holding shielded video ids
    :return: filtered_videos, type-list
    """
    if not video_ids:
        return video_ids

    redis_helper = RedisHelper()
    for key_name in shield_key_name_list:
        members = redis_helper.get_data_from_set(key_name=key_name)
        if not members:
            continue
        # set membership instead of a list scan
        blocked = {int(video) for video in members}
        video_ids = [int(vid) for vid in video_ids if int(vid) not in blocked]
    return video_ids
 
def filter_political_videos(video_ids):
    """
    Remove politically sensitive videos from *video_ids*.

    :param video_ids: candidate video ids, type-list
    :return: filtered_video_ids, type-list
    """
    if not video_ids:
        return video_ids

    members = RedisHelper().get_data_from_set(key_name=config_.POLITICAL_VIDEOS_KEY_NAME)
    if not members:
        return video_ids
    # set membership instead of a list scan
    blocked = {int(video) for video in members}
    return [int(vid) for vid in video_ids if int(vid) not in blocked]
 
def update_video_w_h_rate(video_ids, key_name):
    """
    Compute the width/height ratio of landscape videos (w/h > 1) and store
    them into a redis zset under *key_name*.

    :param video_ids: videoId list, type-list
    :param key_name: redis key
    :return: None
    """
    if not video_ids:
        # the original built "WHERE id IN ();" for an empty list, which is
        # invalid SQL; with nothing to look up there is nothing to update
        return

    # join the id list by hand: str(tuple(...)) is invalid SQL for a
    # one-element tuple ("(123,)"), which the original special-cased
    id_list = ','.join(str(vid) for vid in video_ids)
    sql = "SELECT id, width, height, rotate FROM longvideo.wx_video WHERE id IN ({});".format(id_list)
    mysql_helper = MysqlHelper(mysql_info=config_.MYSQL_INFO)
    # guard against a None result from the helper
    data = mysql_helper.get_data(sql=sql) or []

    info_data = {}
    for video_id, width, height, rotate in data:
        w, h = int(width), int(height)
        if w == 0 or h == 0:
            # missing dimension metadata: ratio undefined, skip
            continue
        # rotated videos store swapped dimensions
        w_h_rate = h / w if int(rotate) in (90, 270) else w / h
        if w_h_rate > 1:
            info_data[int(video_id)] = w_h_rate

    redis_helper = RedisHelper()
    # refresh the zset: clear old entries, then write the new ratios
    redis_helper.del_keys(key_name=key_name)
    if len(info_data) > 0:
        redis_helper.add_data_with_zset(key_name=key_name, data=info_data)
 
def data_check(project, table, dt):
    """Check whether partition dt=*dt* of project.table is ready; return its row count (0 when missing or on any error)."""
    odps = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        connect_timeout=3000,
        read_timeout=500000,
        pool_maxsize=1000,
        pool_connections=1000
    )
    data_count = 0
    try:
        if check_table_partition_exits(date=dt, project=project, table=table):
            sql = f'select * from {project}.{table} where dt = {dt}'
            with odps.execute_sql(sql=sql).open_reader() as reader:
                data_count = reader.count
    except Exception:
        # best-effort check: any failure is treated as "data not ready"
        data_count = 0
    return data_count
 
def get_feature_data(project, table, features, dt):
    """Read partition dt=*dt* of project.table and return the named feature columns as a pandas DataFrame."""
    records = get_data_from_odps(date=dt, project=project, table=table)
    rows = [{name: record[name] for name in features} for record in records]
    return pd.DataFrame(rows)
 
if __name__ == '__main__':
    # ad-hoc smoke test: does the 24h app-type table have this partition?
    dt = '2022080115'
    check_res = check_table_partition_exits(
        date=dt,
        project=config_.PROJECT_24H_APP_TYPE,
        table=config_.TABLE_24H_APP_TYPE,
    )
    print(check_res)
 
 
  |