浏览代码

抓取流程增加 piaoquan 平台

luojunhui 2 月之前
父节点
当前提交
91a11b3c14

+ 1 - 1
applications/api/apollo_api.py

@@ -5,7 +5,7 @@ class ApolloApi:
     def __init__(self, app_id="LongArticlesJob", env="pre"):
         match env:
             case "pre":
-                config_server_url = 'https://preapolloconfig-internal.piaoquantv.com/'
+                config_server_url = 'http://preapolloconfig-internal.piaoquantv.com/'
             case "dev":
                 config_server_url = 'https://devapolloconfig-internal.piaoquantv.com/'
             case "prod":

+ 2 - 0
applications/const/crawler_video_const.py

@@ -29,3 +29,5 @@ class CrawlerPiaoQuanVideosConst(CrawlerVideoConst):
 
     PLATFORM = "piaoquan"
 
+    MAX_LOCK_SECOND = 3600
+

+ 3 - 1
applications/utils/item.py

@@ -25,7 +25,9 @@ default_single_video_table_fields = {
     "tags": None,
     "video_oss_path": None,
     "audit_status": 0,
-    "category_status": 0
+    "category_status": 0,
+    "audit_video_id": None,
+    "mini_program_title": None
 }
 
 

+ 7 - 3
applications/utils/save_to_db.py

@@ -12,9 +12,11 @@ def insert_into_single_video_source_table(db_client, video_item):
     """
     insert_sql = f"""
         INSERT INTO publish_single_video_source
-        (content_trace_id, article_title, out_account_id, out_account_name, read_cnt, like_cnt, article_url, cover_url, video_oss_path, publish_timestamp, crawler_timestamp, url_unique_md5, category, tags, platform, source_account, category_status, audit_status)
+        (content_trace_id, article_title, out_account_id, out_account_name, read_cnt, like_cnt, article_url, cover_url, 
+        video_oss_path, publish_timestamp, crawler_timestamp, url_unique_md5, category, tags, platform, source_account, 
+        category_status, audit_status, audit_video_id, mini_program_title)
         values
-        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
+        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
     """
     try:
         db_client.save(
@@ -37,7 +39,9 @@ def insert_into_single_video_source_table(db_client, video_item):
                 video_item["platform"],
                 video_item["source_account"],
                 video_item["category_status"],
-                video_item["audit_status"]
+                video_item["audit_status"],
+                video_item["audit_video_id"],
+                video_item["mini_program_title"],
             ),
         )
     except Exception as e:

+ 62 - 5
tasks/crawler_tasks/crawler_video/crawler_piaoquan_videos.py

@@ -20,7 +20,7 @@ from applications.utils import insert_into_single_video_source_table
 from config import long_articles_config
 
 const = CrawlerPiaoQuanVideosConst()
-apollo_api = ApolloApi()
+apollo_api = ApolloApi(env="prod")
 pq_long_articles_category_mapping = json.loads(apollo_api.get_config_value("pq_long_articles_category_mapping"))
 
 
@@ -31,7 +31,7 @@ class CrawlerPiaoQuanVideos:
 
     def get_piaoquan_top_video_list(self) -> list[dict]:
         fetch_query = f"""
-            select id, video_id, title
+            select id, video_id, title, category
             from {const.PIAOQUAN_TOP_VIDEO_TABLE}
             where status = {const.INIT_STATUS};
         """
@@ -43,10 +43,23 @@ class CrawlerPiaoQuanVideos:
     ) -> int:
         update_query = f"""
             update {const.PIAOQUAN_TOP_VIDEO_TABLE}
-            set status = %s
+            set status = %s, status_update_timestamp = %s
             where id = %s and status = %s;
         """
-        return self.db_client.save(update_query, (pool_id, ori_status, new_status))
+
+        return self.db_client.save(update_query, params=(new_status, int(time.time()), pool_id, ori_status))
+
+    def rollback_lock_tasks(self) -> int:
+        # reset tasks that have stayed in PROCESSING status for longer than const.MAX_LOCK_SECOND back to INIT status
+        rollback_query = f"""
+            update {const.PIAOQUAN_TOP_VIDEO_TABLE}
+            set status = %s
+            where status = %s and status_update_timestamp < %s;
+        """
+        return self.db_client.save(
+            rollback_query,
+            (const.INIT_STATUS, const.PROCESSING_STATUS, int(time.time() - const.MAX_LOCK_SECOND))
+        )
 
     def crawler_each_video(self, video_data: dict) -> None:
         """
@@ -58,6 +71,7 @@ class CrawlerPiaoQuanVideos:
             ori_status=const.INIT_STATUS,
             new_status=const.PROCESSING_STATUS,
         )
+
         if not lock_acquired:
             return
 
@@ -74,7 +88,9 @@ class CrawlerPiaoQuanVideos:
         video_item.add("url_unique_md5", video_data["video_id"])
         video_item.add("article_title", video_data["title"])
         video_item.add("out_account_id", video_detail["uid"])
-        video_item.add("out_account_name", video_data["user"]["nickName"])
+        video_item.add("out_account_name", video_detail["user"]["nickName"])
+        video_item.add("mini_program_title", video_data["title"])
+        video_item.add("cover_url", video_detail["shareImgPath"])
         video_item.add(
             "publish_timestamp", int(video_detail["gmtCreateTimestamp"] / 1000)
         )
@@ -87,6 +103,7 @@ class CrawlerPiaoQuanVideos:
         video_item.add("crawler_timestamp", int(time.time()))
         video_item.add("video_oss_path", video_detail["ossVideoPath"])
         video_item.add("audit_status", video_detail["auditStatus"])
+        video_item.add("audit_video_id", video_data["video_id"])
         category = pq_long_articles_category_mapping.get(video_data["category"])
         if category:
             video_item.add("category", category)
@@ -102,6 +119,17 @@ class CrawlerPiaoQuanVideos:
                 insert_into_single_video_source_table(
                     db_client=self.db_client, video_item=item_with_oss_path
                 )
+                self.update_piaoquan_top_video_status(
+                    pool_id=video_data["id"],
+                    ori_status=const.PROCESSING_STATUS,
+                    new_status=const.SUCCESS_STATUS
+                )
+            else:
+                self.update_piaoquan_top_video_status(
+                    pool_id=video_data["id"],
+                    ori_status=const.PROCESSING_STATUS,
+                    new_status=const.FAIL_STATUS
+                )
 
         except Exception as e:
             detail = {
@@ -109,6 +137,11 @@ class CrawlerPiaoQuanVideos:
                 "error": str(e),
                 "traceback": traceback.format_exc(),
             }
+            self.update_piaoquan_top_video_status(
+                pool_id=video_data["id"],
+                ori_status=const.PROCESSING_STATUS,
+                new_status=const.FAIL_STATUS
+            )
             log(
                 task="crawler_piaoquan_videos",
                 function="crawler_each_video",
@@ -116,3 +149,27 @@ class CrawlerPiaoQuanVideos:
                 status="failed",
                 data=detail,
             )
+
+    def deal(self):
+        # roll back tasks whose lock has been held for too long
+        self.rollback_lock_tasks()
+
+        # get video_list
+        video_list = self.get_piaoquan_top_video_list()
+
+        for video_data in tqdm(video_list, desc="video_list"):
+            try:
+                self.crawler_each_video(video_data)
+
+            except Exception as e:
+                log(
+                    task="crawler_piaoquan_videos",
+                    function="crawler_each_video",
+                    message="crawler_piaoquan_videos failed",
+                    status="failed",
+                    data={
+                        "video_data": video_data,
+                        "error": str(e),
+                        "traceback": traceback.format_exc(),
+                    }
+                )