Quellcode durchsuchen

Merge branch 'filter-update-20220928' into test

liqian vor 2 Jahren
Ursprung
Commit
b6405b5208
4 geänderte Dateien mit 275 neuen und 205 gelöschten Zeilen
  1. 51 33
      region_rule_rank_h.py
  2. 36 24
      region_rule_rank_h_by24h.py
  3. 36 20
      rule_rank_h_by_24h.py
  4. 152 128
      utils.py

+ 51 - 33
region_rule_rank_h.py

@@ -7,6 +7,7 @@
 import multiprocessing
 import os
 import sys
+import traceback
 
 import gevent
 import datetime
@@ -16,7 +17,7 @@ from functools import reduce
 from odps import ODPS
 from threading import Timer, Thread
 from utils import MysqlHelper, RedisHelper, get_data_from_odps, filter_video_status, filter_shield_video, \
-    check_table_partition_exits, filter_video_status_app
+    check_table_partition_exits, filter_video_status_app, send_msg_to_feishu
 from config import set_config
 from log import Log
 from check_video_limit_distribute import update_limit_video_score
@@ -714,38 +715,55 @@ def h_rank_bottom(now_date, now_h, rule_params, region_code_list, rule_rank_h_fl
 
 
 def h_timer_check():
-    rule_rank_h_flag = sys.argv[1]
-    if rule_rank_h_flag == '48h':
-        rule_params = config_.RULE_PARAMS_REGION_APP_TYPE_48H
-    else:
-        rule_params = config_.RULE_PARAMS_REGION_APP_TYPE
-    project = config_.PROJECT_REGION_APP_TYPE
-    table = config_.TABLE_REGION_APP_TYPE
-    region_code_list = [code for region, code in region_code.items()]
-    now_date = datetime.datetime.today()
-    log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}, rule_rank_h_flag: {rule_rank_h_flag}")
-    now_h = datetime.datetime.now().hour
-    now_min = datetime.datetime.now().minute
-    if now_h == 0:
-        h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params, region_code_list=region_code_list,
-                      rule_rank_h_flag=rule_rank_h_flag)
-        return
-    # 查看当前小时更新的数据是否已准备好
-    h_data_count = h_data_check(project=project, table=table, now_date=now_date)
-    if h_data_count > 0:
-        log_.info(f'region_h_data_count = {h_data_count}')
-        # 数据准备好,进行更新
-        rank_by_h(now_date=now_date, now_h=now_h, rule_params=rule_params,
-                  project=project, table=table, region_code_list=region_code_list, rule_rank_h_flag=rule_rank_h_flag)
-        log_.info(f"region_h_data end!")
-    elif now_min > 50:
-        log_.info('h_recall data is None, use bottom data!')
-        h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params, region_code_list=region_code_list,
-                      rule_rank_h_flag=rule_rank_h_flag)
-        log_.info(f"region_h_data end!")
-    else:
-        # 数据没准备好,1分钟后重新检查
-        Timer(60, h_timer_check).start()
+    try:
+        rule_rank_h_flag = sys.argv[1]
+        if rule_rank_h_flag == '48h':
+            rule_params = config_.RULE_PARAMS_REGION_APP_TYPE_48H
+        else:
+            rule_params = config_.RULE_PARAMS_REGION_APP_TYPE
+        project = config_.PROJECT_REGION_APP_TYPE
+        table = config_.TABLE_REGION_APP_TYPE
+        region_code_list = [code for region, code in region_code.items()]
+        now_date = datetime.datetime.today()
+        log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}, rule_rank_h_flag: {rule_rank_h_flag}")
+        now_h = datetime.datetime.now().hour
+        now_min = datetime.datetime.now().minute
+        if now_h == 0:
+            h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params, region_code_list=region_code_list,
+                          rule_rank_h_flag=rule_rank_h_flag)
+            return
+        # 查看当前小时更新的数据是否已准备好
+        h_data_count = h_data_check(project=project, table=table, now_date=now_date)
+        if h_data_count > 0:
+            log_.info(f'region_h_data_count = {h_data_count}')
+            # 数据准备好,进行更新
+            rank_by_h(now_date=now_date, now_h=now_h, rule_params=rule_params,
+                      project=project, table=table, region_code_list=region_code_list, rule_rank_h_flag=rule_rank_h_flag)
+            log_.info(f"region_h_data end!")
+        elif now_min > 50:
+            log_.info('h_recall data is None, use bottom data!')
+            h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params, region_code_list=region_code_list,
+                          rule_rank_h_flag=rule_rank_h_flag)
+            log_.info(f"region_h_data end!")
+        else:
+            # 数据没准备好,1分钟后重新检查
+            Timer(60, h_timer_check).start()
+        send_msg_to_feishu(
+            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
+            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
+            msg_text=f"rov-offline{config_.ENV_TEXT} - 推荐视频数据更新完成\n "
+                     f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}\n"
+                     f"now_h: {now_h}"
+        )
+    except Exception as e:
+        log_.error(f"地域分组小时级数据更新失败, exception: {e}, traceback: {traceback.format_exc()}")
+        send_msg_to_feishu(
+            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
+            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
+            msg_text=f"rov-offline{config_.ENV_TEXT} - 地域分组小时级数据更新失败\n "
+                     f"exception: {e}\n "
+                     f"traceback: {traceback.format_exc()}"
+        )
 
 
 if __name__ == '__main__':

+ 36 - 24
region_rule_rank_h_by24h.py

@@ -7,6 +7,7 @@
 import time
 import multiprocessing
 import os
+import traceback
 import gevent
 import datetime
 import pandas as pd
@@ -15,7 +16,7 @@ from functools import reduce
 from odps import ODPS
 from threading import Timer, Thread
 from utils import RedisHelper, get_data_from_odps, filter_video_status, check_table_partition_exits, \
-    filter_video_status_app
+    filter_video_status_app, send_msg_to_feishu
 from config import set_config
 from log import Log
 
@@ -473,29 +474,40 @@ def h_rank_bottom(now_date, now_h, rule_params, region_code_list):
 
 
 def h_timer_check():
-    rule_params = config_.RULE_PARAMS_REGION_24H_APP_TYPE
-    project = config_.PROJECT_REGION_24H_APP_TYPE
-    table = config_.TABLE_REGION_24H_APP_TYPE
-    region_code_list = [code for region, code in region_code.items() if code != '-1']
-    now_date = datetime.datetime.today()
-    now_h = datetime.datetime.now().hour
-    now_min = datetime.datetime.now().minute
-    log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
-    # 查看当天更新的数据是否已准备好
-    h_data_count = data_check(project=project, table=table, now_date=now_date)
-    if h_data_count > 0:
-        log_.info(f'region_24h_data_count = {h_data_count}')
-        # 数据准备好,进行更新
-        rank_by_24h(now_date=now_date, now_h=now_h, rule_params=rule_params,
-                    project=project, table=table, region_code_list=region_code_list)
-        log_.info(f"region_24h_data end!")
-    elif now_min > 50:
-        log_.info('24h_recall data is None, use bottom data!')
-        h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params, region_code_list=region_code_list)
-        log_.info(f"region_24h_data end!")
-    else:
-        # 数据没准备好,1分钟后重新检查
-        Timer(60, h_timer_check).start()
+    try:
+        rule_params = config_.RULE_PARAMS_REGION_24H_APP_TYPE
+        project = config_.PROJECT_REGION_24H_APP_TYPE
+        table = config_.TABLE_REGION_24H_APP_TYPE
+        region_code_list = [code for region, code in region_code.items() if code != '-1']
+        now_date = datetime.datetime.today()
+        now_h = datetime.datetime.now().hour
+        now_min = datetime.datetime.now().minute
+        log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
+        # 查看当天更新的数据是否已准备好
+        h_data_count = data_check(project=project, table=table, now_date=now_date)
+        if h_data_count > 0:
+            log_.info(f'region_24h_data_count = {h_data_count}')
+            # 数据准备好,进行更新
+            rank_by_24h(now_date=now_date, now_h=now_h, rule_params=rule_params,
+                        project=project, table=table, region_code_list=region_code_list)
+            log_.info(f"region_24h_data end!")
+        elif now_min > 50:
+            log_.info('24h_recall data is None, use bottom data!')
+            h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params, region_code_list=region_code_list)
+            log_.info(f"region_24h_data end!")
+        else:
+            # 数据没准备好,1分钟后重新检查
+            Timer(60, h_timer_check).start()
+
+    except Exception as e:
+        log_.error(f"地域分组24h数据更新失败, exception: {e}, traceback: {traceback.format_exc()}")
+        send_msg_to_feishu(
+            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
+            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
+            msg_text=f"rov-offline{config_.ENV_TEXT} - 地域分组24h数据更新失败\n "
+                     f"exception: {e}\n "
+                     f"traceback: {traceback.format_exc()}"
+        )
 
 
 if __name__ == '__main__':

+ 36 - 20
rule_rank_h_by_24h.py

@@ -1,12 +1,14 @@
 import pandas as pd
 import math
+import traceback
 from functools import reduce
 from odps import ODPS
 from threading import Timer
 from datetime import datetime, timedelta
 from get_data import get_data_from_odps
 from db_helper import RedisHelper
-from utils import filter_video_status, check_table_partition_exits, filter_video_status_app, request_post
+from utils import filter_video_status, check_table_partition_exits, filter_video_status_app, \
+    request_post, send_msg_to_feishu
 from config import set_config
 from log import Log
 
@@ -430,26 +432,40 @@ def h_rank_bottom(now_date, now_h, rule_params):
 
 
 def h_timer_check():
-    project = config_.PROJECT_24H_APP_TYPE
-    table = config_.TABLE_24H_APP_TYPE
-    rule_params = config_.RULE_PARAMS_24H_APP_TYPE
-    now_date = datetime.today()
-    log_.info(f"now_date: {datetime.strftime(now_date, '%Y%m%d%H')}")
-    now_min = datetime.now().minute
-    now_h = datetime.now().hour
-    # 查看当前天级更新的数据是否已准备好
-    h_data_count = h_data_check(project=project, table=table, now_date=now_date, now_h=now_h)
-    if h_data_count > 0:
-        log_.info(f'h_by24h_data_count = {h_data_count}')
-        # 数据准备好,进行更新
-        rank_by_h(now_date=now_date, now_h=now_h, rule_params=rule_params, project=project, table=table)
-    elif now_min > 45:
-        log_.info('h_by24h_recall data is None!')
-        h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params)
-    else:
-        # 数据没准备好,1分钟后重新检查
-        Timer(60, h_timer_check).start()
+    try:
+        project = config_.PROJECT_24H_APP_TYPE
+        table = config_.TABLE_24H_APP_TYPE
+        rule_params = config_.RULE_PARAMS_24H_APP_TYPE
+        now_date = datetime.today()
+        log_.info(f"now_date: {datetime.strftime(now_date, '%Y%m%d%H')}")
+        now_min = datetime.now().minute
+        now_h = datetime.now().hour
+        # 查看当前天级更新的数据是否已准备好
+        h_data_count = h_data_check(project=project, table=table, now_date=now_date, now_h=now_h)
+        if h_data_count > 0:
+            log_.info(f'h_by24h_data_count = {h_data_count}')
+            # 数据准备好,进行更新
+            rank_by_h(now_date=now_date, now_h=now_h, rule_params=rule_params, project=project, table=table)
+            log_.info(f"24h_data end!")
+        elif now_min > 45:
+            log_.info('h_by24h_recall data is None, use bottom data!')
+            h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params)
+            log_.info(f"24h_data end!")
+        else:
+            # 数据没准备好,1分钟后重新检查
+            Timer(60, h_timer_check).start()
+
+    except Exception as e:
+        log_.error(f"不区分地域24h数据更新失败, exception: {e}, traceback: {traceback.format_exc()}")
+        send_msg_to_feishu(
+            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
+            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
+            msg_text=f"rov-offline{config_.ENV_TEXT} - 不区分地域24h数据更新失败\n "
+                     f"exception: {e}\n "
+                     f"traceback: {traceback.format_exc()}"
+        )
 
 
 if __name__ == '__main__':
+    log_.info(f"24h_data start...")
     h_timer_check()

+ 152 - 128
utils.py

@@ -170,48 +170,56 @@ def filter_video_status(video_ids):
     :param video_ids: 视频id列表 type-list
     :return: filtered_videos
     """
-    mysql_helper = MysqlHelper(mysql_info=config_.FILTER_MYSQL_INFO)
-    video_status_sql = "SELECT t1.id AS 'video_id', " \
-                       "t1.transcode_status AS 'transcoding_status', " \
-                       "t2.audit_status AS 'audit_status', " \
-                       "t2.video_status AS 'open_status', " \
-                       "t2.recommend_status AS 'applet_rec_status', " \
-                       "t2.app_recommend_status AS 'app_rec_status', " \
-                       "t3.charge AS 'payment_status', " \
-                       "case when t4.max_validate_count is null then 0 else t4.max_validate_count end AS 'encryption_status' " \
-                       "FROM longvideo.wx_video t1 " \
-                       "LEFT JOIN longvideo.wx_video_status t2 ON t1.id= t2.video_id " \
-                       "LEFT JOIN longvideo.wx_video_detail t3 ON t1.id= t3.video_id " \
-                       "LEFT JOIN longvideo.wx_video_pwd t4 ON t1.id= t4.video_id"
-    if len(video_ids) == 1:
-        sql = "SELECT video_id " \
-              "FROM ({}) " \
-              "WHERE audit_status = 5 " \
-              "AND applet_rec_status IN (1, -6) " \
-              "AND open_status = 1 " \
-              "AND payment_status = 0 " \
-              "AND encryption_status != 5 " \
-              "AND transcoding_status = 3 " \
-              "AND video_id IN ({});".format(video_status_sql, video_ids[0])
-        data = mysql_helper.get_data(sql=sql)
-
-    else:
-        data = []
-        for i in range(len(video_ids) // 2000 + 1):
-            sql = "SELECT video_id " \
-                  "FROM ({}) " \
-                  "WHERE audit_status = 5 " \
-                  "AND applet_rec_status IN (1, -6) " \
-                  "AND open_status = 1 " \
-                  "AND payment_status = 0 " \
-                  "AND encryption_status != 5 " \
-                  "AND transcoding_status = 3 " \
-                  "AND video_id IN {};".format(video_status_sql, tuple(video_ids[i*2000:(i+1)*2000]))
-            select_res = mysql_helper.get_data(sql=sql)
-            if select_res is not None:
-                data += select_res
-    filtered_videos = [int(temp[0]) for temp in data]
-    return filtered_videos
+    i = 0
+    while i < 3:
+        try:
+            mysql_helper = MysqlHelper(mysql_info=config_.FILTER_MYSQL_INFO)
+            video_status_sql = "SELECT t1.id AS 'video_id', " \
+                               "t1.transcode_status AS 'transcoding_status', " \
+                               "t2.audit_status AS 'audit_status', " \
+                               "t2.video_status AS 'open_status', " \
+                               "t2.recommend_status AS 'applet_rec_status', " \
+                               "t2.app_recommend_status AS 'app_rec_status', " \
+                               "t3.charge AS 'payment_status', " \
+                               "case when t4.max_validate_count is null then 0 else t4.max_validate_count end AS 'encryption_status' " \
+                               "FROM longvideo.wx_video t1 " \
+                               "LEFT JOIN longvideo.wx_video_status t2 ON t1.id= t2.video_id " \
+                               "LEFT JOIN longvideo.wx_video_detail t3 ON t1.id= t3.video_id " \
+                               "LEFT JOIN longvideo.wx_video_pwd t4 ON t1.id= t4.video_id"
+            if len(video_ids) == 1:
+                sql = "SELECT video_id " \
+                      "FROM ({}) " \
+                      "WHERE audit_status = 5 " \
+                      "AND applet_rec_status IN (1, -6) " \
+                      "AND open_status = 1 " \
+                      "AND payment_status = 0 " \
+                      "AND encryption_status != 5 " \
+                      "AND transcoding_status = 3 " \
+                      "AND video_id IN ({});".format(video_status_sql, video_ids[0])
+                data = mysql_helper.get_data(sql=sql)
+
+            else:
+                data = []
+                for i in range(len(video_ids) // 2000 + 1):
+                    sql = "SELECT video_id " \
+                          "FROM ({}) " \
+                          "WHERE audit_status = 5 " \
+                          "AND applet_rec_status IN (1, -6) " \
+                          "AND open_status = 1 " \
+                          "AND payment_status = 0 " \
+                          "AND encryption_status != 5 " \
+                          "AND transcoding_status = 3 " \
+                          "AND video_id IN {};".format(video_status_sql, tuple(video_ids[i*2000:(i+1)*2000]))
+                    select_res = mysql_helper.get_data(sql=sql)
+                    if select_res is not None:
+                        data += select_res
+            filtered_videos = [int(temp[0]) for temp in data]
+            return filtered_videos
+
+        except Exception as e:
+            i += 1
+            if i == 3:
+                return video_ids
 
 
 def filter_video_status_with_applet_rec(video_ids, applet_rec_status):
@@ -221,48 +229,56 @@ def filter_video_status_with_applet_rec(video_ids, applet_rec_status):
     :param applet_rec_status: 小程序推荐状态 -6:待推荐 1:普通推荐
     :return: filtered_videos
     """
-    mysql_helper = MysqlHelper(mysql_info=config_.FILTER_MYSQL_INFO)
-    video_status_sql = "SELECT t1.id AS 'video_id', " \
-                       "t1.transcode_status AS 'transcoding_status', " \
-                       "t2.audit_status AS 'audit_status', " \
-                       "t2.video_status AS 'open_status', " \
-                       "t2.recommend_status AS 'applet_rec_status', " \
-                       "t2.app_recommend_status AS 'app_rec_status', " \
-                       "t3.charge AS 'payment_status', " \
-                       "case when t4.max_validate_count is null then 0 else t4.max_validate_count end AS 'encryption_status' " \
-                       "FROM longvideo.wx_video t1 " \
-                       "LEFT JOIN longvideo.wx_video_status t2 ON t1.id= t2.video_id " \
-                       "LEFT JOIN longvideo.wx_video_detail t3 ON t1.id= t3.video_id " \
-                       "LEFT JOIN longvideo.wx_video_pwd t4 ON t1.id= t4.video_id"
-    if len(video_ids) == 1:
-        sql = "SELECT video_id " \
-              "FROM ({}) " \
-              "WHERE audit_status = 5 " \
-              "AND applet_rec_status = {} " \
-              "AND open_status = 1 " \
-              "AND payment_status = 0 " \
-              "AND encryption_status != 5 " \
-              "AND transcoding_status = 3 " \
-              "AND video_id IN ({});".format(video_status_sql, applet_rec_status, video_ids[0])
-        data = mysql_helper.get_data(sql=sql)
-
-    else:
-        data = []
-        for i in range(len(video_ids) // 2000 + 1):
-            sql = "SELECT video_id " \
-                  "FROM ({}) " \
-                  "WHERE audit_status = 5 " \
-                  "AND applet_rec_status = {} " \
-                  "AND open_status = 1 " \
-                  "AND payment_status = 0 " \
-                  "AND encryption_status != 5 " \
-                  "AND transcoding_status = 3 " \
-                  "AND video_id IN {};".format(video_status_sql, applet_rec_status, tuple(video_ids[i*2000:(i+1)*2000]))
-            select_res = mysql_helper.get_data(sql=sql)
-            if select_res is not None:
-                data += select_res
-    filtered_videos = [int(temp[0]) for temp in data]
-    return filtered_videos
+    i = 0
+    while i < 3:
+        try:
+            mysql_helper = MysqlHelper(mysql_info=config_.FILTER_MYSQL_INFO)
+            video_status_sql = "SELECT t1.id AS 'video_id', " \
+                               "t1.transcode_status AS 'transcoding_status', " \
+                               "t2.audit_status AS 'audit_status', " \
+                               "t2.video_status AS 'open_status', " \
+                               "t2.recommend_status AS 'applet_rec_status', " \
+                               "t2.app_recommend_status AS 'app_rec_status', " \
+                               "t3.charge AS 'payment_status', " \
+                               "case when t4.max_validate_count is null then 0 else t4.max_validate_count end AS 'encryption_status' " \
+                               "FROM longvideo.wx_video t1 " \
+                               "LEFT JOIN longvideo.wx_video_status t2 ON t1.id= t2.video_id " \
+                               "LEFT JOIN longvideo.wx_video_detail t3 ON t1.id= t3.video_id " \
+                               "LEFT JOIN longvideo.wx_video_pwd t4 ON t1.id= t4.video_id"
+            if len(video_ids) == 1:
+                sql = "SELECT video_id " \
+                      "FROM ({}) " \
+                      "WHERE audit_status = 5 " \
+                      "AND applet_rec_status = {} " \
+                      "AND open_status = 1 " \
+                      "AND payment_status = 0 " \
+                      "AND encryption_status != 5 " \
+                      "AND transcoding_status = 3 " \
+                      "AND video_id IN ({});".format(video_status_sql, applet_rec_status, video_ids[0])
+                data = mysql_helper.get_data(sql=sql)
+
+            else:
+                data = []
+                for i in range(len(video_ids) // 2000 + 1):
+                    sql = "SELECT video_id " \
+                          "FROM ({}) " \
+                          "WHERE audit_status = 5 " \
+                          "AND applet_rec_status = {} " \
+                          "AND open_status = 1 " \
+                          "AND payment_status = 0 " \
+                          "AND encryption_status != 5 " \
+                          "AND transcoding_status = 3 " \
+                          "AND video_id IN {};".format(video_status_sql, applet_rec_status, tuple(video_ids[i*2000:(i+1)*2000]))
+                    select_res = mysql_helper.get_data(sql=sql)
+                    if select_res is not None:
+                        data += select_res
+            filtered_videos = [int(temp[0]) for temp in data]
+            return filtered_videos
+
+        except Exception as e:
+            i += 1
+            if i == 3:
+                return video_ids
 
 
 def filter_video_status_app(video_ids):
@@ -271,50 +287,58 @@ def filter_video_status_app(video_ids):
     :param video_ids: 视频id列表 type-list
     :return: filtered_videos
     """
-    mysql_helper = MysqlHelper(mysql_info=config_.FILTER_MYSQL_INFO)
-    video_status_sql = "SELECT t1.id AS 'video_id', " \
-                       "t1.transcode_status AS 'transcoding_status', " \
-                       "t2.app_audit_status AS 'app_audit_status', " \
-                       "t2.original_status AS 'open_status', " \
-                       "t2.recommend_status AS 'applet_rec_status', " \
-                       "t2.app_recommend_status AS 'app_rec_status', " \
-                       "t3.charge AS 'payment_status', " \
-                       "case when t4.max_validate_count is null then 0 else t4.max_validate_count end AS 'encryption_status' " \
-                       "FROM longvideo.wx_video t1 " \
-                       "LEFT JOIN longvideo.wx_video_status t2 ON t1.id= t2.video_id " \
-                       "LEFT JOIN longvideo.wx_video_detail t3 ON t1.id= t3.video_id " \
-                       "LEFT JOIN longvideo.wx_video_pwd t4 ON t1.id= t4.video_id"
-
-    if len(video_ids) == 1:
-        sql = "SELECT video_id " \
-              "FROM ({}) " \
-              "WHERE app_audit_status = 5 " \
-              "AND app_rec_status IN (1, -6, 10) " \
-              "AND open_status = 1 " \
-              "AND payment_status = 0 " \
-              "AND encryption_status != 5 " \
-              "AND transcoding_status = 3 " \
-              "AND video_id IN ({});".format(video_status_sql, video_ids[0])
-        data = mysql_helper.get_data(sql=sql)
-
-    else:
-        data = []
-        for i in range(len(video_ids) // 2000 + 1):
-            sql = "SELECT video_id " \
-                  "FROM ({}) " \
-                  "WHERE app_audit_status = 5 " \
-                  "AND app_rec_status IN (1, -6, 10) " \
-                  "AND open_status = 1 " \
-                  "AND payment_status = 0 " \
-                  "AND encryption_status != 5 " \
-                  "AND transcoding_status = 3 " \
-                  "AND video_id IN {};".format(video_status_sql, tuple(video_ids[i*2000:(i+1)*2000]))
-            select_res = mysql_helper.get_data(sql=sql)
-            if select_res is not None:
-                data += select_res
-
-    filtered_videos = [int(temp[0]) for temp in data]
-    return filtered_videos
+    i = 0
+    while i < 3:
+        try:
+            mysql_helper = MysqlHelper(mysql_info=config_.FILTER_MYSQL_INFO)
+            video_status_sql = "SELECT t1.id AS 'video_id', " \
+                               "t1.transcode_status AS 'transcoding_status', " \
+                               "t2.app_audit_status AS 'app_audit_status', " \
+                               "t2.original_status AS 'open_status', " \
+                               "t2.recommend_status AS 'applet_rec_status', " \
+                               "t2.app_recommend_status AS 'app_rec_status', " \
+                               "t3.charge AS 'payment_status', " \
+                               "case when t4.max_validate_count is null then 0 else t4.max_validate_count end AS 'encryption_status' " \
+                               "FROM longvideo.wx_video t1 " \
+                               "LEFT JOIN longvideo.wx_video_status t2 ON t1.id= t2.video_id " \
+                               "LEFT JOIN longvideo.wx_video_detail t3 ON t1.id= t3.video_id " \
+                               "LEFT JOIN longvideo.wx_video_pwd t4 ON t1.id= t4.video_id"
+
+            if len(video_ids) == 1:
+                sql = "SELECT video_id " \
+                      "FROM ({}) " \
+                      "WHERE app_audit_status = 5 " \
+                      "AND app_rec_status IN (1, -6, 10) " \
+                      "AND open_status = 1 " \
+                      "AND payment_status = 0 " \
+                      "AND encryption_status != 5 " \
+                      "AND transcoding_status = 3 " \
+                      "AND video_id IN ({});".format(video_status_sql, video_ids[0])
+                data = mysql_helper.get_data(sql=sql)
+
+            else:
+                data = []
+                for i in range(len(video_ids) // 2000 + 1):
+                    sql = "SELECT video_id " \
+                          "FROM ({}) " \
+                          "WHERE app_audit_status = 5 " \
+                          "AND app_rec_status IN (1, -6, 10) " \
+                          "AND open_status = 1 " \
+                          "AND payment_status = 0 " \
+                          "AND encryption_status != 5 " \
+                          "AND transcoding_status = 3 " \
+                          "AND video_id IN {};".format(video_status_sql, tuple(video_ids[i*2000:(i+1)*2000]))
+                    select_res = mysql_helper.get_data(sql=sql)
+                    if select_res is not None:
+                        data += select_res
+
+            filtered_videos = [int(temp[0]) for temp in data]
+            return filtered_videos
+
+        except Exception as e:
+            i += 1
+            if i == 3:
+                return video_ids
 
 
 def filter_shield_video(video_ids, shield_key_name_list):