Browse Source

抓取部分执行任务

luojunhui 10 tháng trước cách đây
mục cha
commit
2b069242a6
1 tập tin đã thay đổi với 69 bổ sung27 xóa
  1. 69 27
      coldStartTasks/publish/publish_video_to_pq_for_audit.py

+ 69 - 27
coldStartTasks/publish/publish_video_to_pq_for_audit.py

@@ -76,7 +76,7 @@ class PublishVideosForAudit(object):
         response = self.db.select(select_sql, cursor_type=DictCursor)
         return response[0]['total_count']
 
-    def publish_each_video(self, video_obj: Dict) -> None:
+    def publish_each_video(self, video_obj: Dict) -> Dict:
         """
         发布视频到pq
         :param video_obj:
@@ -100,11 +100,27 @@ class PublishVideosForAudit(object):
                 params=(const.VIDEO_AUDIT_PROCESSING_STATUS, video_id, int(time.time()), video_obj['id'])
             )
             if affected_rows:
-                print("视频发布成功--{}".format(video_id))
+                result = {
+                    "status": "success",
+                    "video_id": video_id
+                }
+                return result
             else:
-                print("视频发布失败--{}".format(video_id))
+                result = {
+                    "status": "fail",
+                    "video_id": video_id,
+                    "error_msg": "抢占锁失败,update执行操作修改0行"
+                }
+                return result
         else:
-            print("视频发布失败--{}".format(video_obj.get("video_oss_path")))
+            result = {
+                "status": "fail",
+                "error_msg": "发布到pq失败",
+                "title": video_obj.get("article_title"),
+                "oss_path": video_obj.get("video_oss_path"),
+                "response": response_json
+            }
+            return result
 
     def get_check_article_list(self) -> List[Dict]:
         """
@@ -115,7 +131,7 @@ class PublishVideosForAudit(object):
         response = self.db.select(sql, cursor_type=DictCursor)
         return response
 
-    def check_video_status(self, video_id: int) -> bool:
+    def check_video_status(self, video_id: int) -> Dict:
         """
         检查视频的状态,若视频审核通过or不通过,修改记录状态
         :param video_id:
@@ -123,28 +139,31 @@ class PublishVideosForAudit(object):
         """
         response = pq_functions.getPQVideoListDetail([video_id])
         audit_status = response.get("data")[0].get("auditStatus")
+        # 请求成功了
         if audit_status == const.PQ_AUDIT_SUCCESS_STATUS:
             affected_rows = self.update_audit_status(
                 video_id=video_id,
                 ori_audit_status=const.VIDEO_AUDIT_PROCESSING_STATUS,
                 new_audit_status=const.VIDEO_AUDIT_SUCCESS_STATUS
             )
-            if affected_rows:
-                return True
-            else:
-                return False
         elif audit_status in {const.PQ_AUDIT_SELF_VISIBLE_STATUS, const.PQ_AUDIT_FAIL_STATUS}:
             affected_rows = self.update_audit_status(
                 video_id=video_id,
                 ori_audit_status=const.VIDEO_AUDIT_PROCESSING_STATUS,
                 new_audit_status=const.VIDEO_AUDIT_FAIL_STATUS
             )
-            if affected_rows:
-                return True
-            else:
-                return False
+        elif audit_status == const.PQ_AUDIT_PROCESSING_STATUS:
+            # 视频正在审核中,不做处理
+            affected_rows = 0
         else:
-            return False
+            # 其他情况,暂时不做处理
+            affected_rows = 0
+        result = {
+            "affected_rows": affected_rows,
+            "video_id": video_id,
+            "audit_status": audit_status
+        }
+        return result
 
     def publish_job(self):
         """
@@ -154,21 +173,32 @@ class PublishVideosForAudit(object):
         video_list = self.get_publish_video_list()
         for video_obj in tqdm(video_list, desc="视频发布"):
             try:
-                self.publish_each_video(video_obj)
-                log(
-                    task="publish_video_for_audit",
-                    message="成功发送至pq",
-                    function="publish_each_video",
-                    data={
-                        "video_obj": video_obj
-                    }
-                )
-
+                response = self.publish_each_video(video_obj)
+                if response.get("status") == "success":
+                    log(
+                        task="publish_video_for_audit",
+                        message="发送至PQ成功",
+                        function="publish_each_video",
+                        data={
+                            "video_id": response.get("video_id")
+                        }
+                    )
+                else:
+                    log(
+                        task="publish_video_for_audit",
+                        message=response.get('error_msg'),
+                        function="publish_each_video",
+                        status="fail",
+                        data={
+                            "response": response,
+                            "video_obj": video_obj
+                        }
+                    )
             except Exception as e:
                 error_msg = traceback.format_exc()
                 log(
                     task="publish_video_for_audit",
-                    message="发送至PQ失败",
+                    message="发送至PQ代码执行失败",
                     function="publish_each_video",
                     status="fail",
                     data={
@@ -187,12 +217,24 @@ class PublishVideosForAudit(object):
         for video_obj in tqdm(video_list, desc="视频检查"):
             video_id = video_obj.get("audit_video_id")
             try:
-                self.check_video_status(video_id)
+                response = self.check_video_status(video_id)
+                if response.get("affected_rows"):
+                    continue
+                else:
+                    log(
+                        task="publish_video_for_audit",
+                        function="check_each_video",
+                        message="修改行数为0",
+                        data={
+                            "video_id": video_id,
+                            "audit_status": response['audit_status']
+                        }
+                    )
             except Exception as e:
                 error_msg = traceback.format_exc()
                 log(
                     task="publish_video_for_audit",
-                    message="查询状态失败",
+                    message="查询状态执行失败",
                     function="check_each_video",
                     status="fail",
                     data={