# crawler_piaoquan_videos.py
  1. from __future__ import annotations
  2. import json
  3. import time
  4. import traceback
  5. from pymysql.cursors import DictCursor
  6. from tqdm import tqdm
  7. from applications import log
  8. from applications.api import ApolloApi
  9. from applications.api import fetch_piaoquan_video_list_detail
  10. from applications.const.crawler_video_const import CrawlerPiaoQuanVideosConst
  11. from applications.db import DatabaseConnector
  12. from applications.pipeline import scrape_video_entities_process
  13. from applications.utils import Item
  14. from applications.utils import str_to_md5
  15. from applications.utils import insert_into_single_video_source_table
  16. from config import long_articles_config
# Task-wide constants (table names, status codes, platform identifiers).
const = CrawlerPiaoQuanVideosConst()
# NOTE(review): module-level config fetch — Apollo is contacted at import time;
# an Apollo outage makes this module unimportable. Confirm that is acceptable.
apollo_api = ApolloApi(env="prod")
# Mapping from Piaoquan category name -> long-articles category,
# stored as a JSON string in Apollo config.
pq_long_articles_category_mapping = json.loads(apollo_api.get_config_value("pq_long_articles_category_mapping"))
  20. class CrawlerPiaoQuanVideos:
  21. def __init__(self):
  22. self.db_client = DatabaseConnector(long_articles_config)
  23. self.db_client.connect()
  24. def get_piaoquan_top_video_list(self) -> list[dict]:
  25. fetch_query = f"""
  26. select id, video_id, title, category
  27. from {const.PIAOQUAN_TOP_VIDEO_TABLE}
  28. where status = {const.INIT_STATUS};
  29. """
  30. task_list = self.db_client.fetch(fetch_query, cursor_type=DictCursor)
  31. return task_list
  32. def update_piaoquan_top_video_status(
  33. self, pool_id: int, ori_status: int, new_status: int
  34. ) -> int:
  35. update_query = f"""
  36. update {const.PIAOQUAN_TOP_VIDEO_TABLE}
  37. set status = %s, status_update_timestamp = %s
  38. where id = %s and status = %s;
  39. """
  40. return self.db_client.save(update_query, params=(new_status, int(time.time()), pool_id, ori_status))
  41. def rollback_lock_tasks(self) -> int:
  42. # roll back lock task which has been locked for more than 1 hour
  43. rollback_query = f"""
  44. update {const.PIAOQUAN_TOP_VIDEO_TABLE}
  45. set status = %s
  46. where status = %s and status_update_timestamp < %s;
  47. """
  48. return self.db_client.save(
  49. rollback_query,
  50. (const.INIT_STATUS, const.PROCESSING_STATUS, int(time.time() - const.MAX_LOCK_SECOND))
  51. )
  52. def crawler_each_video(self, video_data: dict) -> None:
  53. """
  54. crawler each video data
  55. """
  56. # lock video id
  57. lock_acquired = self.update_piaoquan_top_video_status(
  58. pool_id=video_data["id"],
  59. ori_status=const.INIT_STATUS,
  60. new_status=const.PROCESSING_STATUS,
  61. )
  62. if not lock_acquired:
  63. return
  64. # get video detail from piaoquan
  65. response_from_piaoquan = fetch_piaoquan_video_list_detail(
  66. [video_data["video_id"]]
  67. )
  68. video_detail = response_from_piaoquan["data"][0]
  69. video_item = Item()
  70. unique_id = f"{const.PLATFORM}-{video_data['video_id']}"
  71. # add info into item
  72. video_item.add("content_trace_id", "video{}".format(str_to_md5(unique_id)))
  73. video_item.add("url_unique_md5", video_data["video_id"])
  74. video_item.add("article_title", video_data["title"])
  75. video_item.add("out_account_id", video_detail["uid"])
  76. video_item.add("out_account_name", video_detail["user"]["nickName"])
  77. video_item.add("mini_program_title", video_data["title"])
  78. video_item.add("cover_url", video_detail["shareImgPath"])
  79. video_item.add(
  80. "publish_timestamp", int(video_detail["gmtCreateTimestamp"] / 1000)
  81. )
  82. video_item.add("platform", const.PLATFORM)
  83. video_item.add(
  84. "article_url",
  85. f"https://admin.piaoquantv.com/cms/post-detail/{video_data['video_id']}/detail",
  86. )
  87. video_item.add("source_account", const.NO_SOURCE_ACCOUNT)
  88. video_item.add("crawler_timestamp", int(time.time()))
  89. video_item.add("video_oss_path", video_detail["ossVideoPath"])
  90. # 将视频审核状态设置为审核中, 不影响后续发布流程
  91. video_item.add("audit_status", const.AUDITING_STATUS)
  92. video_item.add("audit_video_id", video_data["video_id"])
  93. category = pq_long_articles_category_mapping.get(video_data["category"])
  94. if category:
  95. video_item.add("category", category)
  96. video_item.add("category_status", const.SUCCESS_STATUS)
  97. # check item before insert
  98. video_item.check(source="video")
  99. try:
  100. item_with_oss_path = scrape_video_entities_process(
  101. video_item=video_item.item, db_client=self.db_client
  102. )
  103. if item_with_oss_path:
  104. insert_into_single_video_source_table(
  105. db_client=self.db_client, video_item=item_with_oss_path
  106. )
  107. self.update_piaoquan_top_video_status(
  108. pool_id=video_data["id"],
  109. ori_status=const.PROCESSING_STATUS,
  110. new_status=const.SUCCESS_STATUS
  111. )
  112. else:
  113. self.update_piaoquan_top_video_status(
  114. pool_id=video_data["id"],
  115. ori_status=const.PROCESSING_STATUS,
  116. new_status=const.FAIL_STATUS
  117. )
  118. except Exception as e:
  119. detail = {
  120. "video_item": video_item.item,
  121. "error": str(e),
  122. "traceback": traceback.format_exc(),
  123. }
  124. self.update_piaoquan_top_video_status(
  125. pool_id=video_data["id"],
  126. ori_status=const.PROCESSING_STATUS,
  127. new_status=const.FAIL_STATUS
  128. )
  129. log(
  130. task="crawler_piaoquan_videos",
  131. function="crawler_each_video",
  132. message="crawler_piaoquan_videos failed",
  133. status="failed",
  134. data=detail,
  135. )
  136. def deal(self):
  137. # roll back lock task
  138. self.rollback_lock_tasks()
  139. # get video_list
  140. video_list = self.get_piaoquan_top_video_list()
  141. for video_data in tqdm(video_list, desc="video_list"):
  142. try:
  143. self.crawler_each_video(video_data)
  144. except Exception as e:
  145. log(
  146. task="crawler_piaoquan_videos",
  147. function="crawler_each_video",
  148. message="crawler_piaoquan_videos failed",
  149. status="failed",
  150. data={
  151. "video_data": video_data,
  152. "error": str(e),
  153. "traceback": traceback.format_exc(),
  154. }
  155. )