@@ -2,14 +2,17 @@
 @author: luojunhui
 @tool: pycharm && deepseek
 """
+
 import re
 import os
 import traceback
 import time
 
+from pymysql.cursors import DictCursor
 from tqdm import tqdm
 
 from applications import log
+from applications.const import ChannelVideoCrawlerConst
 from applications.db import DatabaseConnector
 from applications.utils import download_sph_video
 from applications.utils import insert_into_single_video_source_table
@@ -19,7 +22,10 @@ from applications.utils import upload_to_oss
 from config import long_articles_config
 from coldStartTasks.crawler.channels import get_channel_account_videos
 
-NO_SOURCE_ACCOUNT = 0
+const = ChannelVideoCrawlerConst()
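+# Constants used below (from applications.const): CHANNEL_ACCOUNT_GOOD_STATUS,
+# MIN_TITLE_LENGTH, MAX_VIDEO_LENGTH, NO_SOURCE_ACCOUNT_STATUS, DEFAULT_CURSOR,
+# SLEEP_SECOND.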
 
 
 class CrawlerChannelAccountVideos:
@@ -30,7 +33,6 @@ class CrawlerChannelAccountVideos:
     def __init__(self):
         self.db_client = DatabaseConnector(db_config=long_articles_config)
         self.db_client.connect()
-        self.success_crawler_video_count = 0
 
     def whether_video_exists(self, title: str) -> bool:
         """
@@ -51,7 +53,11 @@ class CrawlerChannelAccountVideos:
         """
         get channel account list from database
         """
-        return
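+        # Fetch only accounts whose status marks them as crawlable, together
+        # with how far each one has already been crawled (max_cursor).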
+        sql = f"""select account_id, max_cursor from sph_account_for_videos where status = {const.CHANNEL_ACCOUNT_GOOD_STATUS};"""
+        account_list = self.db_client.fetch(query=sql, cursor_type=DictCursor)
+        return account_list
 
     def crawler_each_video(self, video: dict):
         """
@@ -66,23 +70,32 @@ class CrawlerChannelAccountVideos:
                 task="crawler_channel_account_videos",
                 function="crawler_each_video",
                 message="video title exists",
-                data={"video_id": video["id"], "title": title}
+                data={"video_id": video["id"], "title": title},
             )
             return
 
-        cleaned_title = re.sub(r'[^\u4e00-\u9fff]', '', title)
-        if len(cleaned_title) < 10:
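+        # Keep only CJK characters so the minimum-length check counts Chinese
+        # characters rather than punctuation or Latin text.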
+        cleaned_title = re.sub(r"[^\u4e00-\u9fff]", "", title)
+        if len(cleaned_title) < const.MIN_TITLE_LENGTH:
             log(
                 task="crawler_channel_account_videos",
                 function="crawler_each_video",
                 message="video title is too short",
-                data={"video_id": video["id"], "title": title}
+                data={"video_id": video["id"], "title": title},
             )
             return
 
-        video_length = video['objectDesc']['media'][0]['VideoPlayLen']
-        if video_length and int(video_length) > 240:
-            print("video to large")
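+        # VideoPlayLen appears to be the duration in seconds; log and skip
+        # videos longer than MAX_VIDEO_LENGTH instead of printing to stdout.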
+        video_length = video["objectDesc"]["media"][0]["VideoPlayLen"]
+        if video_length and int(video_length) > const.MAX_VIDEO_LENGTH:
+            log(
+                task="crawler_channel_account_videos",
+                function="crawler_each_video",
+                message="video length is too long",
+                data={"video_id": video["id"], "title": title},
+            )
             return
 
         video_item = Item()
@@ -93,7 +102,7 @@ class CrawlerChannelAccountVideos:
         video_item.add("out_account_id", video["username"])
         video_item.add("out_account_name", video["nickname"])
         video_item.add("publish_timestamp", video["createtime"])
-        video_item.add("platform", 'sph')
+        video_item.add("platform", "sph")
         media = object_desc["media"][0]
         url = media["Url"]
         decode_key = media["decodeKey"]
@@ -103,7 +112,7 @@ class CrawlerChannelAccountVideos:
         decrypt_path = download_sph_video(download_url=download_url, key=decode_key)
         oss_path = upload_to_oss(decrypt_path)
         video_item.add("video_oss_path", oss_path)
-        video_item.add("source_account", NO_SOURCE_ACCOUNT)
+        video_item.add("source_account", const.NO_SOURCE_ACCOUNT_STATUS)
         video_item.check(source="video")
         insert_into_single_video_source_table(self.db_client, video_item.item)
         os.remove(decrypt_path)
@@ -119,18 +128,27 @@ class CrawlerChannelAccountVideos:
             },
         )
 
-    def crawler_each_account(self, channel_account_id: str, channel_account_name: str, last_buffer: str = ""):
+    def crawler_each_account(self, channel_account: dict, last_buffer: str = ""):
         """
         get channel account videos
         """
-        response = get_channel_account_videos(channel_account_id, last_buffer=last_buffer)
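+        # max_cursor is the newest publish timestamp already crawled for this
+        # account; fall back to DEFAULT_CURSOR for accounts never crawled before.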
+        channel_account_id = channel_account["account_id"]
+        max_cursor = channel_account["max_cursor"]
+        if not max_cursor:
+            max_cursor = const.DEFAULT_CURSOR
+
+        response = get_channel_account_videos(
+            channel_account_id, last_buffer=last_buffer
+        )
         if response["ret"] == 200:
             response_data = response["data"]
             last_buffer = response_data["lastBuffer"]
             continue_flag = response_data["continueFlag"]
             video_list = response_data["object"]
-            create_timestamp = video_list[0]['createtime']
-            if create_timestamp < 1704038400:
+            create_timestamp = video_list[0]["createtime"]
+            if create_timestamp < max_cursor:
                 return
 
             crawl_video_list_bar = tqdm(video_list, desc="crawl videos")
@@ -139,11 +155,68 @@ class CrawlerChannelAccountVideos:
                 self.crawler_each_video(video)
 
             if continue_flag:
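+                # Page through the account with lastBuffer and recurse until
+                # continueFlag goes false or we reach already-crawled timestamps.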
+                time.sleep(const.SLEEP_SECOND)
+                return self.crawler_each_account(channel_account, last_buffer)
             else:
                 return
 
         else:
-            print(f"crawler channel account {channel_account_name} videos failed")
+            log(
+                task="crawler_channel_account_videos",
+                function="crawler_each_account",
+                message="get_channel_account_videos failed",
+                data={
+                    "response": response,
+                    "channel_account_id": channel_account_id,
+                    "max_cursor": max_cursor,
+                },
+            )
             return
+
+    def update_account_max_cursor(self, account_id):
+        """
+        update account max cursor
+        """
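+        # account_id originates from our own sph_account_for_videos table; if
+        # it can ever contain quotes, switch this f-string to a parameterized query.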
+        select_sql = f"""
+            select max(publish_timestamp) as max_cursor from publish_single_video_source where out_account_id = '{account_id}';
+        """
+        response_mysql = self.db_client.fetch(query=select_sql)
+        max_publish_timestamp = response_mysql[0][0]
+
+        if max_publish_timestamp:
+            update_sql = f"""
+                update sph_account_for_videos
+                set max_cursor = %s
+                where account_id = %s;
+            """
+            self.db_client.save(
+                query=update_sql, params=(max_publish_timestamp, account_id)
+            )
+
+    def deal(self):
+        """
+        deal channel account videos
+        """
+        account_list = self.get_channel_account_list()
+        account_crawler_bar = tqdm(account_list, desc="crawler channel account videos")
+        for account in account_crawler_bar:
+            try:
+                account_crawler_bar.set_postfix({"account_id": account["account_id"]})
+                self.crawler_each_account(channel_account=account)
+                self.update_account_max_cursor(account["account_id"])
+
+            except Exception as e:
+                log(
+                    task="crawler_channel_account_videos",
+                    function="deal",
+                    message="crawler channel account videos failed",
+                    data={
+                        "error": str(e),
+                        "traceback": traceback.format_exc(),
+                        "account_id": account["account_id"],
+                    },
+                )