# crawler_sohu_recommend_videos.py

  1. from __future__ import annotations
  2. import time
  3. import traceback
  4. from pymysql.cursors import DictCursor
  5. from tqdm import tqdm
  6. from applications.db import DatabaseConnector
  7. from applications.pipeline import scrape_video_entities_process
  8. from applications.utils import Item
  9. from applications.utils import str_to_md5
  10. from applications.utils import insert_into_single_video_source_table
  11. from config import long_articles_config
  12. from coldStartTasks.crawler.sohu import get_recommendation_video_list
  13. class CrawlerSohuRecommendVideos:
  14. def __init__(self):
  15. self.db_client = DatabaseConnector(long_articles_config)
  16. self.db_client.connect()
  17. self.platform = 'sohu'
  18. def fetch_seed_videos(self) -> list[dict]:
  19. """
  20. get seed videos from database
  21. """
  22. fetch_query = f"""
  23. select id, out_account_id, url_unique_md5, article_title, score
  24. from publish_single_video_source
  25. where platform = 'sohu' and source_account = 0 and score > 0.6 and audit_status = 1 and bad_status = 0;
  26. """
  27. seed_videos = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)
  28. return seed_videos
  29. def crawler_each_video(self, video_data):
  30. """
  31. crawler each video data
  32. """
  33. video_item = Item()
  34. unique_id = f"{self.platform}-{video_data['id']}"
  35. # add info into item
  36. video_item.add("content_trace_id", "video{}".format(str_to_md5(unique_id)))
  37. video_item.add("url_unique_md5", video_data["id"])
  38. video_item.add("article_title", video_data["title"])
  39. video_item.add("out_account_id", video_data['authorId'])
  40. video_item.add("out_account_name", video_data["authorName"])
  41. video_item.add("publish_timestamp", video_data["postTime"] / 1000)
  42. video_item.add("platform", self.platform)
  43. video_item.add("article_url", video_data["videoUrl"])
  44. video_item.add("source_account", 0)
  45. video_item.add("crawler_timestamp", int(time.time()))
  46. # check item before insert
  47. video_item.check(source="video")
  48. try:
  49. item_with_oss_path = scrape_video_entities_process(
  50. video_item=video_item.item, db_client=self.db_client
  51. )
  52. if item_with_oss_path:
  53. insert_into_single_video_source_table(
  54. db_client=self.db_client, video_item=item_with_oss_path
  55. )
  56. except Exception as e:
  57. detail = {
  58. "video_item": video_item.item,
  59. "error": str(e),
  60. "traceback": traceback.format_exc(),
  61. }
  62. print(detail)
  63. def get_each_video_recommendation(self, seed_video: dict) -> None:
  64. """
  65. get each video recommendation
  66. """
  67. author_id = seed_video["out_account_id"]
  68. article_id = seed_video["url_unique_md5"]
  69. outside_url = f"https://www.sohu.com/a/{article_id}_{author_id}"
  70. page_list = [i for i in range(1, 8)]
  71. for page in page_list:
  72. try:
  73. response = get_recommendation_video_list(outside_url, author_id, article_id, page)
  74. if response:
  75. video_list = response['data']['recommendVideoFeed']['list']
  76. for video in tqdm(video_list):
  77. self.crawler_each_video(video)
  78. except Exception as e:
  79. print(e)
  80. print(traceback.format_exc())
  81. continue
  82. def update_seed_video_status(self, task_id: int) -> int:
  83. """
  84. update seed video status
  85. """
  86. update_query = f"""
  87. update publish_single_video_source set source_account = %s where id = %s and source_account = %s;
  88. """
  89. return self.db_client.save(
  90. query=update_query,
  91. params=(1, task_id, 0)
  92. )
  93. def deal(self):
  94. task_list = self.fetch_seed_videos()
  95. for task in tqdm(task_list[:1]):
  96. try:
  97. self.get_each_video_recommendation(task)
  98. self.update_seed_video_status(task_id=task["id"])
  99. except Exception as e:
  100. print(e)