""" @author: luojunhui @description: video crawler """ import json import time from pymysql.cursors import DictCursor from tqdm import tqdm from applications import Functions from applications.db import DatabaseConnector from applications.exception import SpiderError from config import long_articles_config from coldStartTasks.crawler.baidu.baidu_spider import baidu_account_video_crawler from coldStartTasks.crawler.baidu.baidu_spider import baidu_single_video_crawler empty_list = [] functions = Functions() DEFAULT_CURSOR = 17040384000000 # 最早时间为2024-01-01 00:00:00 class BaiduVideoCrawler(object): """ baidu video crawler """ def __init__(self): self.db = None def connect_db(self): """ connect db """ self.db = DatabaseConnector(db_config=long_articles_config) self.db.connect() def get_account_list(self): """ get account list status = 1 表示正常抓取的账号 """ sql = f""" select account_id, account_name, latest_crawler_timestamp as max_cursor from baidu_account_for_videos where status = 1; """ account_list = self.db.fetch(query=sql, cursor_type=DictCursor) return account_list def whether_video_exists(self, video_id, title): """ whether video exists, use video_id && title """ # first check video_id sql_1 = f""" select id from publish_single_video_source where url_unique_md5 = '{video_id}'; """ count_1 = self.db.fetch(query=sql_1) if count_1: print(video_id + " video exists") return True # check title sql_2 = f""" select id from publish_single_video_source where article_title = '{title}'; """ count_2 = self.db.fetch(query=sql_2) if count_2: print(title + " video exists") return True return False def save_each_video(self, video, account_id, account_name): """ download and save each video """ # print(json.dumps(video, ensure_ascii=False, indent=4)) video_id = video['id'] title = video['title'] # judge whether video exists if self.whether_video_exists(video_id, title): return read_cnt = video.get('playcnt', 0) like_cnt = video.get('like_num', 0) publish_timestamp = video['publish_time'] # duration = video['duration'] cover_url = video['poster'] video_url = video['playurl'] # sensitive_flag = video.get('sensitive_flag') video_more_info = video.get('contentcms_intervene_data') if video_more_info: video_category_list = video_more_info.get('category_v2') if video_category_list: video_category = video_category_list[0] else: video_category = None else: video_category = None manual_tags = video.get('manual_tags') video_path = 'static/{}.mp4'.format(video_id) download_path = functions.download_baidu_videos(video_url, video_path) if download_path: oss_path = functions.upload_to_oss(local_video_path=download_path) insert_sql = f""" INSERT INTO publish_single_video_source (content_trace_id, article_title, out_account_id, out_account_name, read_cnt, like_cnt, article_url, cover_url, video_oss_path, publish_timestamp, crawler_timestamp, url_unique_md5, category, tags, platform, source_account) values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s); """ try: self.db.save( query=insert_sql, params=( "video{}".format(functions.str_to_md5(video_id)), title, account_id, account_name, read_cnt, like_cnt, video_url, cover_url, oss_path, publish_timestamp, int(time.time()), video_id, video_category, json.dumps(manual_tags, ensure_ascii=False) if manual_tags else None, "baidu", 0 ) ) except Exception as e: print(e) else: print(f"download video failed, video_id: {video_id}") def save_video_list(self, account_id, account_name, video_list): """ save video list """ # print(json.dumps(video_list, ensure_ascii=False, 
        for video_obj in tqdm(video_list, desc="save video list"):
            if video_obj['type'] == 'video':
                video_id = video_obj['content']['vid']
                try:
                    video_detail = baidu_single_video_crawler(video_id)
                    self.save_each_video(
                        video_detail,
                        account_id=account_id,
                        account_name=account_name
                    )
                except SpiderError as e:
                    print(e)
                    continue
            else:
                continue

    def crawler_each_account(self, account, cursor=None):
        """
        crawl a single account, paging backwards until the cursor
        reaches the account's max_cursor (or 2024-01-01 by default)
        """
        account_id = account['account_id']
        max_cursor = account['max_cursor']
        if not max_cursor:
            max_cursor = DEFAULT_CURSOR
        account_name = account['account_name']
        try:
            response_json = baidu_account_video_crawler(account_id, cursor=cursor)
            video_list = response_json.get("results", empty_list)
            if video_list:
                self.save_video_list(
                    account_id=account_id,
                    account_name=account_name,
                    video_list=video_list
                )
            # check whether there is a next page and recurse with its cursor
            has_next_page = response_json.get("has_more", False)
            if has_next_page:
                next_cursor = response_json.get("ctime", DEFAULT_CURSOR)
                if next_cursor < max_cursor:
                    print("No more videos after 2024-01-01")
                    return
                else:
                    return self.crawler_each_account(account, next_cursor)
        except SpiderError as e:
            print(e)
            return

    def deal(self):
        """
        crawl videos for every active account
        """
        account_list = self.get_account_list()
        for account in account_list:
            self.crawler_each_account(account)
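
# Minimal usage sketch (not part of the original module): it assumes the module
# may be run directly and that connect_db() must be called before deal(), as the
# methods above suggest. Adapt to the project's own task scheduler if one exists.
if __name__ == "__main__":
    crawler = BaiduVideoCrawler()
    crawler.connect_db()
    crawler.deal()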