Преглед изворни кода

Merge branch '2025-03-06-toutiao-videos' into 2025-03-11-toutiao-related-recommendation
merge toutiao

luojunhui пре 7 месеци
родитељ
комит
710c3dd0b4

+ 27 - 0
applications/const/__init__.py

@@ -319,6 +319,33 @@ class ChannelVideoCrawlerConst:
     SLEEP_SECOND = 2
 
 
+class ToutiaoVideoCrawlerConst:
+    """
+    const for toutiao video crawler
+    """
+    # platform
+    PLATFORM = "toutiao"
+
+    # account status
+    TOUTIAO_ACCOUNT_GOOD_STATUS = 1
+    TOUTIAO_ACCOUNT_BAD_STATUS = 0
+
+    # earliest cursor, 2021-01-01 00:00:00
+    DEFAULT_CURSOR = 1609430400
+
+    # no source account
+    NO_SOURCE_ACCOUNT_STATUS = 0
+
+    # title length min
+    MIN_TITLE_LENGTH = 10
+
+    # max video length(second)
+    MAX_VIDEO_LENGTH = 600
+
+    # sleep second
+    SLEEP_SECOND = 3
+
+
 
 
 

Разлика између датотеке није приказан због своје велике величине
+ 7548 - 0
applications/js/toutiao.js


+ 4 - 0
applications/pipeline/__init__.py

@@ -0,0 +1,4 @@
+"""
+@author: luojunhui
+"""
+from .crawler_pipeline import scrape_video_entities_process

+ 83 - 0
applications/pipeline/crawler_pipeline.py

@@ -0,0 +1,83 @@
+"""
+@author: luojunhui
+"""
+
+import os
+import json
+
+from applications import log
+
+from applications.utils import download_gzh_video
+from applications.utils import download_toutiao_video
+from applications.utils import upload_to_oss
+
+from config import apolloConfig
+
+my_config = apolloConfig()
+
+empty_dict = {}
+sensitive_word_list = json.loads(my_config.getConfigValue("sensitive_word_list"))
+
+
+def whether_title_sensitive(title: str) -> bool:
+    """
+    title sensitive words filter
+    """
+    for word in sensitive_word_list:
+        if word in title:
+            return True
+
+    return False
+
+
+def whether_duplicate_video_title(video_title: str, db_client) -> bool:
+    """
+    whether duplicate video title
+    """
+    sql = f"""
+        select id from publish_single_video_source
+        where article_title = %s;
+    """
+    duplicate_id = db_client.fetch(query=sql, params=(video_title,))
+    if duplicate_id:
+        return True
+
+    return False
+
+
+def scrape_video_entities_process(video_item, db_client) -> dict:
+    """
+    video crawler pipeline
+    """
+    article_url = video_item["article_url"]
+    platform = video_item["platform"]
+    video_title = video_item["article_title"]
+    # whether title sensitive
+    if whether_title_sensitive(video_title):
+        return empty_dict
+
+    # whether duplicate video title
+    if whether_duplicate_video_title(video_title, db_client):
+        return empty_dict
+
+    # download video
+    match platform:
+        case "toutiao":
+            video_path = download_toutiao_video(article_url)
+        case "gzh":
+            video_path = download_gzh_video(article_url)
+        case "hksp":
+            video_path = ""
+        case "sph":
+            video_path = ""
+        case _:
+            return empty_dict
+
+    if video_path:
+        # upload video to oss
+        oss_path = upload_to_oss(video_path)
+        video_item["video_oss_path"] = oss_path
+        os.remove(video_path)
+        return video_item
+    else:
+        return empty_dict

+ 1 - 0
applications/utils/__init__.py

@@ -6,6 +6,7 @@ from .cold_start import get_inner_account_set
 from .common import *
 from .download_video import download_gzh_video
 from .download_video import download_sph_video
+from .download_video import download_toutiao_video
 from .item import Item
 from .save_to_db import insert_into_single_video_source_table
 from .upload import upload_to_oss

+ 39 - 0
applications/utils/common.py

@@ -4,6 +4,13 @@
 
 import hashlib
 
+from requests import RequestException
+from tenacity import (
+    stop_after_attempt,
+    wait_exponential,
+    retry_if_exception_type,
+)
+
 
 def str_to_md5(strings):
     """
@@ -20,3 +27,35 @@ def str_to_md5(strings):
     # 获取16进制形式的MD5哈希值
     md5_value = md5_hash.hexdigest()
     return md5_value
+
+
+def proxy():
+    """
+    快代理
+    """
+    # 隧道域名:端口号
+    tunnel = "j685.kdltps.com:15818"
+
+    # 用户名密码方式
+    username = "t14070979713487"
+    password = "hqwanfvy"
+    proxies = {
+        "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": username, "pwd": password, "proxy": tunnel},
+        "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": username, "pwd": password, "proxy": tunnel}
+    }
+    return proxies
+
+
+def request_retry(retry_times, min_retry_delay, max_retry_delay):
+    """
+    :param retry_times:
+    :param min_retry_delay:
+    :param max_retry_delay:
+    """
+    common_retry = dict(
+        stop=stop_after_attempt(retry_times),
+        wait=wait_exponential(min=min_retry_delay, max=max_retry_delay),
+        retry=retry_if_exception_type((RequestException, TimeoutError)),
+        reraise=True  # 重试耗尽后重新抛出异常
+    )
+    return common_retry

+ 15 - 0
applications/utils/download_video.py

@@ -139,3 +139,18 @@ def decrypt_sph_video(video_path: str, key: int, save_path: str) -> None:
     except Exception as e:
         print(traceback.format_exc())
         raise RuntimeError(f"Decryption failed: {str(e)}") from e
+
+
+def download_toutiao_video(video_url: str) -> str:
+    """
+    download toutiao video
+    """
+    save_path = "static/{}.mp4".format(str_to_md5(video_url))
+    response = requests.get(video_url, headers=headers, stream=True)
+    with open(save_path, "wb") as f:
+        for chunk in response.iter_content(chunk_size=8192):
+            if chunk:
+                f.write(chunk)
+
+    return save_path
+

+ 1 - 1
coldStartTasks/crawler/__init__.py

@@ -2,4 +2,4 @@
 @author: luojunhui
 """
 from .weixin_account_crawler import WeixinAccountCrawler
-from .weixin_video_crawler import WeixinVideoCrawler
+from .weixin_video_crawler import WeixinVideoCrawler

+ 4 - 0
coldStartTasks/crawler/toutiao/__init__.py

@@ -0,0 +1,4 @@
+"""
+@author: luojunhui
+"""
+from .blogger import get_toutiao_account_video_list

+ 64 - 0
coldStartTasks/crawler/toutiao/blogger.py

@@ -0,0 +1,64 @@
+"""
+@author: luojunhui
+"""
+
+from __future__ import annotations
+
+import json
+import requests
+from tenacity import retry
+
+from applications import log
+from applications.utils import proxy, request_retry
+from .use_js import call_js_function
+
+retry_desc = request_retry(retry_times=3, min_retry_delay=2, max_retry_delay=30)
+
+
+@retry(**retry_desc)
+def get_toutiao_account_video_list(
+    account_id: str, cookie: str, max_behot_time=0
+) -> dict | None:
+    """
+    get toutiao account video list
+    :param account_id: toutiao account id
+    :param cookie: cookie maybe expire not quite sure
+    :param max_behot_time: max behot time
+    :return: toutiao account video list
+    """
+    ms_token = "mFs9gU4FJc23gFWPvBfQxFsBRrx1xBEJD_ZRTAolHfPrae84kTEBaHQR3s8ToiLX4-U9hgATTZ2cVHlSixmj5YCTOPoVM-43gOt3aVHkxfXHEuUtTJe-wUEs%3D"
+    query_params = [
+        0,
+        1,
+        14,
+        "category=pc_user_hot&token={}&aid=24&app_name=toutiao_web&msToken={}".format(
+            account_id, ms_token
+        ),
+        "",
+        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36",
+    ]
+    a_bogus = call_js_function(query_params)
+    url = f"https://www.toutiao.com/api/pc/list/user/feed?category=pc_profile_video&token={account_id}&max_behot_time={max_behot_time}&hot_video=0&entrance_gid=&aid=24&app_name=toutiao_web&msToken={ms_token}&a_bogus={a_bogus}"
+    headers = {
+        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3",
+        "cookie": cookie,
+    }
+    try:
+        response = requests.get(url, headers=headers, proxies=proxy())
+        response.raise_for_status()
+        return response.json()
+    except requests.exceptions.RequestException as e:
+        log(
+            task="toutiao account crawler",
+            function="get_toutiao_account_video_list",
+            message=f"API请求失败: {e}",
+            data={"account_id": account_id},
+        )
+    except json.JSONDecodeError as e:
+        log(
+            task="toutiao account crawler",
+            function="get_toutiao_account_video_list",
+            message=f"响应解析失败: {e}",
+            data={"account_id": account_id},
+        )
+    return None

+ 25 - 0
coldStartTasks/crawler/toutiao/use_js.py

@@ -0,0 +1,25 @@
+"""
+@author: luojunhui
+"""
+import json
+import subprocess
+
+from config import toutiao_js_path
+
+
+def call_js_function(arguments_list):
+    """
+    call js function
+    """
+    # 将参数转换为JSON字符串
+    args_json = json.dumps(arguments_list)
+    # 调用Node.js执行脚本
+    result = subprocess.run(
+        ['node', toutiao_js_path, args_json],
+        capture_output=True,
+        text=True
+    )
+    if result.returncode == 0:
+        return result.stdout.strip()
+    else:
+        raise Exception(f"Error: {result.stderr}")

+ 4 - 1
config/__init__.py

@@ -104,4 +104,7 @@ gewe_token = "d3fb918f-0f36-4769-b095-410181614231"
 gewe_app_id = "wx_GKpVW8xfEhcaxMIK9sSm6"
 
 # sph decrypt key
-decrypt_key_path = 'applications/so/libsph_decrypt.so'
+decrypt_key_path = 'applications/so/libsph_decrypt.so'
+
+# toutiao js path
+toutiao_js_path = 'applications/js/toutiao.js'

+ 208 - 0
tasks/crawler_toutiao_account_videos.py

@@ -0,0 +1,208 @@
+"""
+@author: luojunhui
+"""
+
+from __future__ import annotations
+
+import time
+import traceback
+
+from pymysql.cursors import DictCursor
+from tqdm import tqdm
+
+from applications import log
+from applications.const import ToutiaoVideoCrawlerConst
+from applications.db import DatabaseConnector
+from applications.pipeline import scrape_video_entities_process
+from applications.utils import Item
+from applications.utils import str_to_md5
+from applications.utils import insert_into_single_video_source_table
+from coldStartTasks.crawler.toutiao import get_toutiao_account_video_list
+from config import apolloConfig, long_articles_config
+
+const = ToutiaoVideoCrawlerConst()
+config = apolloConfig()
+cookie = config.getConfigValue("toutiao_blogger_cookie")
+
+
+class CrawlerToutiaoAccountVideos:
+    """
+    toutiao blogger crawler
+    """
+
+    def __init__(self):
+        self.db_client = DatabaseConnector(db_config=long_articles_config)
+        self.db_client.connect()
+
+    def get_account_list(self):
+        """
+        get account list
+        """
+        sql = f"""
+            select account_id, max_cursor
+            from video_meta_accounts
+            where platform = 'toutiao' and status = {const.TOUTIAO_ACCOUNT_GOOD_STATUS};
+        """
+        account_list = self.db_client.fetch(query=sql, cursor_type=DictCursor)
+        return account_list
+
+    def crawler_each_account_video_list(
+        self, account_id: str, max_cursor: int | None, max_behot_time: int = 0
+    ):
+        """
+        account_id: toutiao account id
+        max_cursor: crawler latest cursor for each account
+        max_behot_time: max behot time from toutiao, use to switch to next page
+        """
+        has_more = True
+        current_cursor = max_behot_time
+        max_cursor = max_cursor or const.DEFAULT_CURSOR
+
+        while has_more:
+            response = get_toutiao_account_video_list(
+                account_id=account_id, cookie=cookie, max_behot_time=current_cursor
+            )
+            if not response:
+                break
+
+            if response["message"] != "success":
+                log(
+                    task="crawler_toutiao_account_videos",
+                    function="crawler_toutiao_account_videos",
+                    message="get response from toutiao failed",
+                    data={"account_id": account_id, "response": response},
+                )
+                break
+
+            video_list = response["data"]
+            has_more = response["has_more"]
+            current_cursor = response["next"]["max_behot_time"]
+
+            if not video_list:
+                break
+
+            max_timestamp_in_this_group = video_list[0]["publish_time"]
+            if max_timestamp_in_this_group < max_cursor:
+                break
+
+            # do crawler each video
+            crawler_video_list_bar = tqdm(video_list, desc="crawler videos")
+            for video in crawler_video_list_bar:
+                try:
+                    crawler_video_list_bar.set_postfix({"video_id": video["id"]})
+                    self.crawler_each_video(video)
+
+                except Exception as e:
+                    log(
+                        task="crawler_toutiao_account_videos",
+                        function="crawler_each_account_video_list",
+                        message="crawler each video failed",
+                        data={
+                            "account_id": account_id,
+                            "video_info": video,
+                            "error": str(e),
+                            "traceback": traceback.format_exc(),
+                        },
+                    )
+
+            if has_more:
+                time.sleep(const.SLEEP_SECOND)
+            else:
+                break
+
+    def crawler_each_video(self, video_data):
+        """
+        crawler each video data
+        """
+        video_item = Item()
+        video_id = video_data["video_id"]
+        title = video_data["title"]
+        media = video_data["video"]
+        url = media["download_addr"]["url_list"][0]
+
+        # add info into item
+        video_item.add("content_trace_id", "video{}".format(str_to_md5(str(video_id))))
+        video_item.add("url_unique_md5", video_id)
+        video_item.add("article_title", title)
+        video_item.add("out_account_id", video_data["user"]["user_id"])
+        video_item.add("out_account_name", video_data["source"])
+        video_item.add("publish_timestamp", video_data["publish_time"])
+        video_item.add("platform", const.PLATFORM)
+        video_item.add("read_cnt", video_data.get("read_count", 0))
+        video_item.add("article_url", url)
+        video_item.add("source_account", const.NO_SOURCE_ACCOUNT_STATUS)
+        video_item.add("crawler_timestamp", int(time.time()))
+
+        # check item before insert
+        video_item.check(source="video")
+        try:
+            item_with_oss_path = scrape_video_entities_process(
+                video_item=video_item.item, db_client=self.db_client
+            )
+            if item_with_oss_path:
+                insert_into_single_video_source_table(
+                    self.db_client, item_with_oss_path
+                )
+        except Exception as e:
+            log(
+                task="crawler_toutiao_account_videos",
+                function="crawler_toutiao_account_videos",
+                message="etl failed",
+                data={
+                    "video_item": video_item.item,
+                    "error": str(e),
+                    "traceback": traceback.format_exc(),
+                }
+            )
+
+    def update_account_max_cursor(self, account_id: str) -> None:
+        """
+        update account max cursor
+        """
+        select_sql = f"""
+            select max(publish_timestamp) as max_cursor 
+            from publish_single_video_source 
+            where out_account_id = '{account_id}' and platform = '{const.PLATFORM}';
+        """
+        response_mysql = self.db_client.fetch(query=select_sql)
+        max_publish_timestamp = response_mysql[0][0]
+
+        if max_publish_timestamp:
+            update_sql = f"""
+                update video_meta_accounts
+                set max_cursor = %s
+                where account_id = %s and platform = %s;
+            """
+            self.db_client.save(
+                query=update_sql,
+                params=(max_publish_timestamp, account_id, const.PLATFORM),
+            )
+
+    def deal(self) -> None:
+        """
+        class entrance
+        """
+        account_list = self.get_account_list()
+        account_list_bar = tqdm(account_list, desc="crawler toutiao accounts")
+        for account in account_list_bar:
+            account_id = account["account_id"]
+            max_cursor = account["max_cursor"]
+            try:
+                # crawl each account
+                account_list_bar.set_postfix({"account_id": account_id})
+                self.crawler_each_account_video_list(
+                    account_id=account_id, max_cursor=max_cursor
+                )
+                self.update_account_max_cursor(account_id)
+
+            except Exception as e:
+                # add log and bot
+                log(
+                    task="crawler_toutiao_account_videos",
+                    function="deal",
+                    message=account_id,
+                    data={
+                        "error": str(e),
+                        "traceback": traceback.format_exc(),
+                    },
+                )

+ 10 - 0
toutiao_video_crawler.py

@@ -0,0 +1,10 @@
+"""
+@author: luojunhui
+"""
+
+from tasks.crawler_toutiao_account_videos import CrawlerToutiaoAccountVideos
+
+
+if __name__ == '__main__':
+    crawler = CrawlerToutiaoAccountVideos()
+    crawler.deal()

Неке датотеке нису приказане због велике количине промена