"""
@author: luojunhui
@tool: pycharm && deepseek
"""

import os
import re
import time
import traceback

from pymysql.cursors import DictCursor
from tqdm import tqdm

from applications import log
from applications.const import ChannelVideoCrawlerConst
from applications.db import DatabaseConnector
from applications.utils import download_sph_video
from applications.utils import insert_into_single_video_source_table
from applications.utils import Item
from applications.utils import str_to_md5
from applications.utils import upload_to_oss
from config import long_articles_config
from coldStartTasks.crawler.channels import get_channel_account_videos

const = ChannelVideoCrawlerConst()


class CrawlerChannelAccountVideos:
    """
    Crawl the videos published by channel accounts.

    For every account read from ``sph_account_for_videos`` the crawler pages
    through the account's videos, filters them, downloads and uploads each
    accepted video to OSS, stores a row through
    ``insert_into_single_video_source_table`` and finally refreshes the
    account's ``max_cursor``.
    """

    def __init__(self):
        """Create the database client and open its connection."""
        self.db_client = DatabaseConnector(db_config=long_articles_config)
        self.db_client.connect()

    def whether_video_exists(self, title: str) -> bool:
        """
        Tell whether a video with this exact title has already been stored.

        :param title: video title, compared against ``article_title``.
        :return: True when at least one matching row exists.
        """
        sql = """
            select id from publish_single_video_source where article_title = %s;
        """
        duplicate_id = self.db_client.fetch(query=sql, params=(title,))
        return bool(duplicate_id)

    def get_channel_account_list(self) -> list[dict]:
        """
        Fetch the accounts to crawl.

        :return: rows with ``account_id`` and ``max_cursor`` for accounts
            whose status equals ``const.CHANNEL_ACCOUNT_GOOD_STATUS``,
            ordered by ``max_cursor``.
        """
        # The interpolated value is a project constant, not external input.
        sql = f"""
            select account_id, max_cursor
            from sph_account_for_videos
            where status = {const.CHANNEL_ACCOUNT_GOOD_STATUS}
            order by max_cursor;"""
        account_list = self.db_client.fetch(query=sql, cursor_type=DictCursor)
        return account_list

    @staticmethod
    def _remove_local_file(path: str, video_id) -> None:
        """Delete the local decrypted file; log instead of raising on failure."""
        try:
            os.remove(path)
        except OSError as e:
            log(
                task="crawler_channel_account_videos",
                function="crawler_each_video",
                message="remove local video failed",
                data={
                    "error": str(e),
                    "traceback": traceback.format_exc(),
                    "video_id": video_id,
                },
            )

    def crawler_each_video(self, video: dict) -> None:
        """
        Filter one video and, if accepted, download, upload and store it.

        A video is skipped (with a log entry) when its title is already
        stored, when the title has fewer than ``const.MIN_TITLE_LENGTH``
        characters in the CJK range ``\\u4e00-\\u9fff``, or when its play
        length exceeds ``const.MAX_VIDEO_LENGTH``.

        Errors raised while downloading, uploading or inserting are logged
        and swallowed so that one bad video does not stop the crawl.

        :param video: one video object as returned by the channel API.
        """
        object_desc = video["objectDesc"]
        title = object_desc["description"]
        video_id = video["id"]

        if self.whether_video_exists(title):
            log(
                task="crawler_channel_account_videos",
                function="crawler_each_video",
                message="video title exists",
                data={"video_id": video_id, "title": title},
            )
            return

        # Only characters inside the CJK range count toward the title length.
        cleaned_title = re.sub(r"[^\u4e00-\u9fff]", "", title)
        if len(cleaned_title) < const.MIN_TITLE_LENGTH:
            log(
                task="crawler_channel_account_videos",
                function="crawler_each_video",
                message="video title is too short",
                data={"video_id": video_id, "title": title},
            )
            return

        media = object_desc["media"][0]
        video_length = media["VideoPlayLen"]
        if video_length and int(video_length) > const.MAX_VIDEO_LENGTH:
            log(
                task="crawler_channel_account_videos",
                function="crawler_each_video",
                message="video length is too long",
                data={"video_id": video_id, "title": title, "length": video_length},
            )
            return

        video_item = Item()
        video_item.add("content_trace_id", "video{}".format(str_to_md5(str(video_id))))
        video_item.add("url_unique_md5", video_id)
        video_item.add("article_title", title)
        video_item.add("out_account_id", video["username"])
        video_item.add("out_account_name", video["nickname"])
        video_item.add("publish_timestamp", video["createtime"])
        video_item.add("platform", "sph")
        video_item.add("crawler_timestamp", int(time.time()))

        download_url = media["Url"] + media["urlToken"]
        decode_key = media["decodeKey"]

        decrypt_path = None
        try:
            decrypt_path = download_sph_video(download_url=download_url, key=decode_key)
            oss_path = upload_to_oss(decrypt_path)
            video_item.add("video_oss_path", oss_path)
            video_item.add("source_account", const.NO_SOURCE_ACCOUNT_STATUS)
            video_item.check(source="video")
            insert_into_single_video_source_table(self.db_client, video_item.item)
        except Exception as e:
            log(
                task="crawler_channel_account_videos",
                function="crawler_each_video",
                message="download video failed",
                data={
                    "error": str(e),
                    "traceback": traceback.format_exc(),
                    "video_id": video_id,
                },
            )
        finally:
            # Clean up the local file even when upload/insert failed, so
            # failed videos do not pile up on disk.
            if decrypt_path and os.path.exists(decrypt_path):
                self._remove_local_file(decrypt_path, video_id)

    def crawler_each_account(self, channel_account: dict, last_buffer: str = "") -> None:
        """
        Crawl the account's videos page by page, using a loop instead of
        recursion.

        Paging stops when the API reports an error, returns no videos,
        reports no further page, or when the first video of a page is older
        than the account's stored ``max_cursor``.

        :param channel_account: row holding ``account_id`` and, optionally,
            ``max_cursor``.
        :param last_buffer: pagination cursor to start from; empty string
            requests the first page.
        """
        channel_account_id = channel_account["account_id"]
        max_cursor = channel_account.get("max_cursor") or const.DEFAULT_CURSOR
        current_last_buffer = last_buffer
        has_more = True

        while has_more:
            response = get_channel_account_videos(
                channel_account_id, last_buffer=current_last_buffer
            )
            if response["ret"] != 200:
                log(
                    task="crawler_channel_account_videos",
                    function="crawler_each_account",
                    message="get_channel_account_videos failed",
                    data={
                        "response": response,
                        "channel_account_id": channel_account_id,
                        "max_cursor": max_cursor,
                    },
                )
                break

            response_data = response["data"]
            if isinstance(response_data, dict):
                # Advance the pagination cursor.
                current_last_buffer = response_data.get("lastBuffer")
                # Whether another page follows.
                has_more = response_data["continueFlag"]
                video_list = response_data["object"]
                if has_more and not current_last_buffer:
                    # Without a cursor the next request would fetch the same
                    # page again; treat this page as the last one.
                    has_more = False
            elif isinstance(response_data, list):
                has_more = False
                video_list = response_data
            else:
                # Unknown payload shape: nothing to crawl.
                return

            if not video_list:
                break

            # Only the first video of the page is compared with the cursor.
            create_timestamp = video_list[0]["createtime"]
            if create_timestamp < max_cursor:
                break

            crawl_video_list_bar = tqdm(video_list, desc="crawl videos")
            for video in crawl_video_list_bar:
                crawl_video_list_bar.set_postfix({"video_id": video["id"]})
                self.crawler_each_video(video)

            if not has_more:
                break
            time.sleep(const.SLEEP_SECOND)

    def update_account_max_cursor(self, account_id: str) -> None:
        """
        Store the newest crawled publish timestamp as the account's cursor.

        Nothing is written when no publish timestamp is found for the
        account.

        :param account_id: value matched against ``out_account_id`` when
            reading and ``account_id`` when updating.
        """
        # Parameterized, like the other queries, instead of formatting the
        # account id into the SQL text.
        select_sql = """
            select max(publish_timestamp) as max_cursor
            from publish_single_video_source
            where out_account_id = %s;
        """
        response_mysql = self.db_client.fetch(query=select_sql, params=(account_id,))
        if not response_mysql:
            return

        max_publish_timestamp = response_mysql[0][0]
        if max_publish_timestamp:
            update_sql = """
                update sph_account_for_videos
                set max_cursor = %s
                where account_id = %s;
            """
            self.db_client.save(
                query=update_sql, params=(max_publish_timestamp, account_id)
            )

    def deal(self):
        """
        Crawl every selected account and refresh its cursor.

        A failure on one account is logged and does not stop the remaining
        accounts from being processed.
        """
        account_list = self.get_channel_account_list()
        account_crawler_bar = tqdm(account_list, desc="crawler channel account videos")
        for account in account_crawler_bar:
            try:
                account_crawler_bar.set_postfix({"account_id": account["account_id"]})
                self.crawler_each_account(channel_account=account)
                self.update_account_max_cursor(account["account_id"])
            except Exception as e:
                log(
                    task="crawler_channel_account_videos",
                    function="deal",
                    message="crawler channel account videos failed",
                    data={
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                        "account_id": account["account_id"],
                    },
                )