Pārlūkot izejas kodu

头条blogger接口完成

luojunhui 3 mēneši atpakaļ
vecāks
revīzija
0868cf92ff

+ 5 - 2
applications/const/__init__.py

@@ -323,9 +323,12 @@ class ToutiaoVideoCrawlerConst:
     """
     const for toutiao video crawler
     """
+    # platform
+    PLATFORM = "toutiao"
+
     # account status
-    CHANNEL_ACCOUNT_GOOD_STATUS = 1
-    CHANNEL_ACCOUNT_BAD_STATUS = 0
+    TOUTIAO_ACCOUNT_GOOD_STATUS = 1
+    TOUTIAO_ACCOUNT_BAD_STATUS = 0
 
     # earliest cursor, 2021-01-01 00:00:00
     DEFAULT_CURSOR = 1609430400

+ 22 - 0
applications/utils/common.py

@@ -4,6 +4,13 @@
 
 import hashlib
 
+from requests import RequestException
+from tenacity import (
+    stop_after_attempt,
+    wait_exponential,
+    retry_if_exception_type,
+)
+
 
 def str_to_md5(strings):
     """
@@ -37,3 +44,18 @@ def proxy():
         "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": username, "pwd": password, "proxy": tunnel}
     }
     return proxies
+
+
+def request_retry(retry_times, min_retry_delay, max_retry_delay):
+    """
+    :param retry_times:
+    :param min_retry_delay:
+    :param max_retry_delay:
+    """
+    common_retry = dict(
+        stop=stop_after_attempt(retry_times),
+        wait=wait_exponential(min=min_retry_delay, max=max_retry_delay),
+        retry=retry_if_exception_type((RequestException, TimeoutError)),
+        reraise=True  # 重试耗尽后重新抛出异常
+    )
+    return common_retry

+ 41 - 11
coldStartTasks/crawler/toutiao/blogger.py

@@ -1,13 +1,24 @@
 """
 @author: luojunhui
 """
+
+from __future__ import annotations
+
 import json
 import requests
-from applications.utils import proxy
+from tenacity import retry
+
+from applications import log
+from applications.utils import proxy, request_retry
 from .use_js import call_js_function
 
+retry_desc = request_retry(retry_times=3, min_retry_delay=2, max_retry_delay=30)
+
 
-def get_toutiao_account_video_list(account_id: str, cookie: str, max_behot_time=0) -> dict:
+@retry(**retry_desc)
+def get_toutiao_account_video_list(
+    account_id: str, cookie: str, max_behot_time=0
+) -> dict | None:
     """
     get toutiao account video list
     :param account_id: toutiao account id
@@ -15,20 +26,39 @@ def get_toutiao_account_video_list(account_id: str, cookie: str, max_behot_time=
     :param max_behot_time: max behot time
     :return: toutiao account video list
     """
-    ms_token = 'mFs9gU4FJc23gFWPvBfQxFsBRrx1xBEJD_ZRTAolHfPrae84kTEBaHQR3s8ToiLX4-U9hgATTZ2cVHlSixmj5YCTOPoVM-43gOt3aVHkxfXHEuUtTJe-wUEs%3D'
+    ms_token = "mFs9gU4FJc23gFWPvBfQxFsBRrx1xBEJD_ZRTAolHfPrae84kTEBaHQR3s8ToiLX4-U9hgATTZ2cVHlSixmj5YCTOPoVM-43gOt3aVHkxfXHEuUtTJe-wUEs%3D"
     query_params = [
         0,
         1,
         14,
-        'category=pc_user_hot&token={}&aid=24&app_name=toutiao_web&msToken={}'.format(account_id, ms_token),
-        '',
-        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36',
+        "category=pc_user_hot&token={}&aid=24&app_name=toutiao_web&msToken={}".format(
+            account_id, ms_token
+        ),
+        "",
+        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36",
     ]
     a_bogus = call_js_function(query_params)
-    url = f'https://www.toutiao.com/api/pc/list/user/feed?category=pc_profile_video&token={account_id}&max_behot_time={max_behot_time}&hot_video=0&entrance_gid=&aid=24&app_name=toutiao_web&msToken={ms_token}&a_bogus={a_bogus}'
+    url = f"https://www.toutiao.com/api/pc/list/user/feed?category=pc_profile_video&token={account_id}&max_behot_time={max_behot_time}&hot_video=0&entrance_gid=&aid=24&app_name=toutiao_web&msToken={ms_token}&a_bogus={a_bogus}"
     headers = {
-        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3',
-        'cookie': cookie
+        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3",
+        "cookie": cookie,
     }
-    response = requests.get(url, headers=headers, proxies=proxy())
-    return response.json()
+    try:
+        response = requests.get(url, headers=headers, proxies=proxy())
+        response.raise_for_status()
+        return response.json()
+    except requests.exceptions.RequestException as e:
+        log(
+            task="toutiao account crawler",
+            function="get_toutiao_account_video_list",
+            message=f"API请求失败: {e}",
+            data={"account_id": account_id},
+        )
+    except json.JSONDecodeError as e:
+        log(
+            task="toutiao account crawler",
+            function="get_toutiao_account_video_list",
+            message=f"响应解析失败: {e}",
+            data={"account_id": account_id},
+        )
+    return None

+ 93 - 37
tasks/crawler_toutiao_account_videos.py

@@ -2,10 +2,15 @@
 @author: luojunhui
 """
 
+from __future__ import annotations
+
 import time
+import traceback
 
+from pymysql.cursors import DictCursor
 from tqdm import tqdm
 
+from applications import log
 from applications.const import ToutiaoVideoCrawlerConst
 from applications.db import DatabaseConnector
 from applications.pipeline import scrape_video_entities_process
@@ -33,21 +38,40 @@ class CrawlerToutiaoAccountVideos:
         """
         get account list
         """
-        return
+        sql = f"""
+            select account_id, max_cursor
+            from video_meta_accounts
+            where platform = 'toutiao' and status = {const.TOUTIAO_ACCOUNT_GOOD_STATUS};
+        """
+        account_list = self.db_client.fetch(query=sql, cursor_type=DictCursor)
+        return account_list
 
-    def crawler_each_account_video_list(self, account_id, max_behot_time=0):
+    def crawler_each_account_video_list(
+        self, account_id: str, max_cursor: int | None, max_behot_time: int = 0
+    ):
         """
-        get each account video list
+        account_id: toutiao account id
+        max_cursor: crawler latest cursor for each account
+        max_behot_time: max behot time from toutiao, use to switch to next page
         """
-        current_cursor = max_behot_time
         has_more = True
+        current_cursor = max_behot_time
+        max_cursor = max_cursor or const.DEFAULT_CURSOR
 
         while has_more:
             response = get_toutiao_account_video_list(
                 account_id=account_id, cookie=cookie, max_behot_time=current_cursor
             )
+            if not response:
+                break
+
             if response["message"] != "success":
-                print("error")
+                log(
+                    task="crawler_toutiao_account_videos",
+                    function="crawler_toutiao_account_videos",
+                    message="get response from toutiao failed",
+                    data={"account_id": account_id, "response": response},
+                )
                 break
 
             video_list = response["data"]
@@ -58,13 +82,27 @@ class CrawlerToutiaoAccountVideos:
                 break
 
             max_timestamp_in_this_group = video_list[0]["publish_time"]
-            if max_timestamp_in_this_group < const.DEFAULT_CURSOR:
+            if max_timestamp_in_this_group < max_cursor:
                 break
 
+            # do crawler each video
             crawler_video_list_bar = tqdm(video_list, desc="crawler videos")
             for video in crawler_video_list_bar:
-                crawler_video_list_bar.set_postfix({"video_id": video["id"]})
-                self.crawler_each_video(video)
+                try:
+                    crawler_video_list_bar.set_postfix({"video_id": video["id"]})
+                    self.crawler_each_video(video)
+                except Exception as e:
+                    log(
+                        task="crawler_toutiao_account_videos",
+                        function="crawler_each_account_video_list",
+                        message="crawler each video failed",
+                        data={
+                            "account_id": account_id,
+                            "video_info": video,
+                            "error": str(e),
+                            "traceback": traceback.format_exc(),
+                        },
+                    )
 
             if has_more:
                 time.sleep(const.SLEEP_SECOND)
@@ -88,7 +126,7 @@ class CrawlerToutiaoAccountVideos:
         video_item.add("out_account_id", video_data["user"]["user_id"])
         video_item.add("out_account_name", video_data["source"])
         video_item.add("publish_timestamp", video_data["publish_time"])
-        video_item.add("platform", "toutiao")
+        video_item.add("platform", const.PLATFORM)
         video_item.add("read_cnt", video_data["read_count"])
         video_item.add("article_url", url)
         video_item.add("source_account", const.NO_SOURCE_ACCOUNT_STATUS)
@@ -107,36 +145,54 @@ class CrawlerToutiaoAccountVideos:
         except Exception as e:
             print(e)
 
-    def deal(self):
+    def update_account_max_cursor(self, account_id: str) -> None:
+        """
+        update account max cursor
+        """
+        select_sql = f"""
+            select max(publish_timestamp) as max_cursor 
+            from publish_single_video_source 
+            where out_account_id = '{account_id}' and platform = '{const.PLATFORM}';
+        """
+        response_mysql = self.db_client.fetch(query=select_sql)
+        max_publish_timestamp = response_mysql[0][0]
+
+        if max_publish_timestamp:
+            update_sql = f"""
+                        update video_meta_accounts
+                        set max_cursor = %s
+                        where account_id = %s and platform = %s;
+            """
+            self.db_client.save(
+                query=update_sql,
+                params=(max_publish_timestamp, account_id, const.PLATFORM),
+            )
+
+    def deal(self) -> None:
         """
         class entrance
         """
-        account_id_list = [
-            "MS4wLjABAAAABIPlAy8EngHf3bXkYFrN935tdJuS9nu3wCdeONe5ZkMxcsQ5AQkxYEcUGqPcA6K7",
-            "MS4wLjABAAAAXqC8gtp2uYlGyO8Bua10GOTqsmi6TPUshTullb1vsSlK1WoRPRW0b1cFmKEpKDyy",
-            "MS4wLjABAAAAUG5on9TNGiGcDAnthjQUz8hs93QU-R37KzAqsCj_IsU",
-            "MS4wLjABAAAAHbUq1p1NodyaVw8nBwdz7su5NIrONIcZ22xLbCRIxUC09s8FeqmQh4tg9MOCLktV",
-            "MS4wLjABAAAAbH-9GMPXTyC9RE-aSpzi0thIrqw-SzbdPz-v7M7YGGQ",
-            "MS4wLjABAAAAq3YelxNuDki2gDu83MEBS7zultxsY8YZ1AWcC1XRugSrFFLOgBZvFeFmNn-h_5Qa",
-            "MS4wLjABAAAA29pf0waQ3QOGd03JpLbYgju5Bg4t1xyIZByY0ijTDYN5Y1aL9LV-DuiSAz7UNfqL",
-            "MS4wLjABAAAAlNBBh2wsAfQIKY6XkVQj6FC9FZonfX8jjsIiVl7xV4c",
-            "MS4wLjABAAAA_5u04HihfTRaYKILhN0ksZqGQXtPqoAS3lMe44oEKFc8NKsVrA6hR-OSN82gw-ue",
-            "MS4wLjABAAAAG5dpmasVG0C2bgr9hNclcKxqm6DPz_1dCOr4fzNT-V0",
-            "MS4wLjABAAAAoUESfCcb-NXbHXJr-A7TszauMxIvjXd0EhULvmVyUhpj-HSs5gsCxrbZFvEcJZzU",
-            "MS4wLjABAAAAlshV8QVXTo4VxSjSHh9B7LpK4_DPKA1vJkbcH8-3Jmq7QohWBHpcphQ2gKAKYe7M",
-            "MS4wLjABAAAAKO4skzt3d35FYb92Vv1lVgzpPz9PdAGsXvqs3WyXILs",
-            "MS4wLjABAAAAp1CP5bxMGYW7fxMZOJKSuSMQeMD7AMw5MyOvP-1xC14",
-            "MS4wLjABAAAAld-tIrZWcmQp9K_IRTI2zcT5GFlzrOH2yj7Cino8xqU",
-            "MS4wLjABAAAAncBYHG1eIO-gSC1FIs8YmGjVTQuN9s9-NBbFs_1pOX0apGmlQd0GroZpb2TpAzVb",
-            "MS4wLjABAAAAqYXDF25BWZBXePfjCISRSmzQRytwOJhBwii9YnzwirYt1MAzdk6kikc6QChcYC9G",
-            "MS4wLjABAAAA_t2pW2XSRFL4P8rV4X3T0hIEnEBxCbLC_cgD3B-Q9mwYorMiNyyoGcmLuyVxnyj1",
-            "MS4wLjABAAAAEU1n5akXZ7Fvd8wkm1BV6pMRI58mgZUPgyQGHBiRKIi4UcoRglDk6xgEgEK8Lk3n",
-            "MS4wLjABAAAAlwoEZD-OROoX_nMoulzBDCnlMqj72GIAB-PO2A3C0GVmYGOnBEH0jhbibVyRUqir",
-        ]
-
-        for account_id in account_id_list:
+        account_list = self.get_account_list()
+        account_list_bar = tqdm(account_list, desc="crawler toutiao accounts")
+        for account in account_list_bar:
+            account_id = account["account_id"]
+            max_cursor = account["max_cursor"]
             try:
-                self.crawler_each_account_video_list(account_id)
+                # crawl each account
+                account_list_bar.set_postfix({"account_id": account_id})
+                self.crawler_each_account_video_list(
+                    account_id=account_id, max_cursor=max_cursor
+                )
+                self.update_account_max_cursor(account_id)
+
             except Exception as e:
-                print(e)
-                continue
+                # add log and bot
+                log(
+                    task="crawler_toutiao_account_videos",
+                    function="deal",
+                    message=account_id,
+                    data={
+                        "error": str(e),
+                        "traceback": traceback.format_exc(),
+                    },
+                )