123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232 |
- """
- @author: luojunhui
- @tool: pycharm && deepseek
- """
- import re
- import os
- import traceback
- import time
- from pymysql.cursors import DictCursor
- from tqdm import tqdm
- from applications import log
- from applications.const import ChannelVideoCrawlerConst
- from applications.db import DatabaseConnector
- from applications.utils import download_sph_video
- from applications.utils import insert_into_single_video_source_table
- from applications.utils import Item
- from applications.utils import str_to_md5
- from applications.utils import upload_to_oss
- from config import long_articles_config
- from coldStartTasks.crawler.channels import get_channel_account_videos
- const = ChannelVideoCrawlerConst()
class CrawlerChannelAccountVideos:
    """
    Crawl videos published by WeChat Channels (sph) accounts and
    persist them into the single-video source table.
    """

    def __init__(self):
        """Create and connect the database client used by all crawler steps."""
        client = DatabaseConnector(db_config=long_articles_config)
        client.connect()
        self.db_client = client
- def whether_video_exists(self, title: str) -> bool:
- """
- whether video exists, use video_id && title
- """
- # check title
- sql = f"""
- select id from publish_single_video_source
- where article_title = %s;
- """
- duplicate_id = self.db_client.fetch(query=sql, params=(title,))
- if duplicate_id:
- return True
- return False
- def get_channel_account_list(self) -> list[dict]:
- """
- get channel account list from database
- """
- sql = f"""
- select account_id, max_cursor
- from sph_account_for_videos
- where status = {const.CHANNEL_ACCOUNT_GOOD_STATUS}
- order by max_cursor;"""
- account_list = self.db_client.fetch(query=sql, cursor_type=DictCursor)
- return account_list
- def crawler_each_video(self, video: dict) -> None:
- """
- download each video
- save video and decrypt video
- upload video to oss
- """
- object_desc = video["objectDesc"]
- title = object_desc["description"]
- if self.whether_video_exists(title):
- log(
- task="crawler_channel_account_videos",
- function="crawler_each_video",
- message="video title exists",
- data={"video_id": video["id"], "title": title},
- )
- return
- cleaned_title = re.sub(r"[^\u4e00-\u9fff]", "", title)
- if len(cleaned_title) < const.MIN_TITLE_LENGTH:
- log(
- task="crawler_channel_account_videos",
- function="crawler_each_video",
- message="video title is too short",
- data={"video_id": video["id"], "title": title},
- )
- return
- video_length = video["objectDesc"]["media"][0]["VideoPlayLen"]
- if video_length and int(video_length) > const.MAX_VIDEO_LENGTH:
- log(
- task="crawler_channel_account_videos",
- function="crawler_each_video",
- message="video length is too long",
- data={"video_id": video["id"], "title": title, "length": video_length},
- )
- return
- video_item = Item()
- video_id = video["id"]
- video_item.add("content_trace_id", "video{}".format(str_to_md5(str(video_id))))
- video_item.add("url_unique_md5", video_id)
- video_item.add("article_title", title)
- video_item.add("out_account_id", video["username"])
- video_item.add("out_account_name", video["nickname"])
- video_item.add("publish_timestamp", video["createtime"])
- video_item.add("platform", "sph")
- video_item.add("crawler_timestamp", int(time.time()))
- media = object_desc["media"][0]
- url = media["Url"]
- decode_key = media["decodeKey"]
- url_token = media["urlToken"]
- download_url = url + url_token
- try:
- decrypt_path = download_sph_video(download_url=download_url, key=decode_key)
- oss_path = upload_to_oss(decrypt_path)
- video_item.add("video_oss_path", oss_path)
- video_item.add("source_account", const.NO_SOURCE_ACCOUNT_STATUS)
- video_item.check(source="video")
- insert_into_single_video_source_table(self.db_client, video_item.item)
- os.remove(decrypt_path)
- except Exception as e:
- log(
- task="crawler_channel_account_videos",
- function="crawler_each_video",
- message="download video failed",
- data={
- "error": str(e),
- "traceback": traceback.format_exc(),
- "video_id": video["id"],
- },
- )
- def crawler_each_account(self, channel_account: dict, last_buffer: str = "") -> None:
- """
- 通过循环替代递归,分页爬取频道账号视频
- """
- channel_account_id = channel_account["account_id"]
- max_cursor = channel_account.get("max_cursor") or const.DEFAULT_CURSOR
- current_last_buffer = last_buffer
- has_more = True
- while has_more:
- response = get_channel_account_videos(channel_account_id, last_buffer=current_last_buffer)
- if response["ret"] != 200:
- log(
- task="crawler_channel_account_videos",
- function="crawler_each_video",
- message="get_channel_account_videos failed",
- data={
- "response": response,
- "channel_account_id": channel_account_id,
- "max_cursor": max_cursor,
- },
- )
- break
- response_data = response["data"]
- response_data_type = type(response_data)
- if response_data_type is dict:
- current_last_buffer = response_data.get["lastBuffer"] # 更新分页游标
- has_more = response_data["continueFlag"] # 是否还有下一页
- video_list = response_data["object"]
- elif response_data_type is list:
- has_more = False
- video_list = response_data
- video_list = video_list
- else:
- return
- if not video_list:
- break
- create_timestamp = video_list[0]["createtime"]
- if create_timestamp < max_cursor:
- break
- crawl_video_list_bar = tqdm(video_list, desc="crawl videos")
- for video in crawl_video_list_bar:
- crawl_video_list_bar.set_postfix({"video_id": video["id"]})
- self.crawler_each_video(video)
- if has_more:
- time.sleep(const.SLEEP_SECOND)
- else:
- break
- def update_account_max_cursor(self, account_id: str) -> None:
- """
- update account max cursor
- """
- select_sql = f"""
- select max(publish_timestamp) as max_cursor from publish_single_video_source where out_account_id = '{account_id}';
- """
- response_mysql = self.db_client.fetch(query=select_sql)
- max_publish_timestamp = response_mysql[0][0]
- if max_publish_timestamp:
- update_sql = f"""
- update sph_account_for_videos
- set max_cursor = %s
- where account_id = %s;
- """
- self.db_client.save(
- query=update_sql, params=(max_publish_timestamp, account_id)
- )
- def deal(self):
- """
- deal channel account videos
- """
- account_list = self.get_channel_account_list()
- account_crawler_bar = tqdm(account_list, desc="crawler channel account videos")
- for account in account_crawler_bar:
- try:
- account_crawler_bar.set_postfix({"account_id": account["account_id"]})
- self.crawler_each_account(channel_account=account)
- self.update_account_max_cursor(account["account_id"])
- except Exception as e:
- log(
- task="crawler_channel_account_videos",
- function="deal",
- message="crawler channel account videos failed",
- data={
- "error": str(e),
- "traceback": traceback.format_exc(),
- "account_id": account["account_id"],
- },
- )
|