crawler_piaoquan_videos.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. from __future__ import annotations
  2. import json
  3. import time
  4. import traceback
  5. from pymysql.cursors import DictCursor
  6. from tqdm import tqdm
  7. from applications import log
  8. from applications.api import fetch_piaoquan_video_list_detail
  9. from applications.const.crawler_video_const import CrawlerPiaoQuanVideosConst
  10. from applications.db import DatabaseConnector
  11. from applications.pipeline import scrape_video_entities_process
  12. from applications.utils import Item
  13. from applications.utils import str_to_md5
  14. from applications.utils import insert_into_single_video_source_table
  15. from config import long_articles_config
  # Shared constants instance for this module; the code below reads the table
  # name, the status codes, the platform label and the NO_SOURCE_ACCOUNT value
  # from it.
  16. const = CrawlerPiaoQuanVideosConst()
  # Lookup from the category label stored with a top video to the category
  # written onto the crawled video item. Labels are grouped under their target
  # category and flattened in order, so the resulting key order is stable.
  category_map = {
      label: coarse
      for coarse, labels in (
          ("知识科普", ("知识科普", "生活技巧科普", "老年相关法律科普")),
          ("军事历史", ("中国战争史", "中国历史影像")),
          ("家长里短", ("正能量剧情",)),
          ("社会法治", ("人财诈骗", "贪污腐败")),
          ("奇闻趣事", ("罕见画面", "惊奇事件", "动物萌宠")),
          ("名人八卦", ("老明星",)),
          ("健康养生", ("健康知识", "饮食健康")),
          ("情感故事", ("人生忠告", "老年生活")),
          ("政治新闻", ("国际军事", "他国政策", "国际时政")),
          ("历史人物", ("历史名人",)),
      )
      for label in labels
  }
  39. class CrawlerPiaoQuanVideos:
  def __init__(self):
      """Build a DatabaseConnector from long_articles_config and connect it."""
      # The connector is stored on the instance first, then connected.
      self.db_client = connector = DatabaseConnector(long_articles_config)
      connector.connect()
  def get_piaoquan_top_video_list(self) -> list[dict]:
      """
      Fetch the top-video rows whose status is still ``const.INIT_STATUS``.

      Each row carries ``id``, ``video_id``, ``title`` and ``category``.
      ``category`` is selected because ``crawler_each_video`` reads the
      category of every row it processes.

      :return: whatever ``self.db_client.fetch`` returns for this query when
          called with ``DictCursor`` (presumably a list of dict rows).
      """
      # NOTE(review): assumes the table has a ``category`` column -- confirm
      # against the schema before deploying.
      fetch_query = f"""
          select id, video_id, title, category
          from {const.PIAOQUAN_TOP_VIDEO_TABLE}
          where status = {const.INIT_STATUS};
      """
      return self.db_client.fetch(fetch_query, cursor_type=DictCursor)
  def update_piaoquan_top_video_status(
      self, pool_id: int, ori_status: int, new_status: int
  ) -> int:
      """
      Move one top-video row from ``ori_status`` to ``new_status``.

      The update only matches a row whose ``id`` equals ``pool_id`` and whose
      current ``status`` equals ``ori_status``. ``crawler_each_video`` uses the
      returned value to decide whether it obtained the row.

      :param pool_id: value matched against the row's ``id`` column.
      :param ori_status: status the row must currently have.
      :param new_status: status written to the row.
      :return: the result of ``self.db_client.save`` (presumably the affected
          row count, truthy when a row was updated).
      """
      update_query = f"""
          update {const.PIAOQUAN_TOP_VIDEO_TABLE}
          set status = %s
          where id = %s and status = %s;
      """
      # Parameters are bound positionally, so they must follow the placeholder
      # order in the statement: set status, where id, where status.
      return self.db_client.save(update_query, (new_status, pool_id, ori_status))
  def crawler_each_video(self, video_data: dict) -> None:
      """
      Crawl one Piaoquan top video and store it as a single-video source.

      Steps: claim the row by switching its status from ``const.INIT_STATUS``
      to ``const.PROCESSING_STATUS``, fetch the video detail, build and check
      an ``Item``, run ``scrape_video_entities_process`` on it and insert the
      result when that call returns something truthy. Errors raised by the
      process/insert step are logged and not re-raised.

      :param video_data: row describing the video. ``id``, ``video_id`` and
          ``title`` are required; ``category`` is optional.
      :raises KeyError, IndexError: when the row or the detail response lacks
          an expected field (raised before the logged ``try`` block).
      """
      # lock video id: continue only if this call moved the row to PROCESSING
      lock_acquired = self.update_piaoquan_top_video_status(
          pool_id=video_data["id"],
          ori_status=const.INIT_STATUS,
          new_status=const.PROCESSING_STATUS,
      )
      if not lock_acquired:
          return
      # NOTE(review): nothing visible in this method moves the row out of
      # PROCESSING afterwards -- confirm that a later step does.

      video_id = video_data["video_id"]

      # get video detail from piaoquan
      response_from_piaoquan = fetch_piaoquan_video_list_detail([video_id])
      video_detail = response_from_piaoquan["data"][0]

      video_item = Item()
      unique_id = f"{const.PLATFORM}-{video_id}"

      # add info into item
      video_item.add("content_trace_id", f"video{str_to_md5(unique_id)}")
      video_item.add("url_unique_md5", video_id)
      video_item.add("article_title", video_data["title"])
      video_item.add("out_account_id", video_detail["uid"])
      # The uploader is described by the detail response (like ``uid`` above);
      # the row passed in as video_data has no "user" entry.
      # NOTE(review): assumes the detail nests the uploader under "user" --
      # confirm against the API response.
      video_item.add("out_account_name", video_detail["user"]["nickName"])
      video_item.add(
          "publish_timestamp", int(video_detail["gmtCreateTimestamp"] / 1000)
      )
      video_item.add("platform", const.PLATFORM)
      video_item.add(
          "article_url",
          f"https://admin.piaoquantv.com/cms/post-detail/{video_id}/detail",
      )
      video_item.add("source_account", const.NO_SOURCE_ACCOUNT)
      video_item.add("crawler_timestamp", int(time.time()))
      video_item.add("oss_path", video_detail["ossVideoPath"])
      video_item.add("audit_status", video_detail["auditStatus"])
      # A row without a category is treated like an unmapped one: the item
      # gets None instead of the crawl aborting after the lock was taken.
      video_item.add("category", category_map.get(video_data.get("category")))

      # check item before insert
      video_item.check(source="video")

      try:
          item_with_oss_path = scrape_video_entities_process(
              video_item=video_item.item, db_client=self.db_client
          )
          if item_with_oss_path:
              insert_into_single_video_source_table(
                  db_client=self.db_client, video_item=item_with_oss_path
              )
      except Exception as e:
          # Best effort: record the failure with full context and keep going.
          detail = {
              "video_item": video_item.item,
              "error": str(e),
              "traceback": traceback.format_exc(),
          }
          log(
              task="crawler_piaoquan_videos",
              function="crawler_each_video",
              message="crawler_piaoquan_videos failed",
              status="failed",
              data=detail,
          )