luojunhui 10 ماه پیش
والد
کامیت
523a21d460
3فایلهای تغییر یافته به همراه113 افزوده شده و 5 حذف شده
  1. 9 0
      applications/const.py
  2. 47 5
      coldStartTasks/publish/publish_video_to_pq_for_audit.py
  3. 57 0
      run_video_publish_and_audit.py

+ 9 - 0
applications/const.py

@@ -124,6 +124,15 @@ class WeixinVideoCrawlerConst:
     # 默认账号
     DEFAULT_ACCOUNT_UID = 76862180
 
+    # 任务执行
+    PUBLISH_TASK = "PUBLISH_TASK"
+    PUBLISH_TASK_SLEEP_TEME = 60 * 24 * 60
+    CHECK_TASK = "CHECK_TASK"
+    CHECK_TASK_SLEEP_TEME = 60
+
+    # 每天发送的审核视频数量
+    MAX_VIDEO_NUM = 500
+
 
 
 

+ 47 - 5
coldStartTasks/publish/publish_video_to_pq_for_audit.py

@@ -2,11 +2,13 @@
 @author: luojunhui
 将抓取的视频发送至pq获取视频的审核结果
 """
+import traceback
 from typing import List, Dict
 
 from tqdm import tqdm
 from pymysql.cursors import DictCursor
 
+from applications import log
 from applications import PQAPI
 from applications import longArticlesMySQL
 from applications.const import WeixinVideoCrawlerConst
@@ -28,7 +30,12 @@ class PublishVideosForAudit(object):
         获取视频的信息
         :return:
         """
-        sql = f"""SELECT id, article_title, video_oss_path FROM publish_single_video_source WHERE audit_status = {const.VIDEO_AUDIT_INIT_STATUS};"""
+        sql = f"""
+            SELECT id, article_title, video_oss_path 
+            FROM publish_single_video_source 
+            WHERE audit_status = {const.VIDEO_AUDIT_INIT_STATUS}
+            LIMIT {const.MAX_VIDEO_NUM};
+            """
         response = self.db.select(sql, cursor_type=DictCursor)
         return response
 
@@ -129,8 +136,30 @@ class PublishVideosForAudit(object):
         """
         video_list = self.get_publish_video_list()
         for video_obj in tqdm(video_list, desc="视频发布"):
-            self.publish_each_video(video_obj)
-        print("视频发布完成")
+            try:
+                self.publish_each_video(video_obj)
+                log(
+                    task="publish_video_for_audit",
+                    message="成功发送至pq",
+                    function="publish_each_video",
+                    data={
+                        "video_obj": video_obj
+                    }
+                )
+
+            except Exception as e:
+                error_msg = traceback.format_exc()
+                log(
+                    task="publish_video_for_audit",
+                    message="发送至PQ失败",
+                    function="publish_each_video",
+                    status="fail",
+                    data={
+                        "error_msg": error_msg,
+                        "video_obj": video_obj,
+                        "error": str(e)
+                    }
+                )
 
     def check_job(self):
         """
@@ -140,6 +169,19 @@ class PublishVideosForAudit(object):
         video_list = self.get_check_article_list()
         for video_obj in tqdm(video_list, desc="视频检查"):
             video_id = video_obj.get("audit_video_id")
-            self.check_video_status(video_id)
-        print("视频检查完成")
+            try:
+                self.check_video_status(video_id)
+            except Exception as e:
+                error_msg = traceback.format_exc()
+                log(
+                    task="publish_video_for_audit",
+                    message="查询状态失败",
+                    function="check_each_video",
+                    status="fail",
+                    data={
+                        "error_msg": error_msg,
+                        "video_obj": video_obj,
+                        "error": str(e)
+                    }
+                )
 

+ 57 - 0
run_video_publish_and_audit.py

@@ -0,0 +1,57 @@
+"""
+@author: luojunhui
+"""
+import time
+import threading
+from concurrent.futures import ThreadPoolExecutor, as_completed
+
+from coldStartTasks.publish import PublishVideosForAudit
+from applications.const import WeixinVideoCrawlerConst
+
+const = WeixinVideoCrawlerConst()
+pub = PublishVideosForAudit()
+
+
+def run_thread(task_type: str, wait_time: int, stop_event: threading.Event):
+    """
+    运行线程
+    :param task_type:
+    :param wait_time:
+    :param stop_event:
+    :return:
+    """
+
+    while not stop_event.is_set():
+        if task_type == const.PUBLISH_TASK:
+            # 发布视频
+            pub.publish_job()
+            time.sleep(wait_time)
+        elif task_type == const.CHECK_TASK:
+            # 检查视频
+            pub.check_job()
+            time.sleep(wait_time)
+
+
+def main():
+    """
+    主函数
+    :return:
+    """
+    stop_event = threading.Event()
+
+    # 启动两个线程,分别执行两个函数
+    with ThreadPoolExecutor(max_workers=2) as executor:
+        futures = [
+            executor.submit(run_thread, const.PUBLISH_TASK, const.PUBLISH_TASK_SLEEP_TEME, stop_event),
+            executor.submit(run_thread, const.CHECK_TASK, const.CHECK_TASK_SLEEP_TEME, stop_event)
+        ]
+        try:
+            for future in as_completed(futures):
+                future.result()
+        except KeyboardInterrupt:
+            print("Stopping all threads...")
+            stop_event.set()
+
+
+if __name__ == '__main__':
+    main()