from __future__ import annotations

import json
import time
import traceback

from pymysql.cursors import DictCursor
from tqdm import tqdm

from applications import log
from applications.api import ApolloApi
from applications.api import fetch_piaoquan_video_list_detail
from applications.const.crawler_video_const import CrawlerPiaoQuanVideosConst
from applications.db import DatabaseConnector
from applications.pipeline import scrape_video_entities_process
from applications.utils import Item
from applications.utils import str_to_md5
from applications.utils import insert_into_single_video_source_table

from config import long_articles_config

const = CrawlerPiaoQuanVideosConst()
apollo_api = ApolloApi(env="prod")
# Piaoquan category -> long-articles category, read from Apollo once at import time.
pq_long_articles_category_mapping = json.loads(
    apollo_api.get_config_value("pq_long_articles_category_mapping")
)


class CrawlerPiaoQuanVideos:
    """Move queued piaoquan top videos from the pool table into the single-video source table."""

    def __init__(self):
        self.db_client = DatabaseConnector(long_articles_config)
        self.db_client.connect()

    def get_piaoquan_top_video_list(self) -> list[dict]:
        """Return the pool rows still in ``const.INIT_STATUS``.

        Each row is a dict (DictCursor) with keys ``id``, ``video_id``,
        ``title`` and ``category``.
        """
        fetch_query = f"""
            select id, video_id, title, category
            from {const.PIAOQUAN_TOP_VIDEO_TABLE}
            where status = {const.INIT_STATUS};
        """
        task_list = self.db_client.fetch(fetch_query, cursor_type=DictCursor)
        return task_list

    def update_piaoquan_top_video_status(
        self, pool_id: int, ori_status: int, new_status: int
    ) -> int:
        """Compare-and-set the status of one pool row.

        The row is updated (and ``status_update_timestamp`` refreshed) only if
        its current status equals ``ori_status``.

        Returns:
            Whatever ``db_client.save`` returns -- presumably the affected row
            count (TODO confirm); callers treat a falsy value as "the row was
            not in ``ori_status``, nothing changed".
        """
        update_query = f"""
            update {const.PIAOQUAN_TOP_VIDEO_TABLE}
            set status = %s, status_update_timestamp = %s
            where id = %s and status = %s;
        """
        return self.db_client.save(
            update_query,
            params=(new_status, int(time.time()), pool_id, ori_status),
        )

    def rollback_lock_tasks(self) -> int:
        """Release stale locks.

        Rows left in ``const.PROCESSING_STATUS`` whose ``status_update_timestamp``
        is older than ``const.MAX_LOCK_SECOND`` seconds are set back to
        ``const.INIT_STATUS`` so they can be picked up again.
        """
        rollback_query = f"""
            update {const.PIAOQUAN_TOP_VIDEO_TABLE}
            set status = %s
            where status = %s and status_update_timestamp < %s;
        """
        return self.db_client.save(
            rollback_query,
            (
                const.INIT_STATUS,
                const.PROCESSING_STATUS,
                int(time.time() - const.MAX_LOCK_SECOND),
            ),
        )

    @staticmethod
    def _fetch_video_detail(video_id) -> dict:
        """Fetch the piaoquan detail dict for one video id.

        Raises:
            ValueError: if piaoquan returns no detail for ``video_id``.
        """
        response_from_piaoquan = fetch_piaoquan_video_list_detail([video_id])
        detail_list = (response_from_piaoquan or {}).get("data") or []
        if not detail_list:
            # A missing detail is permanent; raising lets the caller mark the
            # row FAIL instead of leaving it locked and retried forever.
            raise ValueError(f"no piaoquan detail returned for video_id={video_id}")
        return detail_list[0]

    @staticmethod
    def _fill_video_item(video_item: Item, video_data: dict, video_detail: dict) -> None:
        """Populate ``video_item`` from a pool row and its piaoquan detail."""
        video_id = video_data["video_id"]
        unique_id = f"{const.PLATFORM}-{video_id}"

        video_item.add("content_trace_id", f"video{str_to_md5(unique_id)}")
        video_item.add("url_unique_md5", video_id)
        video_item.add("article_title", video_data["title"])
        video_item.add("out_account_id", video_detail["uid"])
        video_item.add("out_account_name", video_detail["user"]["nickName"])
        video_item.add("mini_program_title", video_data["title"])
        video_item.add("cover_url", video_detail["shareImgPath"])
        # gmtCreateTimestamp is divided by 1000 to get seconds (presumably it is in ms).
        video_item.add(
            "publish_timestamp", int(video_detail["gmtCreateTimestamp"] / 1000)
        )
        video_item.add("platform", const.PLATFORM)
        video_item.add(
            "article_url",
            f"https://admin.piaoquantv.com/cms/post-detail/{video_id}/detail",
        )
        video_item.add("source_account", const.NO_SOURCE_ACCOUNT)
        video_item.add("crawler_timestamp", int(time.time()))
        video_item.add("video_oss_path", video_detail["ossVideoPath"])
        # Set the audit status to "auditing"; this does not affect the
        # downstream publishing flow.
        video_item.add("audit_status", const.AUDITING_STATUS)
        video_item.add("audit_video_id", video_id)

        category = pq_long_articles_category_mapping.get(video_data["category"])
        if category:
            video_item.add("category", category)
            video_item.add("category_status", const.SUCCESS_STATUS)

    def crawler_each_video(self, video_data: dict) -> None:
        """Crawl one pool row into the single-video source table.

        The row is first locked by moving it INIT -> PROCESSING. If that fails,
        another worker owns it and this returns immediately. Once the lock is
        held, every outcome ends the row in SUCCESS_STATUS or FAIL_STATUS. This
        includes errors while fetching the piaoquan detail or building the item.

        Args:
            video_data: a row from ``get_piaoquan_top_video_list`` with keys
                ``id``, ``video_id``, ``title`` and ``category``.
        """
        # lock video id
        lock_acquired = self.update_piaoquan_top_video_status(
            pool_id=video_data["id"],
            ori_status=const.INIT_STATUS,
            new_status=const.PROCESSING_STATUS,
        )
        if not lock_acquired:
            return

        video_item = Item()
        try:
            video_detail = self._fetch_video_detail(video_data["video_id"])
            self._fill_video_item(video_item, video_data, video_detail)

            # check item before insert
            video_item.check(source="video")

            item_with_oss_path = scrape_video_entities_process(
                video_item=video_item.item, db_client=self.db_client
            )
            if item_with_oss_path:
                insert_into_single_video_source_table(
                    db_client=self.db_client, video_item=item_with_oss_path
                )
                self.update_piaoquan_top_video_status(
                    pool_id=video_data["id"],
                    ori_status=const.PROCESSING_STATUS,
                    new_status=const.SUCCESS_STATUS,
                )
            else:
                self.update_piaoquan_top_video_status(
                    pool_id=video_data["id"],
                    ori_status=const.PROCESSING_STATUS,
                    new_status=const.FAIL_STATUS,
                )
        except Exception as e:
            # Capture the traceback before any further DB call can replace it.
            detail = {
                "video_data": video_data,
                "video_item": video_item.item,
                "error": str(e),
                "traceback": traceback.format_exc(),
            }
            self.update_piaoquan_top_video_status(
                pool_id=video_data["id"],
                ori_status=const.PROCESSING_STATUS,
                new_status=const.FAIL_STATUS,
            )
            log(
                task="crawler_piaoquan_videos",
                function="crawler_each_video",
                message="crawler_piaoquan_videos failed",
                status="failed",
                data=detail,
            )

    def deal(self):
        """Release stale locks, then crawl every INIT pool row one at a time."""
        # roll back lock task
        self.rollback_lock_tasks()

        # get video_list
        video_list = self.get_piaoquan_top_video_list() or []
        for video_data in tqdm(video_list, desc="video_list"):
            try:
                self.crawler_each_video(video_data)
            except Exception as e:
                # Last-resort guard so one bad row does not stop the batch.
                log(
                    task="crawler_piaoquan_videos",
                    function="crawler_each_video",
                    message="crawler_piaoquan_videos failed",
                    status="failed",
                    data={
                        "video_data": video_data,
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                    },
                )