浏览代码

Merge branch 'luojunhui-baidu-account-crawler' of luojunhui/LongArticlesJob into master

luojunhui 8 月之前
父节点
当前提交
d195b00dda

+ 25 - 0
applications/const/__init__.py

@@ -208,3 +208,28 @@ class ArticleCollectorConst:
     ARTICLE_SUCCESS_CODE = 0
     ARTICLE_UNKNOWN_CODE = 10000
 
+
+class BaiduVideoCrawlerConst:
+    """
+    const for baidu video crawler
+    """
+    # account status
+    BAIDU_ACCOUNT_GOOD_STATUS = 1
+    BAIDU_ACCOUNT_BAD_STATUS = 0
+
+    # earliest cursor, 2024-01-01 00:00:00
+    DEFAULT_CURSOR = 17040384000000
+
+    # no source account
+    NO_SOURCE_ACCOUNT_STATUS = 0
+
+    # timestamp To Cursor
+    TIMESTAMP_TO_CURSOR = 10000
+
+    # local path dir
+    LOCAL_PATH_DIR = "static"
+
+
+
+
+

+ 12 - 8
applications/db/__init__.py

@@ -30,12 +30,12 @@ class DatabaseConnector:
         """
         try:
             self.connection = pymysql.connect(
-                host=self.db_config.get('host', 'localhost'),
-                user=self.db_config['user'],
-                password=self.db_config['password'],
-                db=self.db_config['db'],
-                port=self.db_config.get('port', 3306),
-                charset=self.db_config.get('charset', 'utf8mb4')
+                host=self.db_config.get("host", "localhost"),
+                user=self.db_config["user"],
+                password=self.db_config["password"],
+                db=self.db_config["db"],
+                port=self.db_config.get("port", 3306),
+                charset=self.db_config.get("charset", "utf8mb4"),
             )
         except pymysql.MySQLError as e:
             raise ConnectionError(f"无法连接到数据库: {e}")
@@ -48,9 +48,10 @@ class DatabaseConnector:
             self.connection.close()
             self.connection = None
 
-    def fetch(self, query, cursor_type=None):
+    def fetch(self, query, cursor_type=None, params=None):
         """
         执行单条查询语句,并返回结果。
+        :param params: 查询传参
         :param cursor_type: 输出的返回格式
         :param query: 查询语句
         :return: 查询结果列表
@@ -61,7 +62,10 @@ class DatabaseConnector:
 
         try:
             with self.connection.cursor(cursor_type) as cursor:
-                cursor.execute(query)
+                if params:
+                    cursor.execute(query, params)
+                else:
+                    cursor.execute(query)
                 result = cursor.fetchall()
                 return result
         except pymysql.MySQLError as e:

+ 4 - 3
applications/exception/spider_error.py

@@ -7,9 +7,9 @@ from applications import log
 
 
 class SpiderError(Exception):
-    """数据库查询异常"""
+    """spider_task_error"""
 
-    def __init__(self, error=None, spider=None, url=None):
+    def __init__(self, platform=None, error=None, spider=None, url=None):
         """
         :param error: 异常对象,可选,用于提供更详细的错误信息。
         :param spider: 爬虫任务
@@ -22,7 +22,8 @@ class SpiderError(Exception):
         }
         log(
             task="spider_task",
-            function="log_spider_error",
+            function="{}".format(platform),
+            message="{} 抓取失败".format(spider),
             data=error_obj
         )
         super().__init__(json.dumps(error_obj, ensure_ascii=False, indent=4))

+ 25 - 0
applications/functions.py

@@ -296,3 +296,28 @@ class Functions(object):
         params = parse_qs(urlparse(url).query)
         info = params.get(key, [])
         return info[0] if info else None
+
+    @classmethod
+    def download_baidu_videos(cls, video_url, save_path):
+        """
+        :param video_url: baidu video url
+        :param save_path: save path
+        """
+        if os.path.exists(save_path):
+            return save_path
+
+        response = requests.get(
+            video_url,
+            headers={
+                'User-Agent': FakeUserAgent().chrome,
+                "Accept": "*/*",
+                "Accept-Language": "zh-CN,zh;q=0.9"
+            }
+        )
+        with open(save_path, 'wb') as f:
+            f.write(response.content)
+        TEN_KB = 1024 * 10
+        if os.path.getsize(save_path) > TEN_KB:
+            return save_path
+        else:
+            return None

+ 1 - 0
coldStartTasks/crawler/baidu/__init__.py

@@ -0,0 +1 @@
+from .video_crawler import BaiduVideoCrawler

+ 4 - 0
coldStartTasks/crawler/baidu/account_crawler.py

@@ -0,0 +1,4 @@
+"""
+@author: luojunhui
+"""
+

+ 96 - 0
coldStartTasks/crawler/baidu/baidu_spider.py

@@ -0,0 +1,96 @@
+"""
+@author: luojunhui
+"""
+import base64
+import uuid
+
+import requests
+
+from fake_useragent import FakeUserAgent
+
+from applications.exception import SpiderError
+from applications import Functions
+
+functions = Functions()
+
+
+def baidu_account_video_crawler(account_id, cursor=None):
+    """
+    baidu account video crawler
+    :param account_id: 百度账号id
+    :param cursor: 游标, 默认为None,表示从最新的开始爬取
+    success requests:
+    """
+    cookie_str = uuid.uuid4().__str__().replace('-', '').upper()
+    url = "https://haokan.baidu.com/web/author/listall?"
+    params = {
+        'app_id': account_id,
+        'ctime': cursor,
+        'rn': 10,
+        'searchAfter': '',
+        '_api': 1
+    }
+    headers = {
+        'Accept': '*/*',
+        'Accept-Language': 'zh,zh-CN;q=0.9',
+        'Connection': 'keep-alive',
+        'Referer': 'https://haokan.baidu.com/author/{}'.format(account_id),
+        'User-Agent': FakeUserAgent().chrome,
+        'x-requested-with': 'xmlhttprequest',
+        'Cookie': 'BAIDUID={}:FG=1; BAIDUID_BFESS={}:FG=1'.format(cookie_str, cookie_str)
+    }
+    try:
+        response = requests.request("GET", url, headers=headers, params=params, proxies=functions.proxy())
+        response_json = response.json()
+        if response_json['errmsg'] == '成功':
+            response_data = response_json['data']
+            return response_data
+        else:
+            raise SpiderError(
+                platform="baidu",
+                spider="account_video_crawler",
+                error=response_json['errmsg'],
+                url=url
+            )
+
+    except Exception as e:
+        raise SpiderError(
+            platform="baidu",
+            spider="account_video_crawler",
+            error=str(e),
+            url=url
+        )
+
+
+def baidu_single_video_crawler(video_id):
+    """
+    baidu video crawler
+    :param video_id: 视频id
+    """
+    url = "https://haokan.baidu.com/v"
+    params = {
+        'vid': video_id,
+        '_format': 'json'
+    }
+    base_64_string = base64.b64encode(str(uuid.uuid4()).encode()).decode()
+    headers = {
+        'Accept': '*/*',
+        'cookie': "BIDUPSID={}".format(base_64_string),
+        'Accept-Language': 'en,zh;q=0.9,zh-CN;q=0.8',
+        'Cache-Control': 'no-cache',
+        'Connection': 'keep-alive',
+        'Content-Type': 'application/x-www-form-urlencoded',
+        'Referer': 'https://haokan.baidu.com',
+        'User-Agent': FakeUserAgent().chrome,
+    }
+    try:
+        response = requests.request("GET", url, headers=headers, params=params, proxies=functions.proxy())
+        response_json = response.json()
+        return response_json['data']['apiData']['curVideoMeta']
+    except Exception as e:
+        raise SpiderError(
+            platform="baidu",
+            spider="single_video_crawler",
+            error=str(e),
+            url=url
+        )

+ 269 - 0
coldStartTasks/crawler/baidu/video_crawler.py

@@ -0,0 +1,269 @@
+"""
+@author: luojunhui
+@description: video crawler
+"""
+
+import os
+import json
+import time
+import traceback
+from typing import List, Dict
+
+from pymysql.cursors import DictCursor
+from tqdm import tqdm
+
+from applications import Functions
+from applications import bot, log
+from applications.const import BaiduVideoCrawlerConst
+from applications.db import DatabaseConnector
+from applications.exception import SpiderError
+from config import long_articles_config
+from coldStartTasks.crawler.baidu.baidu_spider import baidu_account_video_crawler
+from coldStartTasks.crawler.baidu.baidu_spider import baidu_single_video_crawler
+
+const = BaiduVideoCrawlerConst()
+empty_list = []
+functions = Functions()
+
+
+class BaiduVideoCrawler(object):
+    """
+    baidu video crawler
+    """
+
+    def __init__(self):
+        self.db = None
+        self.success_crawler_video_count = 0
+        self.connect_db()
+
+    def connect_db(self) -> None:
+        """
+        connect db
+        """
+        self.db = DatabaseConnector(db_config=long_articles_config)
+        self.db.connect()
+
+    def get_account_list(self) -> List[Dict]:
+        """
+        get account list
+        """
+        sql = f"""
+            select account_id, account_name, max_cursor 
+            from baidu_account_for_videos
+            where status = {const.BAIDU_ACCOUNT_GOOD_STATUS};
+        """
+        account_list = self.db.fetch(query=sql, cursor_type=DictCursor)
+        return account_list
+
+    def whether_video_exists(self, title: str) -> bool:
+        """
+        whether video exists, use video_id && title
+        """
+        # check title
+        sql = f"""
+            select id from publish_single_video_source
+            where article_title = %s;
+        """
+        duplicate_id = self.db.fetch(query=sql, params=(title,))
+        if duplicate_id:
+            print(title + " video exists")
+            return True
+
+        return False
+
+    def save_each_video(self, video: Dict, account_id: str, account_name: str) -> None:
+        """
+        download and save each video
+        """
+        # print(json.dumps(video, ensure_ascii=False, indent=4))
+        video_id = video["id"]
+        title = video["title"]
+
+        # judge whether video exists
+        if self.whether_video_exists(title):
+            return
+
+        read_cnt = video.get("playcnt", 0)
+        like_cnt = video.get("like_num", 0)
+        publish_timestamp = video["publish_time"]
+        # duration = video['duration']
+        cover_url = video["poster"]
+        video_url = video["playurl"]
+        # sensitive_flag = video.get('sensitive_flag')
+        video_more_info = video.get("contentcms_intervene_data")
+        if video_more_info:
+            video_category_list = video_more_info.get("category_v2")
+            if video_category_list:
+                video_category = video_category_list[0]
+            else:
+                video_category = None
+        else:
+            video_category = None
+        manual_tags = video.get("manual_tags")
+
+        video_path = os.path.join(const.LOCAL_PATH_DIR, "{}.mp4".format(video_id))
+        download_path = functions.download_baidu_videos(video_url, video_path)
+        if download_path:
+            oss_path = functions.upload_to_oss(local_video_path=download_path)
+            insert_sql = f"""
+                INSERT INTO publish_single_video_source
+                (content_trace_id, article_title, out_account_id, out_account_name, read_cnt, like_cnt, article_url, cover_url, video_oss_path, publish_timestamp, crawler_timestamp, url_unique_md5, category, tags, platform, source_account)
+                values
+                (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
+            """
+            try:
+                self.db.save(
+                    query=insert_sql,
+                    params=(
+                        "video{}".format(functions.str_to_md5(video_id)),
+                        title,
+                        account_id,
+                        account_name,
+                        read_cnt,
+                        like_cnt,
+                        video_url,
+                        cover_url,
+                        oss_path,
+                        publish_timestamp,
+                        int(time.time()),
+                        video_id,
+                        video_category,
+                        (
+                            json.dumps(manual_tags, ensure_ascii=False)
+                            if manual_tags
+                            else None
+                        ),
+                        "baidu",
+                        const.NO_SOURCE_ACCOUNT_STATUS,
+                    ),
+                )
+                self.success_crawler_video_count += 1
+            except Exception as e:
+                log(
+                    task="baidu_video_crawler",
+                    function="save_each_video",
+                    message="save video failed",
+                    data={
+                        "error": str(e),
+                        "traceback": traceback.format_exc(),
+                        "video_id": video_id,
+                        "oss_path": oss_path,
+                    },
+                )
+        else:
+            print(f"download video failed, video_id: {video_id}")
+
+    def save_video_list(
+        self, account_id: str, account_name: str, video_list: List[Dict]
+    ) -> None:
+        """
+        save video list
+        """
+        progress_bar = tqdm(video_list, desc="crawler account: {}".format(account_name))
+        for video_obj in progress_bar:
+            if video_obj["type"] == "video":
+                video_id = video_obj["content"]["vid"]
+                try:
+                    video_detail = baidu_single_video_crawler(video_id)
+                    self.save_each_video(
+                        video=video_detail,
+                        account_id=account_id,
+                        account_name=account_name,
+                    )
+                    progress_bar.set_postfix({"videoId": video_id})
+                except SpiderError as e:
+                    print("save single video fail", e)
+                    continue
+            else:
+                continue
+
+    def crawler_each_account(self, account: Dict, cursor=None) -> None:
+        """
+        crawler each account
+        response_strategy
+        """
+        account_id = account["account_id"]
+        max_cursor = account["max_cursor"]
+        if not max_cursor:
+            max_cursor = const.DEFAULT_CURSOR
+        account_name = account["account_name"]
+        try:
+            response_json = baidu_account_video_crawler(account_id, cursor=cursor)
+
+            video_list = response_json.get("results", empty_list)
+            if video_list:
+                self.save_video_list(
+                    account_id=account_id,
+                    account_name=account_name,
+                    video_list=video_list,
+                )
+            # check next page
+            has_next_page = response_json.get("has_more", False)
+            if has_next_page:
+                next_cursor = response_json.get("ctime", const.DEFAULT_CURSOR)
+                if next_cursor < max_cursor:
+                    print("No more videos after 2024-01-01")
+                    return
+                else:
+                    return self.crawler_each_account(account, next_cursor)
+        except SpiderError as e:
+            print(e)
+            return
+
+    def update_cursor(self, account_id: str) -> None:
+        """
+        update cursor for each account
+        """
+        select_sql = f"""
+            select max(publish_timestamp) as max_cursor from publish_single_video_source where out_account_id = '{account_id}';
+        """
+        response_mysql = self.db.fetch(query=select_sql)
+        max_publish_timestamp = response_mysql[0][0]
+        if max_publish_timestamp:
+            max_cursor = max_publish_timestamp * const.TIMESTAMP_TO_CURSOR
+            update_sql = f"""
+                update baidu_account_for_videos
+                set max_cursor = %s
+                where account_id = %s;
+            """
+            self.db.save(query=update_sql, params=(max_cursor, account_id))
+
+    def deal(self) -> None:
+        """
+        deal
+        """
+        account_list = self.get_account_list()
+        success_cnt = 0
+        fail_cnt = 0
+        account_list_process_bar = tqdm(account_list, desc="process account list")
+        for account in account_list_process_bar:
+            try:
+                account_list_process_bar.set_postfix(
+                    {"account_name": account["account_name"]}
+                )
+                self.crawler_each_account(account)
+                self.update_cursor(account["account_id"])
+                success_cnt += 1
+            except Exception as e:
+                fail_cnt += 1
+                log(
+                    task="baidu_video_crawler",
+                    function="deal",
+                    message="crawler each account failed",
+                    data={
+                        "account_id": account["account_id"],
+                        "account_name": account["account_name"],
+                        "error": str(e),
+                        "trace_back": traceback.format_exc(),
+                    },
+                )
+        bot(
+            title="baidu video crawler task finished",
+            detail={
+                "success_crawl_account_num": success_cnt,
+                "fail_crawl_account_num": fail_cnt,
+                "success_crawl_video_num": self.success_crawler_video_count,
+                "success_crawl_account_rate": success_cnt / (success_cnt + fail_cnt),
+            },
+            metion=False,
+        )

+ 2 - 1
requirements.txt

@@ -20,4 +20,5 @@ protobuf~=3.20.3
 openai~=1.17.0
 oss2~=2.19.1
 fake-useragent~=1.5.1
-playwright~=1.49.1
+playwright~=1.49.1
+volcengine-python-sdk[ark]

+ 8 - 0
run_baidu_video_crawler.py

@@ -0,0 +1,8 @@
+"""
+@author: luojunhui
+"""
+from coldStartTasks.crawler.baidu import BaiduVideoCrawler
+
+if __name__ == '__main__':
+    task = BaiduVideoCrawler()
+    task.deal()

+ 26 - 0
sh/run_baidu_video_crawler.sh

@@ -0,0 +1,26 @@
+#!/bin/bash
+
+# 获取当前日期,格式为 YYYY-MM-DD
+CURRENT_DATE=$(date +%F)
+
+# 日志文件路径,包含日期
+LOG_FILE="/root/luojunhui/logs/baidu_video_crawler_log_$CURRENT_DATE.txt"
+
+# 重定向整个脚本的输出到带日期的日志文件
+exec >> "$LOG_FILE" 2>&1
+if pgrep -f "python3 run_baidu_video_crawler.py" > /dev/null
+then
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - run_baidu_video_crawler.py is running"
+else
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - trying to restart run_baidu_video_crawler.py"
+    # 切换到指定目录
+    cd /root/luojunhui/LongArticlesJob
+
+    # 激活 Conda 环境
+    source /root/miniconda3/etc/profile.d/conda.sh
+    conda activate tasks
+
+    # 在后台运行 Python 脚本并重定向日志输出
+    nohup python3 run_baidu_video_crawler.py >> "${LOG_FILE}" 2>&1 &
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - successfully restarted run_baidu_video_crawler.py"
+fi