Browse Source

票圈视频接入每日抓取流程

luojunhui 6 months ago
parent
commit
92b09cea8e

+ 2 - 1
applications/api/__init__.py

@@ -6,4 +6,5 @@ from .deep_seek_api_by_volcanoengine import fetch_deepseek_response
 from .moon_shot_api import fetch_moon_shot_response
 from .moon_shot_api import fetch_moon_shot_response
 from .nlp_api import similarity_between_title_list
 from .nlp_api import similarity_between_title_list
 from .gewe_api import WechatChannelAPI
 from .gewe_api import WechatChannelAPI
-from .google_ai_api import GoogleAIAPI
+from .google_ai_api import GoogleAIAPI
+from .piaoquan_api import fetch_piaoquan_video_list_detail

+ 32 - 0
applications/api/piaoquan_api.py

@@ -0,0 +1,32 @@
+import json
+import requests
+from tenacity import retry
+from requests.exceptions import RequestException
+from typing import Optional, Dict, List
+
+from applications.utils import request_retry
+
+# Keyword arguments for tenacity's ``retry`` decorator, built by the project
+# helper ``request_retry`` (its exact policy is defined outside this file).
+retry_desc = request_retry(retry_times=3, min_retry_delay=2, max_retry_delay=30)
+
+
+@retry(**retry_desc)
+def _post_piaoquan_video_list_detail(video_id_list: List[int]) -> Dict:
+    """
+    POST the id list to the Piaoquan batch video-info endpoint.
+
+    Errors are deliberately NOT caught here: tenacity only re-invokes the
+    decorated function when the failure reaches the decorator, so swallowing
+    the exception inside the decorated function would disable the retry.
+
+    :param video_id_list: Piaoquan video ids to look up
+    :return: decoded JSON body of the response
+    :raises RequestException: on transport errors or a non-2xx status
+    :raises json.JSONDecodeError: when the body is not valid JSON
+    """
+    url = "https://longvideoapi.piaoquantv.com/longvideoapi/openapi/video/batchSelectVideoInfo"
+    data = {"videoIdList": video_id_list}
+    header = {
+        "Content-Type": "application/json",
+    }
+    response = requests.post(url, headers=header, json=data, timeout=60)
+    response.raise_for_status()
+    return response.json()
+
+
+def fetch_piaoquan_video_list_detail(video_id_list: List[int]) -> Optional[Dict]:
+    """
+    Fetch detail information for a batch of Piaoquan videos.
+
+    The request is retried according to ``retry_desc``; once the retries are
+    used up the failure is printed and ``None`` is returned, so callers must
+    handle a ``None`` result.
+
+    :param video_id_list: Piaoquan video ids to look up
+    :return: decoded JSON body of the response, or None when the request or
+        the JSON decoding ultimately failed
+    """
+    # Local import keeps this block self-contained; the module-level import
+    # only brings in ``retry``.
+    from tenacity import RetryError
+
+    try:
+        return _post_piaoquan_video_list_detail(video_id_list)
+    except RetryError as e:
+        # Raised by tenacity when the retry policy gives up without re-raising
+        # the original error; unwrap it to report the real cause.
+        cause = e.last_attempt.exception()
+        if isinstance(cause, RequestException):
+            print(f"API请求失败: {cause}")
+        elif isinstance(cause, json.JSONDecodeError):
+            print(f"响应解析失败: {cause}")
+        else:
+            # Not a request/decoding failure: keep surfacing it to the caller.
+            raise
+    except RequestException as e:
+        print(f"API请求失败: {e}")
+    except json.JSONDecodeError as e:
+        print(f"响应解析失败: {e}")
+    return None

+ 0 - 2
applications/const/__init__.py

@@ -422,8 +422,6 @@ class SohuVideoCrawlerConst:
     PAGE_LIST = [i for i in range(1, 8)]
     PAGE_LIST = [i for i in range(1, 8)]
 
 
 
 
-
-
 class SingleVideoPoolPublishTaskConst:
 class SingleVideoPoolPublishTaskConst:
     """
     """
     const for single video pool publish task
     const for single video pool publish task

+ 31 - 0
applications/const/crawler_video_const.py

@@ -0,0 +1,31 @@
+class CrawlerVideoConst:
+    """
+    Constants shared by the video crawler tasks.
+    """
+
+    # ---- processing state codes ----
+    INIT_STATUS = 0
+    PROCESSING_STATUS = 1
+    SUCCESS_STATUS = 2
+    FAIL_STATUS = 99
+
+    # ---- content thresholds ----
+    # minimum title length
+    MIN_TITLE_LENGTH = 10
+    # maximum video length, in seconds
+    MAX_VIDEO_LENGTH = 600
+
+    # ---- pacing ----
+    # sleep duration, in seconds
+    SLEEP_SECOND = 3
+
+    # value for a video's source account when there is none
+    # NOTE(review): meaning inferred from the name only - confirm
+    NO_SOURCE_ACCOUNT = 0
+
+
+class CrawlerPiaoQuanVideosConst(CrawlerVideoConst):
+    """
+    Constants for the Piaoquan video crawler task.
+    """
+
+    # platform identifier
+    PLATFORM = "piaoquan"
+
+    # name of the table holding the Piaoquan top-video pool
+    PIAOQUAN_TOP_VIDEO_TABLE = "piaoquan_source_video_pool"
+

+ 4 - 0
applications/pipeline/crawler_pipeline.py

@@ -73,6 +73,10 @@ def scrape_video_entities_process(video_item, db_client) -> dict:
             video_path = ""
             video_path = ""
         case "sohu":
         case "sohu":
             video_path = download_sohu_video(article_url)
             video_path = download_sohu_video(article_url)
+        case "piaoquan":
+            oss_path = ""
+            video_item["video_oss_path"] = oss_path
+            return video_item
         case _:
         case _:
             return empty_dict
             return empty_dict
 
 

+ 135 - 0
tasks/crawler_tasks/crawler_video/crawler_piaoquan_videos.py

@@ -0,0 +1,135 @@
+from __future__ import annotations
+
+import json
+import time
+import traceback
+
+from pymysql.cursors import DictCursor
+from tqdm import tqdm
+
+from applications import log
+from applications.api import fetch_piaoquan_video_list_detail
+from applications.const.crawler_video_const import CrawlerPiaoQuanVideosConst
+from applications.db import DatabaseConnector
+from applications.pipeline import scrape_video_entities_process
+from applications.utils import Item
+from applications.utils import str_to_md5
+from applications.utils import insert_into_single_video_source_table
+
+from config import long_articles_config
+
+# Shared constants instance used by every method in this module.
+const = CrawlerPiaoQuanVideosConst()
+
+# Maps a Piaoquan category label to the category stored on the crawled item.
+# Several labels collapse into the same target category. The map is read with
+# ``.get``, so a label that is not listed here yields None.
+category_map = {
+    "知识科普": "知识科普",
+    "生活技巧科普": "知识科普",
+    "老年相关法律科普": "知识科普",
+    "中国战争史": "军事历史",
+    "中国历史影像": "军事历史",
+    "正能量剧情": "家长里短",
+    "人财诈骗": "社会法治",
+    "贪污腐败": "社会法治",
+    "罕见画面": "奇闻趣事",
+    "惊奇事件": "奇闻趣事",
+    "动物萌宠": "奇闻趣事",
+    "老明星": "名人八卦",
+    "健康知识": "健康养生",
+    "饮食健康": "健康养生",
+    "人生忠告": "情感故事",
+    "老年生活": "情感故事",
+    "国际军事": "政治新闻",
+    "他国政策": "政治新闻",
+    "国际时政": "政治新闻",
+    "历史名人": "历史人物",
+}
+
+
+class CrawlerPiaoQuanVideos:
+    """
+    Crawl the videos listed in the Piaoquan pool table.
+
+    Each pool row in INIT_STATUS is locked (moved to PROCESSING_STATUS), its
+    detail is fetched from the Piaoquan API, converted into a video item,
+    passed through ``scrape_video_entities_process`` and, when that returns a
+    non-empty item, inserted via ``insert_into_single_video_source_table``.
+    """
+
+    def __init__(self):
+        self.db_client = DatabaseConnector(long_articles_config)
+        self.db_client.connect()
+
+    def get_piaoquan_top_video_list(self) -> list[dict]:
+        """
+        Return the pool rows (id, video_id, title) whose status is INIT_STATUS.
+        """
+        fetch_query = f"""
+            select id, video_id, title
+            from {const.PIAOQUAN_TOP_VIDEO_TABLE}
+            where status = {const.INIT_STATUS};
+        """
+        task_list = self.db_client.fetch(fetch_query, cursor_type=DictCursor)
+        return task_list
+
+    def update_piaoquan_top_video_status(
+        self, pool_id: int, ori_status: int, new_status: int
+    ) -> int:
+        """
+        Move one pool row from ``ori_status`` to ``new_status``.
+
+        The ``status = ori_status`` condition makes the update a compare-and-set:
+        the row only changes if it is still in the expected state.
+
+        :param pool_id: primary key of the pool row
+        :param ori_status: status the row is expected to have now
+        :param new_status: status to write
+        :return: whatever ``db_client.save`` returns; the caller treats a falsy
+            value as "no row was updated"
+        """
+        update_query = f"""
+            update {const.PIAOQUAN_TOP_VIDEO_TABLE}
+            set status = %s
+            where id = %s and status = %s;
+        """
+        # Bind in placeholder order: set status, where id, where status.
+        return self.db_client.save(update_query, (new_status, pool_id, ori_status))
+
+    def crawler_each_video(self, video_data: dict) -> None:
+        """
+        Crawl a single pool row.
+
+        :param video_data: pool row as returned by ``get_piaoquan_top_video_list``
+            (must contain ``id``, ``video_id`` and ``title``)
+        """
+        pool_id = video_data["id"]
+
+        # lock the row so it is processed only once
+        lock_acquired = self.update_piaoquan_top_video_status(
+            pool_id=pool_id,
+            ori_status=const.INIT_STATUS,
+            new_status=const.PROCESSING_STATUS,
+        )
+        if not lock_acquired:
+            return
+
+        # get video detail from piaoquan; the fetch returns None on failure
+        response_from_piaoquan = fetch_piaoquan_video_list_detail(
+            [video_data["video_id"]]
+        )
+        detail_list = (response_from_piaoquan or {}).get("data")
+        if not detail_list:
+            log(
+                task="crawler_piaoquan_videos",
+                function="crawler_each_video",
+                message="fetch piaoquan video detail failed",
+                status="failed",
+                data={"video_data": video_data, "response": response_from_piaoquan},
+            )
+            # release the lock as a failure instead of leaving the row in PROCESSING
+            self.update_piaoquan_top_video_status(
+                pool_id=pool_id,
+                ori_status=const.PROCESSING_STATUS,
+                new_status=const.FAIL_STATUS,
+            )
+            return
+
+        video_detail = detail_list[0]
+        video_item = Item()
+        unique_id = f"{const.PLATFORM}-{video_data['video_id']}"
+
+        # add info into item
+        video_item.add("content_trace_id", "video{}".format(str_to_md5(unique_id)))
+        video_item.add("url_unique_md5", video_data["video_id"])
+        video_item.add("article_title", video_data["title"])
+        video_item.add("out_account_id", video_detail["uid"])
+        # The uploader info belongs to the API detail; the pool row only
+        # carries id / video_id / title.
+        video_item.add("out_account_name", video_detail["user"]["nickName"])
+        video_item.add(
+            "publish_timestamp", int(video_detail["gmtCreateTimestamp"] / 1000)
+        )
+        video_item.add("platform", const.PLATFORM)
+        video_item.add(
+            "article_url",
+            f"https://admin.piaoquantv.com/cms/post-detail/{video_data['video_id']}/detail",
+        )
+        video_item.add("source_account", const.NO_SOURCE_ACCOUNT)
+        video_item.add("crawler_timestamp", int(time.time()))
+        video_item.add("oss_path", video_detail["ossVideoPath"])
+        video_item.add("audit_status", video_detail["auditStatus"])
+        # NOTE(review): get_piaoquan_top_video_list does not select a category
+        # column, so this resolves to None for rows coming from that query.
+        # If the pool table has such a column, add it to the select - confirm.
+        video_item.add("category", category_map.get(video_data.get("category")))
+
+        # check item before insert
+        video_item.check(source="video")
+        try:
+            item_with_oss_path = scrape_video_entities_process(
+                video_item=video_item.item, db_client=self.db_client
+            )
+            if item_with_oss_path:
+                insert_into_single_video_source_table(
+                    db_client=self.db_client, video_item=item_with_oss_path
+                )
+                final_status = const.SUCCESS_STATUS
+            else:
+                # the pipeline returned nothing to insert for this video
+                final_status = const.FAIL_STATUS
+
+        except Exception as e:
+            detail = {
+                "video_item": video_item.item,
+                "error": str(e),
+                "traceback": traceback.format_exc(),
+            }
+            log(
+                task="crawler_piaoquan_videos",
+                function="crawler_each_video",
+                message="crawler_piaoquan_videos failed",
+                status="failed",
+                data=detail,
+            )
+            final_status = const.FAIL_STATUS
+
+        # record the outcome so the row does not stay in PROCESSING forever
+        self.update_piaoquan_top_video_status(
+            pool_id=pool_id,
+            ori_status=const.PROCESSING_STATUS,
+            new_status=final_status,
+        )