"""
@author: luojunhui
@tool: pycharm && deepseek
"""

import os
import traceback

from tqdm import tqdm

from applications import log
from applications.db import DatabaseConnector
from applications.utils import download_sph_video
from applications.utils import insert_into_single_video_source_table
from applications.utils import Item
from applications.utils import str_to_md5
from applications.utils import upload_to_oss
from config import long_articles_config
from coldStartTasks.crawler.channels import get_channel_account_videos

# Value stored in the ``source_account`` field of every crawled video;
# presumably means "not linked to a source account" — confirm.
NO_SOURCE_ACCOUNT = 0


class CrawlerChannelAccountVideos:
    """
    Crawl videos of channel accounts.

    For each video: skip it if its title is already recorded, otherwise
    download and decrypt it, upload it to OSS and insert a record through
    ``insert_into_single_video_source_table``.
    """

    def __init__(self):
        self.db_client = DatabaseConnector(db_config=long_articles_config)
        self.db_client.connect()
        # number of videos successfully inserted by this instance
        self.success_crawler_video_count = 0

    def whether_video_exists(self, title: str) -> bool:
        """
        Return True if a video with this title is already recorded.

        Deduplication is by title only (``article_title`` column of
        ``publish_single_video_source``); the video id is not checked.

        :param title: the video's description text, used as its title
        :return: True when at least one matching row exists
        """
        sql = """
            select id
            from publish_single_video_source
            where article_title = %s
            limit 1;
        """
        duplicate_id = self.db_client.fetch(query=sql, params=(title,))
        return bool(duplicate_id)

    def get_channel_account_list(self):
        """
        Get channel account list from database.

        TODO: not implemented yet — currently always returns None.
        """
        return None

    @staticmethod
    def _remove_local_file(path):
        """Delete a local file if a path was produced; log instead of raising on failure."""
        if not path:
            return
        try:
            os.remove(path)
        except FileNotFoundError:
            # already gone — nothing to clean up
            return
        except OSError as e:
            log(
                task="crawler_channel_account_videos",
                function="crawler_each_video",
                message="remove local video file failed",
                data={
                    "error": str(e),
                    "traceback": traceback.format_exc(),
                    "path": path,
                },
            )

    def crawler_each_video(self, video: dict):
        """
        Download, decrypt, upload and store a single channel video.

        Videos whose title is already recorded are skipped. Any failure while
        building the record, downloading, uploading or inserting is logged and
        swallowed so that one bad video does not abort the rest of the batch.
        The local decrypted file, if one was produced, is always removed.

        :param video: one entry of the ``object`` list returned by
            ``get_channel_account_videos``
        """
        object_desc = video["objectDesc"]
        title = object_desc["description"]
        if self.whether_video_exists(title):
            return

        decrypt_path = None
        try:
            video_id = video["id"]
            video_item = Item()
            video_item.add(
                "content_trace_id", "video{}".format(str_to_md5(str(video_id)))
            )
            # NOTE(review): the raw video id (not an md5) is stored here — confirm intended
            video_item.add("url_unique_md5", video_id)
            video_item.add("article_title", title)
            video_item.add("out_account_id", video["username"])
            video_item.add("out_account_name", video["nickname"])
            video_item.add("publish_timestamp", video["createtime"])

            # only the first media entry is downloaded
            media = object_desc["media"][0]
            # download url = media url with its urlToken appended
            download_url = media["Url"] + media["urlToken"]
            decrypt_path = download_sph_video(
                download_url=download_url, key=media["decodeKey"]
            )

            oss_path = upload_to_oss(decrypt_path)
            video_item.add("video_oss_path", oss_path)
            video_item.add("source_account", NO_SOURCE_ACCOUNT)
            video_item.check(source="video")
            insert_into_single_video_source_table(self.db_client, video_item.item)
            self.success_crawler_video_count += 1
        except Exception as e:
            log(
                task="crawler_channel_account_videos",
                function="crawler_each_video",
                message="download video failed",
                data={
                    "error": str(e),
                    "traceback": traceback.format_exc(),
                    "video_id": video.get("id"),
                },
            )
        finally:
            # runs on success and failure so the local file never leaks
            self._remove_local_file(decrypt_path)

    def crawler_each_account(self, channel_account_id: str, channel_account_name: str):
        """
        Fetch the videos of one channel account and process each of them.

        A single call to ``get_channel_account_videos`` is made. The response's
        ``lastBuffer`` / ``continueFlag`` fields are not used
        (TODO: pagination?).

        :param channel_account_id: id passed to ``get_channel_account_videos``
        :param channel_account_name: used only in the failure message
        """
        response = get_channel_account_videos(channel_account_id)
        if response["ret"] != 200:
            print(f"crawler channel account {channel_account_name} videos failed")
            return

        video_list = response["data"]["object"]
        crawl_video_list_bar = tqdm(video_list, desc="crawl videos")
        for video in crawl_video_list_bar:
            crawl_video_list_bar.set_postfix({"video_id": video["id"]})
            self.crawler_each_video(video)