| 
					
				 | 
			
			
				@@ -0,0 +1,550 @@ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+# coding:utf-8
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import pickle
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import os
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import requests
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import json
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import traceback
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import pandas as pd
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+from odps import ODPS
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+from config import set_config
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+from db_help import  MysqlHelper, RedisHelper
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+#from log import Log
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+config_ = set_config()
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+#log_ = Log()
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+def execute_sql_from_odps(project, sql, connect_timeout=3000, read_timeout=500000,
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                       pool_maxsize=1000, pool_connections=1000):
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    odps = ODPS(
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        access_id=config_.ODPS_CONFIG['ACCESSID'],
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        project=project,
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        connect_timeout=connect_timeout,
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        read_timeout=read_timeout,
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        pool_maxsize=pool_maxsize,
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        pool_connections=pool_connections
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    )
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    records = odps.execute_sql(sql=sql)
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    return records
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+def get_data_from_odps(date, project, table, connect_timeout=3000, read_timeout=500000,
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                       pool_maxsize=1000, pool_connections=1000):
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    """
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    从odps获取数据
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :param date: 日期 type-string '%Y%m%d'
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :param project: type-string
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :param table: 表名 type-string
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :param connect_timeout: 连接超时设置
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :param read_timeout: 读取超时设置
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :param pool_maxsize:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :param pool_connections:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :return: records
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    """
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    odps = ODPS(
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        access_id=config_.ODPS_CONFIG['ACCESSID'],
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        project=project,
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        connect_timeout=connect_timeout,
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        read_timeout=read_timeout,
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        pool_maxsize=pool_maxsize,
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        pool_connections=pool_connections
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    )
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    records = odps.read_table(name=table, partition='dt=%s' % date)
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    return records
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+def check_table_partition_exits(date, project, table, connect_timeout=3000, read_timeout=500000,
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                pool_maxsize=1000, pool_connections=1000):
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    """
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    判断表中是否存在这个分区
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :param date: 日期 type-string '%Y%m%d'
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :param project: type-string
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :param table: 表名 type-string
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :param connect_timeout: 连接超时设置
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :param read_timeout: 读取超时设置
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :param pool_maxsize:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :param pool_connections:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :return: records
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    """
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    odps = ODPS(
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        access_id=config_.ODPS_CONFIG['ACCESSID'],
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        project=project,
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        connect_timeout=connect_timeout,
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        read_timeout=read_timeout,
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        pool_maxsize=pool_maxsize,
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        pool_connections=pool_connections
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    )
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    t = odps.get_table(name=table)
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    return t.exist_partition(partition_spec=f'dt={date}')
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+'''def write_to_pickle(data, filename, filepath=config_.DATA_DIR_PATH):
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    """
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    将数据写入pickle文件中
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :param data: 数据
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :param filename: 写入的文件名
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :param filepath: 文件存放路径,默认为config_.DATA_DIR_PATH
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :return: None
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    """
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    if not os.path.exists(filepath):
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        os.makedirs(filepath)
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    file = os.path.join(filepath, filename)
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    with open(file, 'wb') as wf:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        pickle.dump(data, wf)
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+def read_from_pickle(filename, filepath=config_.DATA_DIR_PATH):
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    """
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    从pickle文件读取数据
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :param filename: 文件名
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :param filepath: 文件存放路径,默认为config_.DATA_DIR_PATH
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :return: data
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    """
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    file = os.path.join(filepath, filename)
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    if not os.path.exists(file):
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        return None
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    with open(file, 'rb') as rf:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        data = pickle.load(rf)
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    return data '''
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+def send_msg_to_feishu(webhook, key_word, msg_text):
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    """发送消息到飞书"""
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    headers = {'Content-Type': 'application/json'}
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    payload_message = {
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        "msg_type": "text",
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        "content": {
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            "text": '{}: {}'.format(key_word, msg_text)
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        }
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    }
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    response = requests.request('POST', url=webhook, headers=headers, data=json.dumps(payload_message))
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    print(response.text)
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+def send_msg_to_feishu_new(webhook, key_word, title, msg_list):
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    """发送消息到飞书"""
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    headers = {'Content-Type': 'application/json'}
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    content_list = [
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        [
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            {
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                "tag": "text",
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                "text": msg
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            }
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        ]
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        for msg in msg_list
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    ]
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    payload_message = {
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        "msg_type": "post",
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        "content": {
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            "post": {
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                "zh_cn": {
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    "title": f"{key_word}: {title}",
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    "content": content_list,
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                }
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            }
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        }
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    }
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    response = requests.request('POST', url=webhook, headers=headers, data=json.dumps(payload_message))
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    print(response.text)
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+def request_post(request_url, request_data=None, **kwargs):
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    """
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    post 请求 HTTP接口
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :param request_url: 接口URL
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :param request_data: 请求参数
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :return: res_data json格式
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    """
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    try:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        response = requests.post(url=request_url, json=request_data, **kwargs)
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        if response.status_code == 200:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            res_data = json.loads(response.text)
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            return res_data
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        else:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            #log_.info(f"response.status_code: {response.status_code}")
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            return None
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    except Exception as e:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        #log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, e, traceback.format_exc()))
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        send_msg_to_feishu(
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            msg_text='rov-offline{} - 接口请求失败:{}, exception: {}'.format(config_.ENV_TEXT, request_url, e)
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        )
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        return None
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+def request_get(request_url):
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    """
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    get 请求 HTTP接口
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :param request_url: 接口URL
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :return: res_data json格式
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    """
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    try:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        response = requests.get(url=request_url)
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        if response.status_code == 200:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            res_data = json.loads(response.text)
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            return res_data
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        else:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            #log_.info(f"response.status_code: {response.status_code}")
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            return None
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    except Exception as e:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        #log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, e, traceback.format_exc()))
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        send_msg_to_feishu(
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            msg_text='rov-offline{} - 接口请求失败:{}, exception: {}'.format(config_.ENV_TEXT, request_url, e)
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        )
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        return None
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+def data_normalization(data):
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    """
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    对结果做归一化处理(Min-Max Normalization),将分数控制在[0, 100]
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :param data: type-list
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :return: normal_data, type-list 归一化后的数据
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    """
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    x_max = max(data)
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    x_min = min(data)
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    normal_data = [(x-x_min)/(x_max-x_min)*100 for x in data]
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    return normal_data
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+def filter_video_status(video_ids):
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    """
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    对视频状态进行过滤
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :param video_ids: 视频id列表 type-list
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :return: filtered_videos
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    """
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    i = 0
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    while i < 3:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        try:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            mysql_helper = MysqlHelper(mysql_info=config_.FILTER_MYSQL_INFO)
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            video_status_sql = "SELECT t1.id AS 'video_id', " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                               "t1.transcode_status AS 'transcoding_status', " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                               "t2.audit_status AS 'audit_status', " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                               "t2.video_status AS 'open_status', " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                               "t2.recommend_status AS 'applet_rec_status', " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                               "t2.app_recommend_status AS 'app_rec_status', " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                               "t3.charge AS 'payment_status', " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                               "case when t4.max_validate_count is null then 0 else t4.max_validate_count end AS 'encryption_status' " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                               "FROM longvideo.wx_video t1 " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                               "LEFT JOIN longvideo.wx_video_status t2 ON t1.id= t2.video_id " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                               "LEFT JOIN longvideo.wx_video_detail t3 ON t1.id= t3.video_id " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                               "LEFT JOIN longvideo.wx_video_pwd t4 ON t1.id= t4.video_id"
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            if len(video_ids) == 1:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                sql = "SELECT video_id " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                      "FROM ({}) " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                      "WHERE audit_status = 5 " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                      "AND applet_rec_status IN (1, -6) " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                      "AND open_status = 1 " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                      "AND payment_status = 0 " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                      "AND encryption_status != 5 " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                      "AND transcoding_status = 3 " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                      "AND video_id IN ({});".format(video_status_sql, video_ids[0])
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                data = mysql_helper.get_data(sql=sql)
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            else:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                data = []
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                for i in range(len(video_ids) // 200 + 1):
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    sql = "SELECT video_id " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                          "FROM ({}) " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                          "WHERE audit_status = 5 " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                          "AND applet_rec_status IN (1, -6) " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                          "AND open_status = 1 " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                          "AND payment_status = 0 " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                          "AND encryption_status != 5 " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                          "AND transcoding_status = 3 " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                          "AND video_id IN {};".format(video_status_sql, tuple(video_ids[i*200:(i+1)*200]))
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    select_res = mysql_helper.get_data(sql=sql)
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    if select_res is not None:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        data += select_res
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            filtered_videos = [int(temp[0]) for temp in data]
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            return filtered_videos
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        except Exception as e:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            #log_.error(f"过滤失败, exception: {e}, traceback: {traceback.format_exc()}")
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            send_msg_to_feishu(
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                msg_text=f"rov-offline{config_.ENV_TEXT} - 过滤失败\n"
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                         f"retry count: {i}\n"
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                         f"exception: {e}\n"
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                         f"traceback: {traceback.format_exc()}"
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            )
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            i += 1
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            if i == 1:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                return video_ids
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+def filter_video_status_with_applet_rec(video_ids, applet_rec_status):
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    """
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    对视频状态进行过滤
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :param video_ids: 视频id列表 type-list
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :param applet_rec_status: 小程序推荐状态 -6:待推荐 1:普通推荐
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :return: filtered_videos
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    """
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    i = 0
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    while i < 3:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        try:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            mysql_helper = MysqlHelper(mysql_info=config_.FILTER_MYSQL_INFO)
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            video_status_sql = "SELECT t1.id AS 'video_id', " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                               "t1.transcode_status AS 'transcoding_status', " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                               "t2.audit_status AS 'audit_status', " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                               "t2.video_status AS 'open_status', " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                               "t2.recommend_status AS 'applet_rec_status', " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                               "t2.app_recommend_status AS 'app_rec_status', " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                               "t3.charge AS 'payment_status', " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                               "case when t4.max_validate_count is null then 0 else t4.max_validate_count end AS 'encryption_status' " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                               "FROM longvideo.wx_video t1 " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                               "LEFT JOIN longvideo.wx_video_status t2 ON t1.id= t2.video_id " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                               "LEFT JOIN longvideo.wx_video_detail t3 ON t1.id= t3.video_id " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                               "LEFT JOIN longvideo.wx_video_pwd t4 ON t1.id= t4.video_id"
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            if len(video_ids) == 1:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                sql = "SELECT video_id " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                      "FROM ({}) " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                      "WHERE audit_status = 5 " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                      "AND applet_rec_status = {} " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                      "AND open_status = 1 " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                      "AND payment_status = 0 " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                      "AND encryption_status != 5 " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                      "AND transcoding_status = 3 " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                      "AND video_id IN ({});".format(video_status_sql, applet_rec_status, video_ids[0])
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                data = mysql_helper.get_data(sql=sql)
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            else:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                data = []
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                for i in range(len(video_ids) // 200 + 1):
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    sql = "SELECT video_id " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                          "FROM ({}) " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                          "WHERE audit_status = 5 " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                          "AND applet_rec_status = {} " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                          "AND open_status = 1 " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                          "AND payment_status = 0 " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                          "AND encryption_status != 5 " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                          "AND transcoding_status = 3 " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                          "AND video_id IN {};".format(video_status_sql, applet_rec_status,
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                                       tuple(video_ids[i*200:(i+1)*200]))
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    select_res = mysql_helper.get_data(sql=sql)
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    if select_res is not None:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        data += select_res
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            filtered_videos = [int(temp[0]) for temp in data]
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            return filtered_videos
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        except Exception as e:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            #log_.error(f"过滤失败, exception: {e}, traceback: {traceback.format_exc()}")
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            send_msg_to_feishu(
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                msg_text=f"rov-offline{config_.ENV_TEXT} - 过滤失败\n"
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                         f"retry count: {i}\n"
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                         f"exception: {e}\n"
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                         f"traceback: {traceback.format_exc()}"
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            )
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            i += 1
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            if i == 1:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                return video_ids
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+def filter_video_status_app(video_ids):
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    """
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    对视频状态进行过滤 - app
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :param video_ids: 视频id列表 type-list
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :return: filtered_videos
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    """
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    i = 0
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    while i < 3:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        try:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            mysql_helper = MysqlHelper(mysql_info=config_.FILTER_MYSQL_INFO)
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            video_status_sql = "SELECT t1.id AS 'video_id', " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                               "t1.transcode_status AS 'transcoding_status', " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                               "t2.app_audit_status AS 'app_audit_status', " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                               "t2.original_status AS 'open_status', " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                               "t2.recommend_status AS 'applet_rec_status', " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                               "t2.app_recommend_status AS 'app_rec_status', " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                               "t3.charge AS 'payment_status', " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                               "case when t4.max_validate_count is null then 0 else t4.max_validate_count end AS 'encryption_status' " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                               "FROM longvideo.wx_video t1 " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                               "LEFT JOIN longvideo.wx_video_status t2 ON t1.id= t2.video_id " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                               "LEFT JOIN longvideo.wx_video_detail t3 ON t1.id= t3.video_id " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                               "LEFT JOIN longvideo.wx_video_pwd t4 ON t1.id= t4.video_id"
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            if len(video_ids) == 1:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                sql = "SELECT video_id " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                      "FROM ({}) " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                      "WHERE app_audit_status = 5 " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                      "AND app_rec_status IN (1, -6, 10) " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                      "AND open_status = 1 " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                      "AND payment_status = 0 " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                      "AND encryption_status != 5 " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                      "AND transcoding_status = 3 " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                      "AND video_id IN ({});".format(video_status_sql, video_ids[0])
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                data = mysql_helper.get_data(sql=sql)
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            else:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                data = []
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                for i in range(len(video_ids) // 200 + 1):
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    sql = "SELECT video_id " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                          "FROM ({}) " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                          "WHERE app_audit_status = 5 " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                          "AND app_rec_status IN (1, -6, 10) " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                          "AND open_status = 1 " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                          "AND payment_status = 0 " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                          "AND encryption_status != 5 " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                          "AND transcoding_status = 3 " \
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                          "AND video_id IN {};".format(video_status_sql, tuple(video_ids[i*200:(i+1)*200]))
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    select_res = mysql_helper.get_data(sql=sql)
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    if select_res is not None:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        data += select_res
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            filtered_videos = [int(temp[0]) for temp in data]
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            return filtered_videos
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        except Exception as e:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            #log_.error(f"过滤失败, exception: {e}, traceback: {traceback.format_exc()}")
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            send_msg_to_feishu(
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                msg_text=f"rov-offline{config_.ENV_TEXT} - 过滤失败\n"
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                         f"retry count: {i}\n"
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                         f"exception: {e}\n"
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                         f"traceback: {traceback.format_exc()}"
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            )
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            i += 1
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            if i == 1:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                return video_ids
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+def filter_shield_video(video_ids, shield_key_name_list):
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    """
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    过滤屏蔽视频视频
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :param video_ids: 需过滤的视频列表 type-list
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :param shield_key_name_list: 过滤视频 redis-key
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :return: filtered_videos  过滤后的列表  type-list
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    """
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    if len(video_ids) == 0:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        return video_ids
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    # 根据Redis缓存中的数据过滤
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    redis_helper = RedisHelper()
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    for shield_key_name in shield_key_name_list:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        shield_videos_list = redis_helper.get_data_from_set(key_name=shield_key_name)
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        if not shield_videos_list:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            continue
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        shield_videos = [int(video) for video in shield_videos_list]
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        video_ids = [int(video_id) for video_id in video_ids if int(video_id) not in shield_videos]
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    return video_ids
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+def filter_political_videos(video_ids):
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    """
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    过滤涉政视频
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :param video_ids: 需过滤的视频列表 type-list
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :return: filtered_video_ids  过滤后的列表  type-list
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    """
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    if len(video_ids) == 0:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        return video_ids
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    # 根据Redis缓存中的数据过滤
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    redis_helper = RedisHelper()
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    political_key_name = config_.POLITICAL_VIDEOS_KEY_NAME
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    political_videos_list = redis_helper.get_data_from_set(key_name=political_key_name)
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    if not political_videos_list:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        return video_ids
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    political_videos = [int(video) for video in political_videos_list]
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    filtered_video_ids = [int(video_id) for video_id in video_ids if int(video_id) not in political_videos]
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    return filtered_video_ids
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+def update_video_w_h_rate(video_ids, key_name):
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    """
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    获取横屏视频的宽高比,并存入redis中 (width/height>1)
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :param video_ids: videoId列表 type-list
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :param key_name: redis key
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    :return: None
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    """
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    # 获取数据
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    if len(video_ids) == 1:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        sql = "SELECT id, width, height, rotate FROM longvideo.wx_video WHERE id = {};".format(video_ids[0])
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    else:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        sql = "SELECT id, width, height, rotate FROM longvideo.wx_video WHERE id IN {};".format(tuple(video_ids))
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    mysql_helper = MysqlHelper(mysql_info=config_.MYSQL_INFO)
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    data = mysql_helper.get_data(sql=sql)
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    # 更新到redis
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    info_data = {}
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    for video_id, width, height, rotate in data:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        if int(width) == 0 or int(height) == 0:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            continue
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        # rotate 字段值为 90或270时,width和height的值相反
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        if int(rotate) in (90, 270):
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            w_h_rate = int(height) / int(width)
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        else:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            w_h_rate = int(width) / int(height)
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        if w_h_rate > 1:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            info_data[int(video_id)] = w_h_rate
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    redis_helper = RedisHelper()
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    # 删除旧数据
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    redis_helper.del_keys(key_name=key_name)
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    # 写入新数据
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    if len(info_data) > 0:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        redis_helper.add_data_with_zset(key_name=key_name, data=info_data)
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+def data_check(project, table, dt):
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    """检查数据是否准备好"""
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    odps = ODPS(
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        access_id=config_.ODPS_CONFIG['ACCESSID'],
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        project=project,
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        connect_timeout=3000,
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        read_timeout=500000,
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        pool_maxsize=1000,
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        pool_connections=1000
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    )
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    try:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        check_res = check_table_partition_exits(date=dt, project=project, table=table)
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        if check_res:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            sql = f'select * from {project}.{table} where dt = {dt}'
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            with odps.execute_sql(sql=sql).open_reader() as reader:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                data_count = reader.count
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        else:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            data_count = 0
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    except Exception as e:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        data_count = 0
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    return data_count
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+def get_feature_data(project, table, features, dt):
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    """获取特征数据"""
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    records = get_data_from_odps(date=dt, project=project, table=table)
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    feature_data = []
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    for record in records:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        item = {}
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        for feature_name in features:
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            item[feature_name] = record[feature_name]
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        feature_data.append(item)
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    feature_df = pd.DataFrame(feature_data)
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    return feature_df
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+if __name__ == '__main__':
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    # data_test = [9.20273281e+03, 7.00795065e+03, 5.54813112e+03, 9.97402494e-01, 9.96402495e-01, 9.96402494e-01]
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    # data_normalization(data_test)
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    # request_post(request_url=config_.NOTIFY_BACKEND_UPDATE_ROV_SCORE_URL, request_data={'videos': []})
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    # video_ids = [110, 112, 113, 115, 116, 117, 8289883]
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    # update_video_w_h_rate(video_ids=video_ids, key_name='')
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    project = config_.PROJECT_24H_APP_TYPE
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    table = config_.TABLE_24H_APP_TYPE
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    dt = '2022080115'
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    check_res = check_table_partition_exits(date=dt, project=project, table=table)
 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    print(check_res)
 
			 |