from __future__ import annotations

import time
import traceback

from pymysql.cursors import DictCursor
from tqdm import tqdm

from applications.db import DatabaseConnector
from applications.pipeline import scrape_video_entities_process
from applications.utils import Item
from applications.utils import str_to_md5
from applications.utils import insert_into_single_video_source_table
from config import long_articles_config
from coldStartTasks.crawler.sohu import get_recommendation_video_list


class CrawlerSohuRecommendVideos:
    """
    Crawl Sohu's recommended-video feed, seeded from already-published videos.
    """

    def __init__(self):
        self.db_client = DatabaseConnector(long_articles_config)
        self.db_client.connect()
        self.platform = "sohu"

    def fetch_seed_videos(self) -> list[dict]:
        """
        Fetch seed videos that have not yet been used as a recommendation source.
        """
        fetch_query = """
            select id, out_account_id, url_unique_md5, article_title, score
            from publish_single_video_source
            where platform = 'sohu' and source_account = 0
                and score > 0.6 and audit_status = 1 and bad_status = 0;
        """
        return self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)

    def crawler_each_video(self, video_data: dict) -> None:
        """
        Save a single recommended video returned by the Sohu feed.
        """
        video_item = Item()
        unique_id = f"{self.platform}-{video_data['id']}"

        # assemble the item from the feed payload
        video_item.add("content_trace_id", "video{}".format(str_to_md5(unique_id)))
        video_item.add("url_unique_md5", video_data["id"])
        video_item.add("article_title", video_data["title"])
        video_item.add("out_account_id", video_data["authorId"])
        video_item.add("out_account_name", video_data["authorName"])
        # postTime is in milliseconds; store whole seconds
        video_item.add("publish_timestamp", int(video_data["postTime"] / 1000))
        video_item.add("platform", self.platform)
        video_item.add("article_url", video_data["videoUrl"])
        video_item.add("source_account", 0)
        video_item.add("crawler_timestamp", int(time.time()))

        # validate the item before inserting
        video_item.check(source="video")
        try:
            # process the video entities; the returned item carries an OSS path
            item_with_oss_path = scrape_video_entities_process(
                video_item=video_item.item, db_client=self.db_client
            )
            if item_with_oss_path:
                insert_into_single_video_source_table(
                    db_client=self.db_client, video_item=item_with_oss_path
                )
        except Exception as e:
            detail = {
                "video_item": video_item.item,
                "error": str(e),
                "traceback": traceback.format_exc(),
            }
            print(detail)

    def get_each_video_recommendation(self, seed_video: dict) -> None:
        """
        Crawl the recommendation feed of a single seed video.
        """
        author_id = seed_video["out_account_id"]
        article_id = seed_video["url_unique_md5"]
        outside_url = f"https://www.sohu.com/a/{article_id}_{author_id}"
        # the feed is paginated; fetch pages 1-7
        for page in range(1, 8):
            try:
                response = get_recommendation_video_list(
                    outside_url, author_id, article_id, page
                )
                if response:
                    video_list = response["data"]["recommendVideoFeed"]["list"]
                    for video in tqdm(video_list):
                        self.crawler_each_video(video)
            except Exception as e:
                print(e)
                print(traceback.format_exc())

    def update_seed_video_status(self, task_id: int) -> int:
        """
        Mark a seed video as consumed (source_account 0 -> 1).
        """
        update_query = """
            update publish_single_video_source
            set source_account = %s
            where id = %s and source_account = %s;
        """
        return self.db_client.save(query=update_query, params=(1, task_id, 0))

    def deal(self) -> None:
        """
        Entry point: crawl recommendations for every seed video, then mark it as used.
        """
        task_list = self.fetch_seed_videos()
        for task in tqdm(task_list):
            try:
                self.get_each_video_recommendation(task)
                self.update_seed_video_status(task_id=task["id"])
            except Exception as e:
                print(e)
                print(traceback.format_exc())
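

# A minimal usage sketch, assuming the script is run from the project root with
# `config.long_articles_config` pointing at a reachable MySQL instance:
if __name__ == "__main__":
    crawler = CrawlerSohuRecommendVideos()
    crawler.deal()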