@@ -0,0 +1,219 @@
+"""
|
|
|
+@author: luojunhui
|
|
|
+@tool: pycharm && deepseek
|
|
|
+"""
|
|
|
+
|
|
|
+import re
|
|
|
+import os
|
|
|
+import traceback
|
|
|
+import time
|
|
|
+
|
|
|
+from pymysql.cursors import DictCursor
|
|
|
+from tqdm import tqdm
|
|
|
+
|
|
|
+from applications import log
|
|
|
+from applications.const import ChannelVideoCrawlerConst
|
|
|
+from applications.db import DatabaseConnector
|
|
|
+from applications.utils import download_sph_video
|
|
|
+from applications.utils import insert_into_single_video_source_table
|
|
|
+from applications.utils import Item
|
|
|
+from applications.utils import str_to_md5
|
|
|
+from applications.utils import upload_to_oss
|
|
|
+from config import long_articles_config
|
|
|
+from coldStartTasks.crawler.channels import get_channel_account_videos
|
|
|
+
|
|
|
+const = ChannelVideoCrawlerConst()
|
|
|
+
|
|
|
+
|
|
|
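+# Note: "sph" below is the platform tag for the channel videos being crawled
+# (most likely WeChat Channels / 视频号, inferred from the identifiers in this
+# file; the original does not spell it out).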
+class CrawlerChannelAccountVideos:
+    """
+    crawl videos from channel accounts and store them in the video source table
+    """
+
+    def __init__(self):
+        self.db_client = DatabaseConnector(db_config=long_articles_config)
+        self.db_client.connect()
+
+    def whether_video_exists(self, title: str) -> bool:
+        """
+        check whether a video with the same title has already been crawled
+        """
+        sql = """
+            select id from publish_single_video_source
+            where article_title = %s;
+        """
+        duplicate_id = self.db_client.fetch(query=sql, params=(title,))
+        return bool(duplicate_id)
+
+    def get_channel_account_list(self) -> list[dict]:
+        """
+        get channel account list from database
+        """
+        sql = f"""select account_id, max_cursor from sph_account_for_videos where status = {const.CHANNEL_ACCOUNT_GOOD_STATUS};"""
+        account_list = self.db_client.fetch(query=sql, cursor_type=DictCursor)
+        return account_list
+
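+    # Expected shape of each `video` dict handled below (reconstructed from the
+    # field accesses in this file; illustrative, not an official schema):
+    # {
+    #     "id": ...,
+    #     "username": ...,          # account id on the platform
+    #     "nickname": ...,          # account display name
+    #     "createtime": ...,        # publish timestamp
+    #     "objectDesc": {
+    #         "description": "...", # used as the article title
+    #         "media": [
+    #             {"Url": "...", "urlToken": "...", "decodeKey": "...", "VideoPlayLen": "..."}
+    #         ],
+    #     },
+    # }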
+    def crawler_each_video(self, video: dict) -> None:
+        """
+        download and decrypt a single video, upload it to oss,
+        and save its metadata to the database
+        """
+        object_desc = video["objectDesc"]
+        title = object_desc["description"]
+        if self.whether_video_exists(title):
+            log(
+                task="crawler_channel_account_videos",
+                function="crawler_each_video",
+                message="video title exists",
+                data={"video_id": video["id"], "title": title},
+            )
+            return
+
+        # measure the title by its Chinese characters only
+        cleaned_title = re.sub(r"[^\u4e00-\u9fff]", "", title)
+        if len(cleaned_title) < const.MIN_TITLE_LENGTH:
+            log(
+                task="crawler_channel_account_videos",
+                function="crawler_each_video",
+                message="video title is too short",
+                data={"video_id": video["id"], "title": title},
+            )
+            return
+
+        video_length = object_desc["media"][0]["VideoPlayLen"]
+        if video_length and int(video_length) > const.MAX_VIDEO_LENGTH:
+            log(
+                task="crawler_channel_account_videos",
+                function="crawler_each_video",
+                message="video length is too long",
+                data={"video_id": video["id"], "title": title, "length": video_length},
+            )
+            return
+
+        video_item = Item()
+        video_id = video["id"]
+        video_item.add("content_trace_id", "video{}".format(str_to_md5(str(video_id))))
+        video_item.add("url_unique_md5", video_id)
+        video_item.add("article_title", title)
+        video_item.add("out_account_id", video["username"])
+        video_item.add("out_account_name", video["nickname"])
+        video_item.add("publish_timestamp", video["createtime"])
+        video_item.add("platform", "sph")
+        media = object_desc["media"][0]
+        # the playable url is the base url plus its token; the stream itself is
+        # encrypted and must be decrypted with decode_key after download
+        url = media["Url"]
+        decode_key = media["decodeKey"]
+        url_token = media["urlToken"]
+        download_url = url + url_token
+        try:
+            decrypt_path = download_sph_video(download_url=download_url, key=decode_key)
+            oss_path = upload_to_oss(decrypt_path)
+            video_item.add("video_oss_path", oss_path)
+            video_item.add("source_account", const.NO_SOURCE_ACCOUNT_STATUS)
+            video_item.check(source="video")
+            insert_into_single_video_source_table(self.db_client, video_item.item)
+            # remove the local decrypted file once it is safely in oss and the db
+            os.remove(decrypt_path)
+        except Exception as e:
+            log(
+                task="crawler_channel_account_videos",
+                function="crawler_each_video",
+                message="download video failed",
+                data={
+                    "error": str(e),
+                    "traceback": traceback.format_exc(),
+                    "video_id": video["id"],
+                },
+            )
+
+    def crawler_each_account(self, channel_account: dict, last_buffer: str = "") -> None:
+        """
+        crawl a channel account's videos page by page,
+        using a loop instead of recursion
+        """
+        channel_account_id = channel_account["account_id"]
+        max_cursor = channel_account.get("max_cursor") or const.DEFAULT_CURSOR
+        current_last_buffer = last_buffer
+        has_more = True
+
+        while has_more:
+            response = get_channel_account_videos(channel_account_id, last_buffer=current_last_buffer)
+            if response["ret"] != 200:
+                log(
+                    task="crawler_channel_account_videos",
+                    function="crawler_each_account",
+                    message="get_channel_account_videos failed",
+                    data={
+                        "response": response,
+                        "channel_account_id": channel_account_id,
+                        "max_cursor": max_cursor,
+                    },
+                )
+                break
+
+            response_data = response["data"]
+            current_last_buffer = response_data["lastBuffer"]  # update the pagination cursor
+            has_more = response_data["continueFlag"]  # whether another page exists
+            video_list = response_data["object"]
+
+            if not video_list:
+                break
+
+            # stop once this page's newest video is older than the saved cursor
+            create_timestamp = video_list[0]["createtime"]
+            if create_timestamp < max_cursor:
+                break
+
+            crawl_video_list_bar = tqdm(video_list, desc="crawl videos")
+            for video in crawl_video_list_bar:
+                crawl_video_list_bar.set_postfix({"video_id": video["id"]})
+                self.crawler_each_video(video)
+
+            # be polite between pages
+            if has_more:
+                time.sleep(const.SLEEP_SECOND)
+
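+    # Cursor bookkeeping (as implemented above and below): crawler_each_account
+    # stops paging once it reaches videos older than the stored max_cursor, and
+    # update_account_max_cursor then advances that cursor to the newest
+    # publish_timestamp actually saved for the account.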
+    def update_account_max_cursor(self, account_id: str) -> None:
+        """
+        advance the account's max_cursor to the newest crawled publish_timestamp
+        """
+        select_sql = """
+            select max(publish_timestamp) as max_cursor from publish_single_video_source where out_account_id = %s;
+        """
+        response_mysql = self.db_client.fetch(query=select_sql, params=(account_id,))
+        max_publish_timestamp = response_mysql[0][0]
+
+        if max_publish_timestamp:
+            update_sql = """
+                update sph_account_for_videos
+                set max_cursor = %s
+                where account_id = %s;
+            """
+            self.db_client.save(
+                query=update_sql, params=(max_publish_timestamp, account_id)
+            )
+
+    def deal(self):
+        """
+        crawl videos for every channel account, then update each account's cursor
+        """
+        account_list = self.get_channel_account_list()
+        account_crawler_bar = tqdm(account_list, desc="crawler channel account videos")
+        for account in account_crawler_bar:
+            try:
+                account_crawler_bar.set_postfix({"account_id": account["account_id"]})
+                self.crawler_each_account(channel_account=account)
+                self.update_account_max_cursor(account["account_id"])
+
+            except Exception as e:
+                log(
+                    task="crawler_channel_account_videos",
+                    function="deal",
+                    message="crawler channel account videos failed",
+                    data={
+                        "error": str(e),
+                        "traceback": traceback.format_exc(),
+                        "account_id": account["account_id"],
+                    },
+                )
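+
+
+# Minimal entry-point sketch (an assumption for illustration; the original diff
+# defines no __main__ block and the class may be driven from elsewhere):
+if __name__ == "__main__":
+    crawler = CrawlerChannelAccountVideos()
+    crawler.deal()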