crawler_piaoquan_videos.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. from __future__ import annotations
  2. import json
  3. import time
  4. import traceback
  5. from pymysql.cursors import DictCursor
  6. from tqdm import tqdm
  7. from applications import log
  8. from applications.api import ApolloApi
  9. from applications.api import fetch_piaoquan_video_list_detail
  10. from applications.const.crawler_video_const import CrawlerPiaoQuanVideosConst
  11. from applications.db import DatabaseConnector
  12. from applications.pipeline import scrape_video_entities_process
  13. from applications.utils import Item
  14. from applications.utils import str_to_md5
  15. from applications.utils import insert_into_single_video_source_table
  16. from config import long_articles_config
  # Shared constants for this crawler (table name, status codes, platform tag).
  const = CrawlerPiaoQuanVideosConst()

  # Client used to read remote configuration values.
  apollo_api = ApolloApi()

  # The category mapping is stored as a JSON string; decode it once at import.
  _category_mapping_json = apollo_api.get_config_value(
      "pq_long_articles_category_mapping"
  )
  pq_long_articles_category_mapping = json.loads(_category_mapping_json)
  20. class CrawlerPiaoQuanVideos:
  def __init__(self):
      """
      build the long-articles database connector, connect it, and keep it
      on the instance as ``db_client``
      """
      connector = DatabaseConnector(long_articles_config)
      connector.connect()
      self.db_client = connector
  def get_piaoquan_top_video_list(self) -> list[dict]:
      """
      fetch the rows of the piaoquan top-video table whose status is still
      INIT_STATUS

      rows are requested through DictCursor, so each one is keyed by column
      name; these are the columns crawler_each_video reads from its
      ``video_data`` argument
      """
      # `category` is selected because crawler_each_video looks up
      # video_data["category"]; assumes the table exposes a column with
      # that name -- TODO confirm against the schema.
      fetch_query = f"""
          select id, video_id, title, category
          from {const.PIAOQUAN_TOP_VIDEO_TABLE}
          where status = {const.INIT_STATUS};
      """
      return self.db_client.fetch(fetch_query, cursor_type=DictCursor)
  def update_piaoquan_top_video_status(
      self, pool_id: int, ori_status: int, new_status: int
  ) -> int:
      """
      move one row of the piaoquan top-video table from ``ori_status`` to
      ``new_status``

      the update only matches when the row still holds ``ori_status``, which
      is what lets crawler_each_video use it as a lock

      :param pool_id: value of the row's ``id`` column
      :param ori_status: status the row must currently have
      :param new_status: status to write
      :return: whatever ``db_client.save`` returns (treated as falsy by the
          caller when nothing was updated)
      """
      update_query = f"""
          update {const.PIAOQUAN_TOP_VIDEO_TABLE}
          set status = %s
          where id = %s and status = %s;
      """
      # Parameters must follow placeholder order:
      # set status -> new_status, where id -> pool_id, and status -> ori_status.
      return self.db_client.save(update_query, (new_status, pool_id, ori_status))
  def crawler_each_video(self, video_data: dict) -> None:
      """
      crawler each video data

      locks the pool row (INIT_STATUS -> PROCESSING_STATUS), fetches the
      video detail from piaoquan, builds a video item from the row and the
      detail, then runs it through scrape_video_entities_process and, when
      that returns something truthy, inserts the result into the single
      video source table

      :param video_data: pool row; ``id``, ``video_id`` and ``title`` are
          required, ``category`` is used when present
      :return: None; exceptions from the scrape/insert step are logged and
          not re-raised
      """
      # lock video id: continue only when the status update reports success
      lock_acquired = self.update_piaoquan_top_video_status(
          pool_id=video_data["id"],
          ori_status=const.INIT_STATUS,
          new_status=const.PROCESSING_STATUS,
      )
      if not lock_acquired:
          return

      video_id = video_data["video_id"]

      # get video detail from piaoquan
      response_from_piaoquan = fetch_piaoquan_video_list_detail([video_id])
      video_detail = response_from_piaoquan["data"][0]

      video_item = Item()
      unique_id = f"{const.PLATFORM}-{video_id}"

      # add info into item
      video_item.add("content_trace_id", f"video{str_to_md5(unique_id)}")
      video_item.add("url_unique_md5", video_id)
      video_item.add("article_title", video_data["title"])
      video_item.add("out_account_id", video_detail["uid"])
      # The nickname is read from the piaoquan detail, next to uid; rows
      # produced by get_piaoquan_top_video_list carry no `user` key.
      video_item.add("out_account_name", video_detail["user"]["nickName"])
      # presumably gmtCreateTimestamp is in milliseconds -- TODO confirm
      video_item.add(
          "publish_timestamp", int(video_detail["gmtCreateTimestamp"] / 1000)
      )
      video_item.add("platform", const.PLATFORM)
      video_item.add(
          "article_url",
          f"https://admin.piaoquantv.com/cms/post-detail/{video_id}/detail",
      )
      video_item.add("source_account", const.NO_SOURCE_ACCOUNT)
      video_item.add("crawler_timestamp", int(time.time()))
      video_item.add("video_oss_path", video_detail["ossVideoPath"])
      video_item.add("audit_status", video_detail["auditStatus"])

      # A row without a category, or with one that has no mapping, simply
      # gets no category fields instead of raising KeyError.
      category = pq_long_articles_category_mapping.get(video_data.get("category"))
      if category:
          video_item.add("category", category)
          video_item.add("category_status", const.SUCCESS_STATUS)

      # check item before insert
      video_item.check(source="video")

      # NOTE(review): nothing visible in this method moves the row out of
      # PROCESSING_STATUS afterwards -- confirm that this is handled elsewhere.
      try:
          item_with_oss_path = scrape_video_entities_process(
              video_item=video_item.item, db_client=self.db_client
          )
          if item_with_oss_path:
              insert_into_single_video_source_table(
                  db_client=self.db_client, video_item=item_with_oss_path
              )
      except Exception as e:
          # best effort: record the failure with full context and carry on
          detail = {
              "video_item": video_item.item,
              "error": str(e),
              "traceback": traceback.format_exc(),
          }
          log(
              task="crawler_piaoquan_videos",
              function="crawler_each_video",
              message="crawler_piaoquan_videos failed",
              status="failed",
              data=detail,
          )