123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177 |
- from __future__ import annotations
- import json
- import time
- import traceback
- from pymysql.cursors import DictCursor
- from tqdm import tqdm
- from applications import log
- from applications.api import ApolloApi
- from applications.api import fetch_piaoquan_video_list_detail
- from applications.const.crawler_video_const import CrawlerPiaoQuanVideosConst
- from applications.db import DatabaseConnector
- from applications.pipeline import scrape_video_entities_process
- from applications.utils import Item
- from applications.utils import str_to_md5
- from applications.utils import insert_into_single_video_source_table
- from config import long_articles_config
# Shared crawler constants: table name, status codes, platform tag, etc.
const = CrawlerPiaoQuanVideosConst()

# The Piaoquan -> long-articles category mapping is read once, at import time,
# from the production Apollo config and decoded from its JSON string form.
# Lookups use the pool row's ``category`` value as the key.
apollo_api = ApolloApi(env="prod")
_raw_category_mapping = apollo_api.get_config_value("pq_long_articles_category_mapping")
pq_long_articles_category_mapping = json.loads(_raw_category_mapping)
class CrawlerPiaoQuanVideos:
    """Crawl Piaoquan top videos from the pool table into the video source table.

    Each row of ``const.PIAOQUAN_TOP_VIDEO_TABLE`` moves through
    INIT -> PROCESSING -> SUCCESS / FAIL. The INIT -> PROCESSING transition is a
    guarded UPDATE that doubles as a per-row lock; ``rollback_lock_tasks``
    returns rows stuck in PROCESSING to INIT once their status timestamp is
    older than ``const.MAX_LOCK_SECOND`` seconds.
    """

    def __init__(self):
        # The connection is opened eagerly; this class never closes it.
        self.db_client = DatabaseConnector(long_articles_config)
        self.db_client.connect()

    def get_piaoquan_top_video_list(self) -> list[dict]:
        """Return all pool rows still in INIT_STATUS.

        Each row is a dict (DictCursor) with keys ``id``, ``video_id``,
        ``title`` and ``category``.
        """
        # Only constants from ``const`` are interpolated, never external input.
        fetch_query = f"""
            select id, video_id, title, category
            from {const.PIAOQUAN_TOP_VIDEO_TABLE}
            where status = {const.INIT_STATUS};
        """
        return self.db_client.fetch(fetch_query, cursor_type=DictCursor)

    def update_piaoquan_top_video_status(
        self, pool_id: int, ori_status: int, new_status: int
    ) -> int:
        """Move pool row ``pool_id`` from ``ori_status`` to ``new_status``.

        The ``status = ori_status`` guard makes this a compare-and-set, which is
        what lets it act as a lock. ``status_update_timestamp`` is set to the
        current unix time (seconds).

        Returns:
            The result of ``db_client.save``; callers treat a falsy value as
            "no row was updated" (presumably the affected-row count — TODO
            confirm against DatabaseConnector).
        """
        update_query = f"""
            update {const.PIAOQUAN_TOP_VIDEO_TABLE}
            set status = %s, status_update_timestamp = %s
            where id = %s and status = %s;
        """
        return self.db_client.save(
            update_query,
            params=(new_status, int(time.time()), pool_id, ori_status),
        )

    def rollback_lock_tasks(self) -> int:
        """Reset rows locked longer than ``const.MAX_LOCK_SECOND`` back to INIT.

        This recovers rows left in PROCESSING_STATUS by a run that died before
        recording a final status.

        Returns:
            The result of ``db_client.save``.
        """
        rollback_query = f"""
            update {const.PIAOQUAN_TOP_VIDEO_TABLE}
            set status = %s
            where status = %s and status_update_timestamp < %s;
        """
        return self.db_client.save(
            rollback_query,
            (
                const.INIT_STATUS,
                const.PROCESSING_STATUS,
                int(time.time() - const.MAX_LOCK_SECOND),
            ),
        )

    def _build_video_item(self, video_data: dict, video_detail: dict):
        """Assemble and validate the ``Item`` for one video.

        Args:
            video_data: the pool row (``id``, ``video_id``, ``title``, ``category``).
            video_detail: the first entry of the Piaoquan detail response's ``data``.

        Returns:
            The populated ``Item`` after ``check(source="video")`` has run.
        """
        video_id = video_data["video_id"]
        title = video_data["title"]
        unique_id = f"{const.PLATFORM}-{video_id}"

        video_item = Item()
        video_item.add("content_trace_id", "video{}".format(str_to_md5(unique_id)))
        # NOTE(review): the raw video_id (not an md5 digest) is stored under
        # ``url_unique_md5``; kept unchanged — confirm this is intentional.
        video_item.add("url_unique_md5", video_id)
        video_item.add("article_title", title)
        video_item.add("out_account_id", video_detail["uid"])
        video_item.add("out_account_name", video_detail["user"]["nickName"])
        video_item.add("mini_program_title", title)
        video_item.add("cover_url", video_detail["shareImgPath"])
        # gmtCreateTimestamp is divided by 1000 to get seconds (presumably it is
        # in milliseconds — TODO confirm against the Piaoquan API).
        video_item.add(
            "publish_timestamp", int(video_detail["gmtCreateTimestamp"] / 1000)
        )
        video_item.add("platform", const.PLATFORM)
        video_item.add(
            "article_url",
            f"https://admin.piaoquantv.com/cms/post-detail/{video_id}/detail",
        )
        video_item.add("source_account", const.NO_SOURCE_ACCOUNT)
        video_item.add("crawler_timestamp", int(time.time()))
        video_item.add("video_oss_path", video_detail["ossVideoPath"])
        # Mark the video as "auditing"; this does not affect the downstream
        # publishing flow.
        video_item.add("audit_status", const.AUDITING_STATUS)
        video_item.add("audit_video_id", video_id)

        # Category is only attached when the pool category has a mapping.
        category = pq_long_articles_category_mapping.get(video_data["category"])
        if category:
            video_item.add("category", category)
            video_item.add("category_status", const.SUCCESS_STATUS)

        # Validate the item before it enters the pipeline.
        video_item.check(source="video")
        return video_item

    def crawler_each_video(self, video_data: dict) -> None:
        """Crawl one pool row and record the outcome in its status column.

        Args:
            video_data: a row from ``get_piaoquan_top_video_list``.

        The row is skipped if it cannot be locked, i.e. it is no longer in
        INIT_STATUS. Once locked, the row is moved out of PROCESSING_STATUS,
        unless that status update itself fails. It becomes SUCCESS_STATUS when
        the scrape pipeline returns an item that is then inserted. It becomes
        FAIL_STATUS when the pipeline returns nothing, or when any step after
        locking raises; in that case the error is logged, not re-raised.
        """
        pool_id = video_data["id"]

        # Compare-and-set INIT -> PROCESSING is the per-row lock.
        lock_acquired = self.update_piaoquan_top_video_status(
            pool_id=pool_id,
            ori_status=const.INIT_STATUS,
            new_status=const.PROCESSING_STATUS,
        )
        if not lock_acquired:
            return

        video_item = None
        try:
            # All post-lock work is guarded so that a failed detail fetch, an
            # empty/None response or a failing item check also releases the row
            # as FAIL, instead of leaving it in PROCESSING until
            # rollback_lock_tasks re-queues it (which retries a permanently
            # missing video on every run).
            response_from_piaoquan = fetch_piaoquan_video_list_detail(
                [video_data["video_id"]]
            )
            video_detail = response_from_piaoquan["data"][0]
            video_item = self._build_video_item(video_data, video_detail)

            item_with_oss_path = scrape_video_entities_process(
                video_item=video_item.item, db_client=self.db_client
            )
            if item_with_oss_path:
                insert_into_single_video_source_table(
                    db_client=self.db_client, video_item=item_with_oss_path
                )
                final_status = const.SUCCESS_STATUS
            else:
                final_status = const.FAIL_STATUS

            self.update_piaoquan_top_video_status(
                pool_id=pool_id,
                ori_status=const.PROCESSING_STATUS,
                new_status=final_status,
            )
        except Exception as e:
            # Log before touching the DB again so the original error is kept
            # even if the status update below fails too.
            log(
                task="crawler_piaoquan_videos",
                function="crawler_each_video",
                message="crawler_piaoquan_videos failed",
                status="failed",
                data={
                    "video_data": video_data,
                    "video_item": video_item.item if video_item is not None else None,
                    "error": str(e),
                    "traceback": traceback.format_exc(),
                },
            )
            self.update_piaoquan_top_video_status(
                pool_id=pool_id,
                ori_status=const.PROCESSING_STATUS,
                new_status=const.FAIL_STATUS,
            )

    def deal(self):
        """Entry point: release stale locks, then crawl every INIT row once.

        A failure on one row is logged and does not stop the loop.
        """
        self.rollback_lock_tasks()

        video_list = self.get_piaoquan_top_video_list()
        for video_data in tqdm(video_list, desc="video_list"):
            try:
                self.crawler_each_video(video_data)
            except Exception as e:
                # Safety net for errors crawler_each_video does not handle
                # itself, e.g. the lock or FAIL status update raising.
                log(
                    task="crawler_piaoquan_videos",
                    function="crawler_each_video",
                    message="crawler_piaoquan_videos failed",
                    status="failed",
                    data={
                        "video_data": video_data,
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                    },
                )
|