- """
- @author: luojunhui
- @description: video crawler
- """
- import json
- import time
- from pymysql.cursors import DictCursor
- from tqdm import tqdm
- from applications import Functions
- from applications.db import DatabaseConnector
- from applications.exception import SpiderError
- from config import long_articles_config
- from coldStartTasks.crawler.baidu.baidu_spider import baidu_account_video_crawler
- from coldStartTasks.crawler.baidu.baidu_spider import baidu_single_video_crawler
- empty_list = []
- functions = Functions()
- DEFAULT_CURSOR = 17040384000000  # earliest crawl boundary: 2024-01-01 00:00:00
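- # Sanity check of the cursor unit (assumption: the Baidu cursor is Unix seconds * 10000):
- # 17040384000000 / 10000 = 1704038400 -> 2024-01-01 00:00:00 (UTC+8)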
- class BaiduVideoCrawler(object):
- """
- baidu video crawler
- """
- def __init__(self):
- self.db = None
- def connect_db(self):
- """
- connect db
- """
- self.db = DatabaseConnector(db_config=long_articles_config)
- self.db.connect()
- def get_account_list(self):
- """
- get account list
- status = 1 marks accounts that are crawled normally
- """
- sql = f"""
- select account_id, account_name, latest_crawler_timestamp as max_cursor
- from baidu_account_for_videos
- where status = 1;
- """
- account_list = self.db.fetch(query=sql, cursor_type=DictCursor)
- return account_list
- def whether_video_exists(self, video_id, title):
- """
- check whether the video already exists, first by video_id, then by title
- """
- # first check video_id
- sql_1 = f"""
- select id from publish_single_video_source
- where url_unique_md5 = '{video_id}';
- """
- count_1 = self.db.fetch(query=sql_1)
- if count_1:
- print(f"{video_id} video exists")
- return True
- # check title
- sql_2 = f"""
- select id from publish_single_video_source
- where article_title = '{title}';
- """
- count_2 = self.db.fetch(query=sql_2)
- if count_2:
- print(f"{title} video exists")
- return True
- return False
- def save_each_video(self, video, account_id, account_name):
- """
- download and save each video
- """
- # print(json.dumps(video, ensure_ascii=False, indent=4))
- video_id = video['id']
- title = video['title']
- # judge whether video exists
- if self.whether_video_exists(video_id, title):
- return
- read_cnt = video.get('playcnt', 0)
- like_cnt = video.get('like_num', 0)
- publish_timestamp = video['publish_time']
- # duration = video['duration']
- cover_url = video['poster']
- video_url = video['playurl']
- # sensitive_flag = video.get('sensitive_flag')
- video_more_info = video.get('contentcms_intervene_data')
- if video_more_info:
- video_category_list = video_more_info.get('category_v2')
- if video_category_list:
- video_category = video_category_list[0]
- else:
- video_category = None
- else:
- video_category = None
- manual_tags = video.get('manual_tags')
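- # download the video to a local temp file, then mirror it to OSS;
- # download_baidu_videos returns the local path on success and a falsy value on failure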
- video_path = 'static/{}.mp4'.format(video_id)
- download_path = functions.download_baidu_videos(video_url, video_path)
- if download_path:
- oss_path = functions.upload_to_oss(local_video_path=download_path)
- insert_sql = f"""
- INSERT INTO publish_single_video_source
- (content_trace_id, article_title, out_account_id, out_account_name, read_cnt, like_cnt, article_url, cover_url, video_oss_path, publish_timestamp, crawler_timestamp, url_unique_md5, category, tags, platform, source_account)
- values
- (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
- """
- try:
- self.db.save(
- query=insert_sql,
- params=(
- "video{}".format(functions.str_to_md5(video_id)),
- title,
- account_id,
- account_name,
- read_cnt,
- like_cnt,
- video_url,
- cover_url,
- oss_path,
- publish_timestamp,
- int(time.time()),
- video_id,
- video_category,
- json.dumps(manual_tags, ensure_ascii=False) if manual_tags else None,
- "baidu",
- 0
- )
- )
- except Exception as e:
- print(e)
- else:
- print(f"download video failed, video_id: {video_id}")
- def save_video_list(self, account_id, account_name, video_list):
- """
- save video list
- """
- # print(json.dumps(video_list, ensure_ascii=False, indent=4))
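- # the account feed mixes several content types; only items of type 'video' carry a
- # vid that the single-video crawler can fetch in detail, everything else is skipped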
- for video_obj in tqdm(video_list, desc="save video list"):
- if video_obj['type'] == 'video':
- video_id = video_obj['content']['vid']
- try:
- video_detail = baidu_single_video_crawler(video_id)
- self.save_each_video(video_detail, account_id=account_id, account_name=account_name)
- except SpiderError as e:
- print(e)
- continue
- else:
- continue
- def crawler_each_account(self, account, cursor=None):
- """
- crawl one account, paging backwards through its video feed by cursor
- """
- account_id = account['account_id']
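- # max_cursor is the publish timestamp reached on the previous crawl; accounts that
- # have never been crawled fall back to DEFAULT_CURSOR (2024-01-01)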
- max_cursor = account['max_cursor']
- if not max_cursor:
- max_cursor = DEFAULT_CURSOR
- account_name = account['account_name']
- try:
- response_json = baidu_account_video_crawler(account_id, cursor=cursor)
- video_list = response_json.get("results", empty_list)
- if video_list:
- self.save_video_list(
- account_id=account_id,
- account_name=account_name,
- video_list=video_list
- )
- # check next page
- has_next_page = response_json.get("has_more", False)
- if has_next_page:
- next_cursor = response_json.get("ctime", DEFAULT_CURSOR)
- if next_cursor < max_cursor:
- print("No more videos after 2024-01-01")
- return
- else:
- return self.crawler_each_account(account, next_cursor)
- except SpiderError as e:
- print(e)
- return
- def deal(self):
- """
- entry point: fetch the account list and run the crawler over it
- """
- account_list = self.get_account_list()
- for account in account_list[1:]:
- self.crawler_each_account(account)
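- # Minimal usage sketch (assumes long_articles_config points at a reachable MySQL
- # instance and that the Baidu spider helpers are importable from this package):
- if __name__ == '__main__':
-     task = BaiduVideoCrawler()
-     task.connect_db()
-     task.deal()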