فهرست منبع

Merge branch '2025-04-27-add-piaoquan-platform' of luojunhui/LongArticlesJob into master

luojunhui 5 ماه پیش
والد
کامیت
81df755506

+ 3 - 1
applications/api/__init__.py

@@ -2,8 +2,10 @@
 @author: luojunhui
 """
 from .aigc_system_api import AigcSystemApi
+from .apollo_api import ApolloApi
 from .deep_seek_api_by_volcanoengine import fetch_deepseek_response
 from .moon_shot_api import fetch_moon_shot_response
 from .nlp_api import similarity_between_title_list
 from .gewe_api import WechatChannelAPI
-from .google_ai_api import GoogleAIAPI
+from .google_ai_api import GoogleAIAPI
+from .piaoquan_api import fetch_piaoquan_video_list_detail

+ 24 - 0
applications/api/apollo_api.py

@@ -0,0 +1,24 @@
+import pyapollos
+
+
+class ApolloApi:
+    def __init__(self, app_id="LongArticlesJob", env="pre"):
+        match env:
+            case "pre":
+                config_server_url = 'http://preapolloconfig-internal.piaoquantv.com/'
+            case "dev":
+                config_server_url = 'https://devapolloconfig-internal.piaoquantv.com/'
+            case "prod":
+                config_server_url = 'https://apolloconfig-internal.piaoquantv.com/'
+            case _:
+                raise ValueError("env must be 'pre' or 'dev' or 'prod'")
+
+        self.apollo_connection = pyapollos.ApolloClient(
+            app_id=app_id,
+            config_server_url=config_server_url,
+            timeout=10
+        )
+
+    def get_config_value(self, key):
+        return self.apollo_connection.get_value(key)
+

+ 32 - 0
applications/api/piaoquan_api.py

@@ -0,0 +1,32 @@
+import json
+import requests
+from tenacity import retry
+from requests.exceptions import RequestException
+from typing import Optional, Dict, List
+
+from applications.utils import request_retry
+
+retry_desc = request_retry(retry_times=3, min_retry_delay=2, max_retry_delay=30)
+
+
+@retry(**retry_desc)
+def fetch_piaoquan_video_list_detail(video_id_list: List[int]) -> Optional[Dict]:
+    """
+    获取票圈视频详情信息
+    :param: video_list: 视频id 列表
+    :return: Detail
+    """
+    url = "https://longvideoapi.piaoquantv.com/longvideoapi/openapi/video/batchSelectVideoInfo"
+    data = {"videoIdList": video_id_list}
+    header = {
+        "Content-Type": "application/json",
+    }
+    try:
+        response = requests.post(url, headers=header, json=data, timeout=60)
+        response.raise_for_status()
+        return response.json()
+    except RequestException as e:
+        print(f"API请求失败: {e}")
+    except json.JSONDecodeError as e:
+        print(f"响应解析失败: {e}")
+    return None

+ 0 - 2
applications/const/__init__.py

@@ -422,8 +422,6 @@ class SohuVideoCrawlerConst:
     PAGE_LIST = [i for i in range(1, 8)]
 
 
-
-
 class SingleVideoPoolPublishTaskConst:
     """
     const for single video pool publish task

+ 35 - 0
applications/const/crawler_video_const.py

@@ -0,0 +1,35 @@
+class CrawlerVideoConst:
+    """
+    视频抓取常量
+    """
+
+    INIT_STATUS = 0
+    PROCESSING_STATUS = 1
+    SUCCESS_STATUS = 2
+    FAIL_STATUS = 99
+
+    # title length min
+    MIN_TITLE_LENGTH = 10
+
+    # max video length(second)
+    MAX_VIDEO_LENGTH = 600
+
+    # sleep second
+    SLEEP_SECOND = 3
+
+    #
+    NO_SOURCE_ACCOUNT = 0
+
+
+class CrawlerPiaoQuanVideosConst(CrawlerVideoConst):
+    """
+    票圈视频抓取任务常量
+    """
+    PIAOQUAN_TOP_VIDEO_TABLE = 'piaoquan_source_video_pool'
+
+    PLATFORM = "piaoquan"
+
+    MAX_LOCK_SECOND = 3600
+
+    AUDITING_STATUS = -1
+

+ 2 - 0
applications/pipeline/crawler_pipeline.py

@@ -73,6 +73,8 @@ def scrape_video_entities_process(video_item, db_client) -> dict:
             video_path = ""
         case "sohu":
             video_path = download_sohu_video(article_url)
+        case "piaoquan":
+            return video_item
         case _:
             return empty_dict
 

+ 4 - 0
applications/utils/item.py

@@ -24,6 +24,10 @@ default_single_video_table_fields = {
     "bad_status": 0,
     "tags": None,
     "video_oss_path": None,
+    "audit_status": 0,
+    "category_status": 0,
+    "audit_video_id": None,
+    "mini_program_title": None
 }
 
 

+ 8 - 2
applications/utils/save_to_db.py

@@ -12,9 +12,11 @@ def insert_into_single_video_source_table(db_client, video_item):
     """
     insert_sql = f"""
         INSERT INTO publish_single_video_source
-        (content_trace_id, article_title, out_account_id, out_account_name, read_cnt, like_cnt, article_url, cover_url, video_oss_path, publish_timestamp, crawler_timestamp, url_unique_md5, category, tags, platform, source_account)
+        (content_trace_id, article_title, out_account_id, out_account_name, read_cnt, like_cnt, article_url, cover_url, 
+        video_oss_path, publish_timestamp, crawler_timestamp, url_unique_md5, category, tags, platform, source_account, 
+        category_status, audit_status, audit_video_id, mini_program_title)
         values
-        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
+        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
     """
     try:
         db_client.save(
@@ -36,6 +38,10 @@ def insert_into_single_video_source_table(db_client, video_item):
                 video_item["tags"],
                 video_item["platform"],
                 video_item["source_account"],
+                video_item["category_status"],
+                video_item["audit_status"],
+                video_item["audit_video_id"],
+                video_item["mini_program_title"],
             ),
         )
     except Exception as e:

+ 26 - 0
schedule_app.py

@@ -0,0 +1,26 @@
+# from celery import Celery
+from tasks.crawler_tasks.crawler_video.crawler_piaoquan_videos import CrawlerPiaoQuanVideos
+from tasks.crawler_tasks.crawler_video.crawler_sohu_videos import CrawlerSohuHotVideos
+from tasks.crawler_tasks.crawler_video.crawler_sohu_videos import CrawlerSohuRecommendVideos
+
+
+# app = Celery('tasks', broker='redis://localhost:6379/0')
+
+# @app.task
+def run_piaoquan_video_crawler():
+    crawler = CrawlerPiaoQuanVideos()
+    crawler.deal()
+
+def run_sohu_video_crawler():
+    # step1, crawl sohu hot videos
+    crawler_sohu_hot_videos = CrawlerSohuHotVideos()
+    crawler_sohu_hot_videos.deal()
+
+    # step2, crawl sohu recommend videos
+    crawler_sohu_recommend_videos = CrawlerSohuRecommendVideos()
+    crawler_sohu_recommend_videos.deal()
+
+if __name__ == "__main__":
+    run_piaoquan_video_crawler()
+    run_sohu_video_crawler()
+

+ 177 - 0
tasks/crawler_tasks/crawler_video/crawler_piaoquan_videos.py

@@ -0,0 +1,177 @@
+from __future__ import annotations
+
+import json
+import time
+import traceback
+
+from pymysql.cursors import DictCursor
+from tqdm import tqdm
+
+from applications import log
+from applications.api import ApolloApi
+from applications.api import fetch_piaoquan_video_list_detail
+from applications.const.crawler_video_const import CrawlerPiaoQuanVideosConst
+from applications.db import DatabaseConnector
+from applications.pipeline import scrape_video_entities_process
+from applications.utils import Item
+from applications.utils import str_to_md5
+from applications.utils import insert_into_single_video_source_table
+
+from config import long_articles_config
+
+const = CrawlerPiaoQuanVideosConst()
+apollo_api = ApolloApi(env="prod")
+pq_long_articles_category_mapping = json.loads(apollo_api.get_config_value("pq_long_articles_category_mapping"))
+
+
+class CrawlerPiaoQuanVideos:
+    def __init__(self):
+        self.db_client = DatabaseConnector(long_articles_config)
+        self.db_client.connect()
+
+    def get_piaoquan_top_video_list(self) -> list[dict]:
+        fetch_query = f"""
+            select id, video_id, title, category
+            from {const.PIAOQUAN_TOP_VIDEO_TABLE}
+            where status = {const.INIT_STATUS};
+        """
+        task_list = self.db_client.fetch(fetch_query, cursor_type=DictCursor)
+        return task_list
+
+    def update_piaoquan_top_video_status(
+        self, pool_id: int, ori_status: int, new_status: int
+    ) -> int:
+        update_query = f"""
+            update {const.PIAOQUAN_TOP_VIDEO_TABLE}
+            set status = %s, status_update_timestamp = %s
+            where id = %s and status = %s;
+        """
+
+        return self.db_client.save(update_query, params=(new_status, int(time.time()), pool_id, ori_status))
+
+    def rollback_lock_tasks(self) -> int:
+        # roll back lock task which has been locked for more than 1 hour
+        rollback_query = f"""
+            update {const.PIAOQUAN_TOP_VIDEO_TABLE}
+            set status = %s
+            where status = %s and status_update_timestamp < %s;
+        """
+        return self.db_client.save(
+            rollback_query,
+            (const.INIT_STATUS, const.PROCESSING_STATUS, int(time.time() - const.MAX_LOCK_SECOND))
+        )
+
+    def crawler_each_video(self, video_data: dict) -> None:
+        """
+        crawler each video data
+        """
+        # lock video id
+        lock_acquired = self.update_piaoquan_top_video_status(
+            pool_id=video_data["id"],
+            ori_status=const.INIT_STATUS,
+            new_status=const.PROCESSING_STATUS,
+        )
+
+        if not lock_acquired:
+            return
+
+        # get video detail from piaoquan
+        response_from_piaoquan = fetch_piaoquan_video_list_detail(
+            [video_data["video_id"]]
+        )
+        video_detail = response_from_piaoquan["data"][0]
+        video_item = Item()
+        unique_id = f"{const.PLATFORM}-{video_data['video_id']}"
+
+        # add info into item
+        video_item.add("content_trace_id", "video{}".format(str_to_md5(unique_id)))
+        video_item.add("url_unique_md5", video_data["video_id"])
+        video_item.add("article_title", video_data["title"])
+        video_item.add("out_account_id", video_detail["uid"])
+        video_item.add("out_account_name", video_detail["user"]["nickName"])
+        video_item.add("mini_program_title", video_data["title"])
+        video_item.add("cover_url", video_detail["shareImgPath"])
+        video_item.add(
+            "publish_timestamp", int(video_detail["gmtCreateTimestamp"] / 1000)
+        )
+        video_item.add("platform", const.PLATFORM)
+        video_item.add(
+            "article_url",
+            f"https://admin.piaoquantv.com/cms/post-detail/{video_data['video_id']}/detail",
+        )
+        video_item.add("source_account", const.NO_SOURCE_ACCOUNT)
+        video_item.add("crawler_timestamp", int(time.time()))
+        video_item.add("video_oss_path", video_detail["ossVideoPath"])
+
+        # 将视频审核状态设置为审核中, 不影响后续发布流程
+        video_item.add("audit_status", const.AUDITING_STATUS)
+        video_item.add("audit_video_id", video_data["video_id"])
+        category = pq_long_articles_category_mapping.get(video_data["category"])
+        if category:
+            video_item.add("category", category)
+            video_item.add("category_status", const.SUCCESS_STATUS)
+
+        # check item before insert
+        video_item.check(source="video")
+        try:
+            item_with_oss_path = scrape_video_entities_process(
+                video_item=video_item.item, db_client=self.db_client
+            )
+            if item_with_oss_path:
+                insert_into_single_video_source_table(
+                    db_client=self.db_client, video_item=item_with_oss_path
+                )
+                self.update_piaoquan_top_video_status(
+                    pool_id=video_data["id"],
+                    ori_status=const.PROCESSING_STATUS,
+                    new_status=const.SUCCESS_STATUS
+                )
+            else:
+                self.update_piaoquan_top_video_status(
+                    pool_id=video_data["id"],
+                    ori_status=const.PROCESSING_STATUS,
+                    new_status=const.FAIL_STATUS
+                )
+
+        except Exception as e:
+            detail = {
+                "video_item": video_item.item,
+                "error": str(e),
+                "traceback": traceback.format_exc(),
+            }
+            self.update_piaoquan_top_video_status(
+                pool_id=video_data["id"],
+                ori_status=const.PROCESSING_STATUS,
+                new_status=const.FAIL_STATUS
+            )
+            log(
+                task="crawler_piaoquan_videos",
+                function="crawler_each_video",
+                message="crawler_piaoquan_videos failed",
+                status="failed",
+                data=detail,
+            )
+
+    def deal(self):
+        # roll back lock task
+        self.rollback_lock_tasks()
+
+        # get video_list
+        video_list = self.get_piaoquan_top_video_list()
+
+        for video_data in tqdm(video_list, desc="video_list"):
+            try:
+                self.crawler_each_video(video_data)
+
+            except Exception as e:
+                log(
+                    task="crawler_piaoquan_videos",
+                    function="crawler_each_video",
+                    message="crawler_piaoquan_videos failed",
+                    status="failed",
+                    data={
+                        "video_data": video_data,
+                        "error": str(e),
+                        "traceback": traceback.format_exc(),
+                    }
+                )