- """
- @author: luojunhui
- @tool: pycharm && deepseek
- """
- import os
- import traceback
- import time
- from tqdm import tqdm
- from applications import log
- from applications.db import DatabaseConnector
- from applications.utils import download_sph_video
- from applications.utils import insert_into_single_video_source_table
- from applications.utils import Item
- from applications.utils import str_to_md5
- from applications.utils import upload_to_oss
- from config import long_articles_config
- from coldStartTasks.crawler.channels import get_channel_account_videos
# placeholder source_account value: the crawled video has not been
# associated with any internal source account yet
NO_SOURCE_ACCOUNT = 0

class CrawlerChannelAccountVideos:
    """
    Crawl the videos published by channel accounts and persist them.
    """

    def __init__(self):
        self.db_client = DatabaseConnector(db_config=long_articles_config)
        self.db_client.connect()
        self.success_crawler_video_count = 0

    def whether_video_exists(self, title: str) -> bool:
        """
        Check whether a video with the same title has already been collected.
        """
        sql = """
            select id from publish_single_video_source
            where article_title = %s;
        """
        duplicate_id = self.db_client.fetch(query=sql, params=(title,))
        return bool(duplicate_id)

    def get_channel_account_list(self):
        """
        Get the channel account list from the database (not implemented yet).
        """
        return
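    # A minimal sketch of what this lookup might look like. The table and
    # column names below (channel_accounts, account_id, account_name) are
    # assumptions for illustration, not the project's actual schema:
    #
    #     sql = "select account_id, account_name from channel_accounts;"
    #     return self.db_client.fetch(query=sql)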

    def crawler_each_video(self, video: dict):
        """
        Download, decrypt, and upload a single video to OSS, then record it.
        Expected keys on `video`: id, username, nickname, createtime, and
        objectDesc (with description and media[0].{Url, decodeKey, urlToken}).
        """
        object_desc = video["objectDesc"]
        title = object_desc["description"]
        # skip videos whose title has already been collected
        if self.whether_video_exists(title):
            return

        video_item = Item()
        video_id = video["id"]
        video_item.add("content_trace_id", "video{}".format(str_to_md5(str(video_id))))
        video_item.add("url_unique_md5", video_id)
        video_item.add("article_title", title)
        video_item.add("out_account_id", video["username"])
        video_item.add("out_account_name", video["nickname"])
        video_item.add("publish_timestamp", video["createtime"])

        media = object_desc["media"][0]
        url = media["Url"]
        decode_key = media["decodeKey"]
        url_token = media["urlToken"]
        # the full download URL is the base URL plus its access token
        download_url = url + url_token
        try:
            # download the encrypted stream and decrypt it locally
            decrypt_path = download_sph_video(download_url=download_url, key=decode_key)
            oss_path = upload_to_oss(decrypt_path)
            video_item.add("video_oss_path", oss_path)
            video_item.add("source_account", NO_SOURCE_ACCOUNT)
            video_item.check(source="video")
            insert_into_single_video_source_table(self.db_client, video_item.item)
            # remove the local copy once it has been uploaded and recorded
            os.remove(decrypt_path)
        except Exception as e:
            log(
                task="crawler_channel_account_videos",
                function="crawler_each_video",
                message="download video failed",
                data={
                    "error": str(e),
                    "traceback": traceback.format_exc(),
                    "video_id": video["id"],
                },
            )

    def crawler_each_account(self, channel_account_id: str, channel_account_name: str, last_buffer: str = ""):
        """
        Crawl the video list of one channel account, page by page.
        `last_buffer` is the pagination cursor returned by the previous page.
        """
        while True:
            response = get_channel_account_videos(channel_account_id, last_buffer=last_buffer)
            if response["ret"] != 200:
                log(
                    task="crawler_channel_account_videos",
                    function="crawler_each_account",
                    message=f"crawl channel account {channel_account_name} videos failed",
                    data={"response": response},
                )
                return

            response_data = response["data"]
            last_buffer = response_data["lastBuffer"]
            continue_flag = response_data["continueFlag"]
            video_list = response_data["object"]
            if not video_list:
                return

            # stop once the page starts with videos older than 2024-01-01 00:00 (UTC+8)
            if video_list[0]["createtime"] < 1704038400:
                return

            crawl_video_list_bar = tqdm(video_list, desc="crawl videos")
            for video in crawl_video_list_bar:
                crawl_video_list_bar.set_postfix({"video_id": video["id"]})
                self.crawler_each_video(video)

            if not continue_flag:
                return
            # be polite between pages before fetching the next cursor
            time.sleep(1)
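
# A minimal usage sketch (an assumption, not part of the original module):
# it presumes get_channel_account_list() is implemented to return
# (account_id, account_name) pairs, and crawls each account from scratch
# with an empty last_buffer.
if __name__ == "__main__":
    crawler = CrawlerChannelAccountVideos()
    for account_id, account_name in crawler.get_channel_account_list() or []:
        crawler.crawler_each_account(
            channel_account_id=account_id,
            channel_account_name=account_name,
        )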