- """
- @author: luojunhui
- @description: video crawler
- """
- import os
- import json
- import time
- import traceback
- from typing import List, Dict
- from pymysql.cursors import DictCursor
- from tqdm import tqdm
- from applications import Functions
- from applications import bot, log
- from applications.const import BaiduVideoCrawlerConst
- from applications.db import DatabaseConnector
- from applications.exception import SpiderError
- from config import long_articles_config
- from coldStartTasks.crawler.baidu.baidu_spider import baidu_account_video_crawler
- from coldStartTasks.crawler.baidu.baidu_spider import baidu_single_video_crawler
- const = BaiduVideoCrawlerConst()
- empty_list = []
- functions = Functions()


class BaiduVideoCrawler(object):
    """
    baidu video crawler
    """

    def __init__(self):
        self.db = None
        self.success_crawler_video_count = 0
        self.connect_db()

    def connect_db(self) -> None:
        """
        connect to the long_articles database
        """
        self.db = DatabaseConnector(db_config=long_articles_config)
        self.db.connect()

    def get_account_list(self) -> List[Dict]:
        """
        get the list of baidu accounts to crawl
        """
        sql = f"""
            select account_id, account_name, max_cursor
            from baidu_account_for_videos
            where status = {const.BAIDU_ACCOUNT_GOOD_STATUS};
        """
        account_list = self.db.fetch(query=sql, cursor_type=DictCursor)
        return account_list

    def whether_video_exists(self, title: str) -> bool:
        """
        check whether a video with the same title has already been saved
        """
        sql = """
            select id from publish_single_video_source
            where article_title = %s;
        """
        duplicate_id = self.db.fetch(query=sql, params=(title,))
        if duplicate_id:
            print(title + " video exists")
            return True
        return False

    def save_each_video(self, video: Dict, account_id: str, account_name: str) -> None:
        """
        download and save each video
        """
        # print(json.dumps(video, ensure_ascii=False, indent=4))
        video_id = video["id"]
        title = video["title"]

        # skip videos that have already been crawled
        if self.whether_video_exists(title):
            return

        read_cnt = video.get("playcnt", 0)
        like_cnt = video.get("like_num", 0)
        publish_timestamp = video["publish_time"]
        # duration = video['duration']
        cover_url = video["poster"]
        video_url = video["playurl"]
        # sensitive_flag = video.get('sensitive_flag')

        # category sits inside the optional contentcms_intervene_data field
        video_category = None
        video_more_info = video.get("contentcms_intervene_data")
        if video_more_info:
            video_category_list = video_more_info.get("category_v2")
            if video_category_list:
                video_category = video_category_list[0]
        manual_tags = video.get("manual_tags")

        # download the video locally, then upload it to OSS before saving the record
        video_path = os.path.join(const.LOCAL_PATH_DIR, "{}.mp4".format(video_id))
        download_path = functions.download_baidu_videos(video_url, video_path)
        if download_path:
            oss_path = functions.upload_to_oss(local_video_path=download_path)
            insert_sql = """
                INSERT INTO publish_single_video_source
                (content_trace_id, article_title, out_account_id, out_account_name, read_cnt, like_cnt, article_url, cover_url, video_oss_path, publish_timestamp, crawler_timestamp, url_unique_md5, category, tags, platform, source_account)
                values
                (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
            """
            try:
                self.db.save(
                    query=insert_sql,
                    params=(
                        "video{}".format(functions.str_to_md5(video_id)),
                        title,
                        account_id,
                        account_name,
                        read_cnt,
                        like_cnt,
                        video_url,
                        cover_url,
                        oss_path,
                        publish_timestamp,
                        int(time.time()),
                        video_id,
                        video_category,
                        (
                            json.dumps(manual_tags, ensure_ascii=False)
                            if manual_tags
                            else None
                        ),
                        "hksp",
                        const.NO_SOURCE_ACCOUNT_STATUS,
                    ),
                )
                self.success_crawler_video_count += 1
            except Exception as e:
                log(
                    task="baidu_video_crawler",
                    function="save_each_video",
                    message="save video failed",
                    data={
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                        "video_id": video_id,
                        "oss_path": oss_path,
                    },
                )
        else:
            print(f"download video failed, video_id: {video_id}")

    def save_video_list(
        self, account_id: str, account_name: str, video_list: List[Dict]
    ) -> None:
        """
        save video list
        """
        progress_bar = tqdm(video_list, desc="crawler account: {}".format(account_name))
        for video_obj in progress_bar:
            if video_obj["type"] == "video":
                video_id = video_obj["content"]["vid"]
                try:
                    video_detail = baidu_single_video_crawler(video_id)
                    self.save_each_video(
                        video=video_detail,
                        account_id=account_id,
                        account_name=account_name,
                    )
                    progress_bar.set_postfix({"videoId": video_id})
                except SpiderError as e:
                    print("save single video failed", e)
                    continue
            else:
                continue

    def crawler_each_account(self, account: Dict, cursor=None) -> None:
        """
        crawl videos for a single account, following the page cursor
        until it falls behind the account's stored max_cursor
        """
        account_id = account["account_id"]
        max_cursor = account["max_cursor"]
        if not max_cursor:
            max_cursor = const.DEFAULT_CURSOR
        account_name = account["account_name"]
        try:
            response_json = baidu_account_video_crawler(account_id, cursor=cursor)
            video_list = response_json.get("results", empty_list)
            if video_list:
                self.save_video_list(
                    account_id=account_id,
                    account_name=account_name,
                    video_list=video_list,
                )
            # check next page
            has_next_page = response_json.get("has_more", False)
            if has_next_page:
                next_cursor = response_json.get("ctime", const.DEFAULT_CURSOR)
                if next_cursor < max_cursor:
                    # stop once the page cursor is older than the stored max_cursor
                    print("No more videos after 2024-01-01")
                    return
                else:
                    return self.crawler_each_account(account, next_cursor)
        except SpiderError as e:
            print(e)
            return

    def update_cursor(self, account_id: str) -> None:
        """
        update cursor for each account
        """
        select_sql = """
            select max(publish_timestamp) as max_cursor
            from publish_single_video_source
            where out_account_id = %s;
        """
        response_mysql = self.db.fetch(query=select_sql, params=(account_id,))
        max_publish_timestamp = response_mysql[0][0]
        if max_publish_timestamp:
            # convert the latest publish timestamp into the cursor scale used by baidu
            max_cursor = max_publish_timestamp * const.TIMESTAMP_TO_CURSOR
            update_sql = """
                update baidu_account_for_videos
                set max_cursor = %s
                where account_id = %s;
            """
            self.db.save(query=update_sql, params=(max_cursor, account_id))

    def deal(self) -> None:
        """
        task entrance: crawl every account, update its cursor, then report the result
        """
        account_list = self.get_account_list()
        success_cnt = 0
        fail_cnt = 0
        account_list_process_bar = tqdm(account_list, desc="process account list")
        for account in account_list_process_bar:
            try:
                account_list_process_bar.set_postfix(
                    {"account_name": account["account_name"]}
                )
                self.crawler_each_account(account)
                self.update_cursor(account["account_id"])
                success_cnt += 1
            except Exception as e:
                fail_cnt += 1
                log(
                    task="baidu_video_crawler",
                    function="deal",
                    message="crawler each account failed",
                    data={
                        "account_id": account["account_id"],
                        "account_name": account["account_name"],
                        "error": str(e),
                        "trace_back": traceback.format_exc(),
                    },
                )
        bot(
            title="baidu video crawler task finished",
            detail={
                "success_crawl_account_num": success_cnt,
                "fail_crawl_account_num": fail_cnt,
                "success_crawl_video_num": self.success_crawler_video_count,
                # guard against division by zero when the account list is empty
                "success_crawl_account_rate": success_cnt / max(success_cnt + fail_cnt, 1),
            },
            mention=False,
        )
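

# Minimal usage sketch (assumption): the repo may schedule this task elsewhere,
# but running the module directly like this should crawl all good-status accounts
# and send the summary notification via bot().
if __name__ == "__main__":
    task = BaiduVideoCrawler()
    task.deal()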