123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282 |
- from __future__ import annotations
- import time
- import traceback
- from pymysql.cursors import DictCursor
- from tqdm import tqdm
- from applications import log
- from applications.const import SohuVideoCrawlerConst
- from applications.db import DatabaseConnector
- from applications.pipeline import scrape_video_entities_process
- from applications.utils import Item
- from applications.utils import str_to_md5
- from applications.utils import insert_into_single_video_source_table
- from cold_start.crawler.sohu import get_video_detail
- from cold_start.crawler.sohu import get_hot_point_videos
- from cold_start.crawler.sohu import get_recommendation_video_list
- from cold_start.crawler.sohu import get_user_homepage_videos
- from config import long_articles_config
# shared crawler constants (platform name, status codes, score thresholds)
const = SohuVideoCrawlerConst()
class CrawlerSohuVideos:
    """Base crawler for Sohu videos.

    Maps a raw Sohu video payload into an `Item`, runs it through the
    shared scrape pipeline, and persists it into the single-video source
    table. Subclasses supply the payloads (hot feed, recommendations,
    user homepages).
    """

    def __init__(self):
        # dedicated connection to the long-articles database
        self.db_client = DatabaseConnector(long_articles_config)
        self.db_client.connect()

    def crawler_each_video(self, video_data: dict) -> None:
        """
        Crawl a single video payload and insert it into the source table.

        `video_data` is expected to carry: id, title, authorId, authorName,
        postTime (epoch milliseconds — assumption based on the /1000 below,
        confirm against the Sohu API) and videoUrl. Failures are logged and
        swallowed so one bad video cannot abort a whole batch.
        """
        video_item = Item()
        unique_id = f"{const.PLATFORM}-{video_data['id']}"
        # add info into item
        video_item.add("content_trace_id", "video{}".format(str_to_md5(unique_id)))
        video_item.add("url_unique_md5", video_data["id"])
        video_item.add("article_title", video_data["title"])
        video_item.add("out_account_id", video_data["authorId"])
        video_item.add("out_account_name", video_data["authorName"])
        # postTime is in milliseconds; store an integer epoch-second value.
        # True division alone would leave a float in the row, unlike the
        # integer crawler_timestamp below.
        video_item.add("publish_timestamp", int(video_data["postTime"] / 1000))
        video_item.add("platform", const.PLATFORM)
        video_item.add("article_url", video_data["videoUrl"])
        video_item.add("source_account", const.GET_RECOMMEND_INIT_STATUS)
        video_item.add("crawler_timestamp", int(time.time()))
        # check item before insert
        video_item.check(source="video")
        try:
            item_with_oss_path = scrape_video_entities_process(
                video_item=video_item.item, db_client=self.db_client
            )
            if item_with_oss_path:
                insert_into_single_video_source_table(
                    db_client=self.db_client, video_item=item_with_oss_path
                )
        except Exception as e:
            # boundary logging: record the failed item with its traceback
            detail = {
                "video_item": video_item.item,
                "error": str(e),
                "traceback": traceback.format_exc(),
            }
            log(
                task="crawler_sohu_videos",
                function="crawler_each_video",
                message="crawler_sohu_videos failed",
                status="failed",
                data=detail,
            )
class CrawlerSohuHotVideos(CrawlerSohuVideos):
    """Crawler for Sohu's daily hot-point video feed."""

    def process_hot_video_obj(self, video_obj):
        """Enrich one hot-feed entry with its detail page, then crawl it."""
        article_url = f"https://www.sohu.com{video_obj['url']}"
        detail = get_video_detail(article_url=article_url)
        self.crawler_each_video(
            {
                "id": video_obj["id"],
                "title": video_obj["title"],
                "authorId": detail["account_id"],
                "authorName": detail["account_name"],
                "postTime": detail["publish_timestamp"],
                "videoUrl": detail["video_url"],
            }
        )

    def deal(self):
        """Fetch the hot-point feed and crawl every entry, logging failures."""
        feed_response = get_hot_point_videos()
        feed_list = feed_response["data"]["tpl-card-feed-pc-data"]["list"]
        for video in tqdm(feed_list, desc="crawler sohu hot videos"):
            try:
                self.process_hot_video_obj(video)
            except Exception as e:
                # one broken entry must not stop the rest of the feed
                log(
                    task="crawler_sohu_videos",
                    function="crawler_sohu_hot_videos",
                    message="crawler_sohu_hot_videos failed",
                    status="failed",
                    data={
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                        "video": video,
                    },
                )
class CrawlerSohuRecommendVideos(CrawlerSohuVideos):
    """Crawler that expands high-scoring seed videos via Sohu's
    recommendation feed, then marks each seed as processed."""

    def fetch_seed_videos(self) -> list[dict]:
        """
        Fetch seed videos from the database: Sohu videos still in the
        initial recommend status, above the score threshold, audited and
        not flagged as bad.
        """
        # interpolated values are trusted module constants, not user input
        fetch_query = f"""
            select id, out_account_id, url_unique_md5, article_title, score
            from publish_single_video_source
            where platform = '{const.PLATFORM}'
                and source_account = {const.GET_RECOMMEND_INIT_STATUS}
                and score > {const.GET_RECOMMEND_THRESHOLD_SCORE}
                and audit_status = {const.AUDIT_SUCCESS_STATUS}
                and bad_status = {const.VIDEO_NOT_BAD_STATUS};
        """
        seed_videos = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)
        return seed_videos

    def get_each_video_recommendation(self, seed_video: dict) -> None:
        """
        Crawl the recommendation feed of one seed video, page by page.
        Per-page failures are logged and skipped.
        """
        author_id = seed_video["out_account_id"]
        article_id = seed_video["url_unique_md5"]
        outside_url = f"https://www.sohu.com/a/{article_id}_{author_id}"
        for page in const.PAGE_LIST:
            try:
                response = get_recommendation_video_list(
                    outside_url, author_id, article_id, page
                )
                if response:
                    video_list = response["data"]["recommendVideoFeed"]["list"]
                    for video in tqdm(video_list, desc=f"page: {page}"):
                        self.crawler_each_video(video)
            except Exception as e:
                log(
                    task="crawler_sohu_videos",
                    function="get_each_video_recommendation",
                    message="get_each_video_recommendation failed",
                    status="failed",
                    data={
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                        "page": page,
                    },
                )

    def update_seed_video_status(self, task_id: int) -> int:
        """
        Flip one seed video from the init status to the success status.
        Returns the number of affected rows (0 if already processed).
        """
        update_query = f"""
            update publish_single_video_source set source_account = %s where id = %s and source_account = %s;
        """
        return self.db_client.save(
            query=update_query,
            params=(
                const.GET_RECOMMEND_SUCCESS_STATUS,
                task_id,
                const.GET_RECOMMEND_INIT_STATUS,
            ),
        )

    def deal(self):
        """Process every seed video: crawl its recommendations, then mark it done."""
        task_list = self.fetch_seed_videos()
        for task in tqdm(task_list):
            try:
                self.get_each_video_recommendation(task)
                self.update_seed_video_status(task_id=task["id"])
            except Exception as e:
                # fixed copy-pasted labels: failures here belong to the
                # recommendation crawler, not the hot-videos crawler
                log(
                    task="crawler_sohu_videos",
                    function="crawler_sohu_recommend_videos",
                    message="crawler_sohu_recommend_videos failed",
                    status="failed",
                    data={
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                        "video": task,
                    },
                )
class CrawlerSohuUserPageVideos(CrawlerSohuVideos):
    """
    个人主页文章占比大,账号体系还未建设,本次上线暂时不抓取,后续有需要再考虑
    (User-homepage crawler; disabled for now — get_author_list returns [].)
    """

    def get_author_list(self):
        """
        Get the author list from the database.

        Intentionally returns an empty list: the account system is not
        built yet, so this crawler is effectively disabled.
        """
        return []

    def process_each_page(self, response: dict) -> None:
        """Crawl every video on one homepage-feed page, logging failures."""
        video_list = response["data"]["FeedSlideloadAuthor_2_0_pc_1655965929143_data2"][
            "list"
        ]
        for video in tqdm(video_list, desc="crawler sohu user page videos"):
            try:
                self.crawler_each_video(video)
            except Exception as e:
                log(
                    task="crawler_sohu_videos",
                    function="process_each_page",
                    message="crawler_sohu_user_videos failed",
                    status="failed",
                    data={
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                        "video": video,
                    },
                )

    def get_each_user_videos(self, author_id: int) -> None:
        """
        Crawl the homepage feed of one author.

        Currently only page 1 is fetched; widen the range to paginate.
        """
        page_list = list(range(1, 2))
        for page in page_list:
            try:
                response = get_user_homepage_videos(author_id, page)
                self.process_each_page(response)
            except Exception as e:
                log(
                    task="crawler_sohu_videos",
                    function="get_each_user_videos",
                    message="crawler_sohu_user_videos failed",
                    status="failed",
                    data={
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                        "author_id": author_id,
                        "page": page,
                    },
                )

    def deal(self):
        """Crawl every configured author's homepage videos."""
        author_list = self.get_author_list()
        for author_id in tqdm(author_list, desc="crawler sohu user videos"):
            try:
                self.get_each_user_videos(author_id)
            except Exception as e:
                # fixed copy-pasted labels and the "author_od" key typo so
                # log records are attributable and queryable by author_id
                log(
                    task="crawler_sohu_videos",
                    function="crawler_sohu_user_videos",
                    message="crawler_sohu_user_videos failed",
                    status="failed",
                    data={
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                        "author_id": author_id,
                    },
                )
|