"""
@author: luojunhui
@description: video crawler
"""

import os
import json
import time
import traceback
from typing import List, Dict

from pymysql.cursors import DictCursor
from tqdm import tqdm

from applications import Functions
from applications import bot, log
from applications.const import BaiduVideoCrawlerConst
from applications.db import DatabaseConnector
from applications.exception import SpiderError
from config import long_articles_config
from coldStartTasks.crawler.baidu.baidu_spider import baidu_account_video_crawler
from coldStartTasks.crawler.baidu.baidu_spider import baidu_single_video_crawler

const = BaiduVideoCrawlerConst()
empty_list = []
functions = Functions()


class BaiduVideoCrawler(object):
    """
    baidu video crawler

    Reads the accounts to crawl from ``baidu_account_for_videos``, downloads
    each new video, uploads it via ``functions.upload_to_oss`` and records it
    in ``publish_single_video_source``.
    """

    def __init__(self):
        self.db = None
        # number of videos successfully inserted during this run (reported by deal)
        self.success_crawler_video_count = 0
        self.connect_db()

    def connect_db(self) -> None:
        """
        connect db using ``long_articles_config``
        """
        self.db = DatabaseConnector(db_config=long_articles_config)
        self.db.connect()

    def get_account_list(self) -> List[Dict]:
        """
        get the accounts whose status equals ``const.BAIDU_ACCOUNT_GOOD_STATUS``

        :return: rows with keys ``account_id``, ``account_name`` and ``max_cursor``
        """
        sql = f"""
            select account_id, account_name, max_cursor
            from baidu_account_for_videos
            where status = {const.BAIDU_ACCOUNT_GOOD_STATUS};
        """
        account_list = self.db.fetch(query=sql, cursor_type=DictCursor)
        return account_list

    def whether_video_exists(self, title: str) -> bool:
        """
        whether a video with the same title was already saved

        Only ``article_title`` is checked; the video id is not consulted.

        :param title: video title to look up
        :return: True if a row with this title exists in publish_single_video_source
        """
        sql = """
            select id from publish_single_video_source
            where article_title = %s;
        """
        duplicate_id = self.db.fetch(query=sql, params=(title,))
        if duplicate_id:
            print(title + " video exists")
            return True
        return False

    def save_each_video(self, video: Dict, account_id: str, account_name: str) -> None:
        """
        download, upload and save a single video

        Skips videos whose title already exists. Insert failures are logged
        and swallowed; a failed download is only printed.

        :param video: video detail as returned by ``baidu_single_video_crawler``
        :param account_id: id of the account the video was found under
        :param account_name: name of that account
        """
        video_id = video["id"]
        title = video["title"]

        # judge whether video exists (by title)
        if self.whether_video_exists(title):
            return

        read_cnt = video.get("playcnt", 0)
        like_cnt = video.get("like_num", 0)
        publish_timestamp = video["publish_time"]
        cover_url = video["poster"]
        video_url = video["playurl"]

        # category is the first entry of contentcms_intervene_data.category_v2, if any
        video_more_info = video.get("contentcms_intervene_data")
        video_category_list = (video_more_info or {}).get("category_v2")
        video_category = video_category_list[0] if video_category_list else None
        manual_tags = video.get("manual_tags")

        video_path = os.path.join(const.LOCAL_PATH_DIR, "{}.mp4".format(video_id))
        download_path = functions.download_baidu_videos(video_url, video_path)
        if not download_path:
            print(f"download video failed, video_id: {video_id}")
            return

        oss_path = functions.upload_to_oss(local_video_path=download_path)
        insert_sql = """
            INSERT INTO publish_single_video_source
            (content_trace_id, article_title, out_account_id, out_account_name, read_cnt, like_cnt, article_url, cover_url, video_oss_path, publish_timestamp, crawler_timestamp, url_unique_md5, category, tags, platform, source_account)
            values
            (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        try:
            self.db.save(
                query=insert_sql,
                params=(
                    "video{}".format(functions.str_to_md5(video_id)),
                    title,
                    account_id,
                    account_name,
                    read_cnt,
                    like_cnt,
                    video_url,
                    cover_url,
                    oss_path,
                    publish_timestamp,
                    int(time.time()),
                    video_id,
                    video_category,
                    (
                        json.dumps(manual_tags, ensure_ascii=False)
                        if manual_tags
                        else None
                    ),
                    "hksp",
                    const.NO_SOURCE_ACCOUNT_STATUS,
                ),
            )
            self.success_crawler_video_count += 1
        except Exception as e:
            log(
                task="baidu_video_crawler",
                function="save_each_video",
                message="save video failed",
                data={
                    "error": str(e),
                    "traceback": traceback.format_exc(),
                    "video_id": video_id,
                    "oss_path": oss_path,
                },
            )

    def save_video_list(
        self, account_id: str, account_name: str, video_list: List[Dict]
    ) -> None:
        """
        fetch details for and save every ``type == "video"`` item of one feed page

        A ``SpiderError`` on one item is printed and the loop continues with
        the next item.

        :param account_id: id of the account the page belongs to
        :param account_name: name of that account (also used in the progress bar)
        :param video_list: ``results`` list from ``baidu_account_video_crawler``
        """
        progress_bar = tqdm(video_list, desc="crawler account: {}".format(account_name))
        for video_obj in progress_bar:
            # items of other types are skipped
            if video_obj["type"] != "video":
                continue

            video_id = video_obj["content"]["vid"]
            try:
                video_detail = baidu_single_video_crawler(video_id)
                self.save_each_video(
                    video=video_detail,
                    account_id=account_id,
                    account_name=account_name,
                )
                progress_bar.set_postfix({"videoId": video_id})
            except SpiderError as e:
                print("save single video fail", e)

    def crawler_each_account(self, account: Dict, cursor=None) -> None:
        """
        crawl the pages of one account's video feed

        Pages are followed through the response's ``ctime`` field. Crawling
        stops when any of the following happens:

        - ``has_more`` is falsy;
        - the next cursor is below the account's ``max_cursor``;
        - the cursor stops advancing;
        - a ``SpiderError`` is raised (printed, not re-raised).

        :param account: row with keys ``account_id``, ``account_name``, ``max_cursor``
        :param cursor: cursor of the first page to request; ``None`` for the first page
        """
        account_id = account["account_id"]
        account_name = account["account_name"]
        # accounts without a stored cursor fall back to the default lower bound
        max_cursor = account["max_cursor"] or const.DEFAULT_CURSOR

        # iterate instead of recursing so long feeds cannot exhaust the recursion limit
        while True:
            try:
                response_json = baidu_account_video_crawler(account_id, cursor=cursor)
                video_list = response_json.get("results", empty_list)
                if video_list:
                    self.save_video_list(
                        account_id=account_id,
                        account_name=account_name,
                        video_list=video_list,
                    )
            except SpiderError as e:
                print(e)
                return

            # check next page
            if not response_json.get("has_more", False):
                return

            next_cursor = response_json.get("ctime", const.DEFAULT_CURSOR)
            if next_cursor < max_cursor:
                # NOTE(review): message assumes max_cursor is the default bound;
                # it may also be a cursor saved by update_cursor
                print("No more videos after 2024-01-01")
                return
            if next_cursor == cursor:
                # API did not advance the cursor; stop instead of looping forever
                return
            cursor = next_cursor

    def update_cursor(self, account_id: str) -> None:
        """
        update ``max_cursor`` for an account

        The new value is the account's latest saved ``publish_timestamp``
        multiplied by ``const.TIMESTAMP_TO_CURSOR``. Nothing is updated if
        the account has no saved videos.

        :param account_id: account whose cursor is refreshed
        """
        # parameterized instead of string-interpolated, like the other queries here
        select_sql = """
            select max(publish_timestamp) as max_cursor
            from publish_single_video_source
            where out_account_id = %s;
        """
        response_mysql = self.db.fetch(query=select_sql, params=(str(account_id),))
        max_publish_timestamp = response_mysql[0][0]
        if max_publish_timestamp:
            max_cursor = max_publish_timestamp * const.TIMESTAMP_TO_CURSOR
            update_sql = """
                update baidu_account_for_videos
                set max_cursor = %s
                where account_id = %s;
            """
            self.db.save(query=update_sql, params=(max_cursor, account_id))

    def deal(self) -> None:
        """
        crawl every good-status account, refresh its cursor, then send a summary via bot

        An exception on one account is logged and counted as a failure;
        the remaining accounts are still processed.
        """
        account_list = self.get_account_list()
        success_cnt = 0
        fail_cnt = 0
        account_list_process_bar = tqdm(account_list, desc="process account list")
        for account in account_list_process_bar:
            try:
                account_list_process_bar.set_postfix(
                    {"account_name": account["account_name"]}
                )
                self.crawler_each_account(account)
                self.update_cursor(account["account_id"])
                success_cnt += 1
            except Exception as e:
                fail_cnt += 1
                log(
                    task="baidu_video_crawler",
                    function="deal",
                    message="crawler each account failed",
                    data={
                        "account_id": account["account_id"],
                        "account_name": account["account_name"],
                        "error": str(e),
                        "trace_back": traceback.format_exc(),
                    },
                )

        total_cnt = success_cnt + fail_cnt
        bot(
            title="baidu video crawler task finished",
            detail={
                "success_crawl_account_num": success_cnt,
                "fail_crawl_account_num": fail_cnt,
                "success_crawl_video_num": self.success_crawler_video_count,
                # avoid ZeroDivisionError when no account is in good status
                "success_crawl_account_rate": (
                    success_cnt / total_cnt if total_cnt else 0
                ),
            },
            mention=False,
        )