crawler_piaoquan_videos.py

from __future__ import annotations

import json
import time
import traceback

from pymysql.cursors import DictCursor
from tqdm import tqdm

from applications import log
from applications.api import ApolloApi
from applications.api import fetch_piaoquan_video_list_detail
from applications.api import fetch_piaoquan_account_video_list
from applications.const.crawler_video_const import CrawlerPiaoQuanVideosConst
from applications.db import DatabaseConnector
from applications.pipeline import scrape_video_entities_process
from applications.utils import Item
from applications.utils import str_to_md5
from applications.utils import insert_into_single_video_source_table
from config import long_articles_config

const = CrawlerPiaoQuanVideosConst()
apollo_api = ApolloApi(env="prod")
pq_long_articles_category_mapping = json.loads(
    apollo_api.get_config_value("pq_long_articles_category_mapping")
)


class CrawlerPiaoQuanVideos:
    def __init__(self):
        self.db_client = DatabaseConnector(long_articles_config)
        self.db_client.connect()


class CrawlerPiaoQuanTopVideos(CrawlerPiaoQuanVideos):
    def get_piaoquan_top_video_list(self) -> list[dict]:
        fetch_query = f"""
            select id, video_id, title, category
            from {const.PIAOQUAN_TOP_VIDEO_TABLE}
            where status = {const.INIT_STATUS};
        """
        task_list = self.db_client.fetch(fetch_query, cursor_type=DictCursor)
        return task_list

    def update_piaoquan_top_video_status(
        self, pool_id: int, ori_status: int, new_status: int
    ) -> int:
        update_query = f"""
            update {const.PIAOQUAN_TOP_VIDEO_TABLE}
            set status = %s, status_update_timestamp = %s
            where id = %s and status = %s;
        """
        return self.db_client.save(
            update_query,
            params=(new_status, int(time.time()), pool_id, ori_status),
        )

    def rollback_lock_tasks(self) -> int:
        # roll back tasks that have been locked for more than one hour (MAX_LOCK_SECOND)
        rollback_query = f"""
            update {const.PIAOQUAN_TOP_VIDEO_TABLE}
            set status = %s
            where status = %s and status_update_timestamp < %s;
        """
        return self.db_client.save(
            rollback_query,
            (
                const.INIT_STATUS,
                const.PROCESSING_STATUS,
                int(time.time() - const.MAX_LOCK_SECOND),
            ),
        )

    def crawler_each_video(self, video_data: dict) -> None:
        """
        crawl a single video record
        """
        # lock the video row so concurrent runs skip it
        lock_acquired = self.update_piaoquan_top_video_status(
            pool_id=video_data["id"],
            ori_status=const.INIT_STATUS,
            new_status=const.PROCESSING_STATUS,
        )
        if not lock_acquired:
            return

        # get video detail from piaoquan
        response_from_piaoquan = fetch_piaoquan_video_list_detail(
            [video_data["video_id"]]
        )
        video_detail = response_from_piaoquan["data"][0]
        video_item = Item()
        unique_id = f"{const.PLATFORM}-{video_data['video_id']}"

        # add info into item
        video_item.add("content_trace_id", "video{}".format(str_to_md5(unique_id)))
        video_item.add("url_unique_md5", video_data["video_id"])
        video_item.add("article_title", video_data["title"])
        video_item.add("out_account_id", video_detail["uid"])
        video_item.add("out_account_name", video_detail["user"]["nickName"])
        video_item.add("mini_program_title", video_data["title"])
        video_item.add("cover_url", video_detail["shareImgPath"])
        video_item.add(
            "publish_timestamp", int(video_detail["gmtCreateTimestamp"] / 1000)
        )
        video_item.add("platform", const.PLATFORM)
        video_item.add(
            "article_url",
            f"https://admin.piaoquantv.com/cms/post-detail/{video_data['video_id']}/detail",
        )
        video_item.add("source_account", const.NO_SOURCE_ACCOUNT)
        video_item.add("crawler_timestamp", int(time.time()))
        video_item.add("video_oss_path", video_detail["ossVideoPath"])
        # mark the video as "auditing"; this does not affect the downstream publish flow
        video_item.add("audit_status", const.AUDITING_STATUS)
        video_item.add("audit_video_id", video_data["video_id"])
        category = pq_long_articles_category_mapping.get(video_data["category"])
        if category:
            video_item.add("category", category)
            video_item.add("category_status", const.SUCCESS_STATUS)

        # check item before insert
        video_item.check(source="video")
        try:
            item_with_oss_path = scrape_video_entities_process(
                video_item=video_item.item, db_client=self.db_client
            )
            if item_with_oss_path:
                insert_into_single_video_source_table(
                    db_client=self.db_client, video_item=item_with_oss_path
                )
                self.update_piaoquan_top_video_status(
                    pool_id=video_data["id"],
                    ori_status=const.PROCESSING_STATUS,
                    new_status=const.SUCCESS_STATUS,
                )
            else:
                self.update_piaoquan_top_video_status(
                    pool_id=video_data["id"],
                    ori_status=const.PROCESSING_STATUS,
                    new_status=const.FAIL_STATUS,
                )
        except Exception as e:
            detail = {
                "video_item": video_item.item,
                "error": str(e),
                "traceback": traceback.format_exc(),
            }
            self.update_piaoquan_top_video_status(
                pool_id=video_data["id"],
                ori_status=const.PROCESSING_STATUS,
                new_status=const.FAIL_STATUS,
            )
            log(
                task="crawler_piaoquan_videos",
                function="crawler_each_video",
                message="crawler_piaoquan_videos failed",
                status="failed",
                data=detail,
            )

    def deal(self):
        # roll back tasks locked by a previous run
        self.rollback_lock_tasks()

        # get video_list
        video_list = self.get_piaoquan_top_video_list()
        for video_data in tqdm(video_list, desc="video_list"):
            try:
                self.crawler_each_video(video_data)
            except Exception as e:
                log(
                    task="crawler_piaoquan_videos",
                    function="crawler_each_video",
                    message="crawler_piaoquan_videos failed",
                    status="failed",
                    data={
                        "video_data": video_data,
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                    },
                )


class CrawlerPiaoQuanAccountVideos(CrawlerPiaoQuanVideos):
    def get_piaoquan_account_video_list(self) -> None:
        account_id = "81584998"
        has_next_page = True
        page_id = 1
        page_size = 10
        while has_next_page:
            response = fetch_piaoquan_account_video_list(
                account_id=account_id,
                page_id=page_id,
                page_size=page_size,
            )
            video_list = response["content"]["objs"]
            self.insert_video_list(video_list)
            # NOTE: the original loop never advanced the page or cleared
            # has_next_page, so it looped forever; this assumes the API signals
            # the last page by returning fewer items than page_size
            if len(video_list) < page_size:
                has_next_page = False
            else:
                page_id += 1

    def insert_video_list(self, video_list: list[dict]) -> None:
        pass
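

# Usage sketch (an assumption, not part of the original file): the crawlers are
# most likely driven by an external task scheduler, but running this module
# directly would look roughly like the entry point below.
if __name__ == "__main__":
    top_video_crawler = CrawlerPiaoQuanTopVideos()
    top_video_crawler.deal()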