crawler_sohu_videos.py

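"""
Sohu video crawlers.

Each crawler subclasses CrawlerSohuVideos and exposes deal() as its entry point:
- CrawlerSohuHotVideos crawls the daily hot-point feed.
- CrawlerSohuRecommendVideos crawls recommendation feeds seeded from already
  crawled videos in publish_single_video_source.
- CrawlerSohuUserPageVideos crawls author homepage feeds.
"""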
from __future__ import annotations

import time
import traceback

from pymysql.cursors import DictCursor
from tqdm import tqdm

from applications import log
from applications.db import DatabaseConnector
from applications.pipeline import scrape_video_entities_process
from applications.utils import Item
from applications.utils import str_to_md5
from applications.utils import insert_into_single_video_source_table
from coldStartTasks.crawler.sohu import get_hot_point_videos
from coldStartTasks.crawler.sohu import get_recommendation_video_list
from coldStartTasks.crawler.sohu import get_user_homepage_videos
from config import long_articles_config


class CrawlerSohuVideos:
    def __init__(self):
        self.db_client = DatabaseConnector(long_articles_config)
        self.db_client.connect()
        self.platform = "sohu"

    def crawler_each_video(self, video_data):
        """
        Crawl a single video and insert it into the source table.
        """
        video_item = Item()
        unique_id = f"{self.platform}-{video_data['id']}"

        # add info into item
        video_item.add("content_trace_id", "video{}".format(str_to_md5(unique_id)))
        video_item.add("url_unique_md5", video_data["id"])
        video_item.add("article_title", video_data["title"])
        video_item.add("out_account_id", video_data["authorId"])
        video_item.add("out_account_name", video_data["authorName"])
        # postTime is in milliseconds; store a second-level timestamp
        video_item.add("publish_timestamp", int(video_data["postTime"] / 1000))
        video_item.add("platform", self.platform)
        video_item.add("article_url", video_data["videoUrl"])
        video_item.add("source_account", 0)
        video_item.add("crawler_timestamp", int(time.time()))

        # check item before insert
        video_item.check(source="video")
        try:
            item_with_oss_path = scrape_video_entities_process(
                video_item=video_item.item, db_client=self.db_client
            )
            if item_with_oss_path:
                insert_into_single_video_source_table(
                    db_client=self.db_client, video_item=item_with_oss_path
                )
        except Exception as e:
            # record the failure with full context instead of printing to stdout
            log(
                task="crawler_sohu_videos",
                function="crawler_each_video",
                message="crawler_each_video failed",
                status="failed",
                data={
                    "video_item": video_item.item,
                    "error": str(e),
                    "traceback": traceback.format_exc(),
                },
            )


class CrawlerSohuHotVideos(CrawlerSohuVideos):
    def deal(self):
        """
        Crawl Sohu hot-point videos; meant to run once a day.
        """
        hot_point_video_response = get_hot_point_videos()
        hot_point_video_list = hot_point_video_response["data"][
            "tpl-card-feed-pc-data"
        ]["list"]
        for video in tqdm(hot_point_video_list, desc="crawler sohu hot videos"):
            try:
                self.crawler_each_video(video)
            except Exception as e:
                log(
                    task="crawler_sohu_videos",
                    function="crawler_sohu_hot_videos",
                    message="crawler_sohu_hot_videos failed",
                    status="failed",
                    data={
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                        "video": video,
                    },
                )


class CrawlerSohuRecommendVideos(CrawlerSohuVideos):
    def fetch_seed_videos(self) -> list[dict]:
        """
        Fetch seed videos from the database.
        """
        fetch_query = """
            select id, out_account_id, url_unique_md5, article_title, score
            from publish_single_video_source
            where platform = 'sohu' and source_account = 0
                and score > 0.6 and audit_status = 1 and bad_status = 0;
        """
        return self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)

    def get_each_video_recommendation(self, seed_video: dict) -> None:
        """
        Crawl the recommendation feed attached to a single seed video.
        """
        author_id = seed_video["out_account_id"]
        article_id = seed_video["url_unique_md5"]
        outside_url = f"https://www.sohu.com/a/{article_id}_{author_id}"
        for page in range(1, 8):
            try:
                response = get_recommendation_video_list(
                    outside_url, author_id, article_id, page
                )
                if response:
                    video_list = response["data"]["recommendVideoFeed"]["list"]
                    for video in tqdm(video_list, desc=f"page: {page}"):
                        self.crawler_each_video(video)
            except Exception as e:
                log(
                    task="crawler_sohu_videos",
                    function="get_each_video_recommendation",
                    message="get_each_video_recommendation failed",
                    status="failed",
                    data={
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                        "seed_video": seed_video,
                        "page": page,
                    },
                )

    def update_seed_video_status(self, task_id: int) -> int:
        """
        Mark a seed video as consumed (source_account: 0 -> 1).
        """
        update_query = """
            update publish_single_video_source
            set source_account = %s
            where id = %s and source_account = %s;
        """
        return self.db_client.save(query=update_query, params=(1, task_id, 0))

    def deal(self):
        task_list = self.fetch_seed_videos()
        for task in tqdm(task_list, desc="crawler sohu recommend videos"):
            try:
                self.get_each_video_recommendation(task)
                self.update_seed_video_status(task_id=task["id"])
            except Exception as e:
                log(
                    task="crawler_sohu_videos",
                    function="crawler_sohu_recommend_videos",
                    message="crawler_sohu_recommend_videos failed",
                    status="failed",
                    data={
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                        "task": task,
                    },
                )


class CrawlerSohuUserPageVideos(CrawlerSohuVideos):
    def get_author_list(self):
        """
        Get the author id list; hardcoded for now rather than read from the database.
        """
        return [121644888]

    def process_each_page(self, response: dict):
        """
        Process a single page of an author's homepage feed.
        """
        video_list = response["data"]["FeedSlideloadAuthor_2_0_pc_1655965929143_data2"][
            "list"
        ]
        for video in tqdm(video_list, desc="crawler sohu user page videos"):
            try:
                self.crawler_each_video(video)
            except Exception as e:
                log(
                    task="crawler_sohu_videos",
                    function="process_each_page",
                    message="crawler_sohu_user_videos failed",
                    status="failed",
                    data={
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                        "video": video,
                    },
                )

    def get_each_user_videos(self, author_id: int):
        """
        Crawl the homepage videos of a single author.
        """
        # only the first page for now
        for page in range(1, 2):
            try:
                response = get_user_homepage_videos(author_id, page)
                self.process_each_page(response)
            except Exception as e:
                log(
                    task="crawler_sohu_videos",
                    function="get_each_user_videos",
                    message="crawler_sohu_user_videos failed",
                    status="failed",
                    data={
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                        "author_id": author_id,
                        "page": page,
                    },
                )

    def deal(self):
        author_list = self.get_author_list()
        for author_id in tqdm(author_list, desc="crawler sohu user videos"):
            try:
                self.get_each_user_videos(author_id)
            except Exception as e:
                log(
                    task="crawler_sohu_videos",
                    function="crawler_sohu_user_videos",
                    message="crawler_sohu_user_videos failed",
                    status="failed",
                    data={
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                        "author_id": author_id,
                    },
                )
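

# A minimal usage sketch, not part of the original module: each crawler exposes
# deal() as its entry point, so a daily job could instantiate and run them in turn.
if __name__ == "__main__":
    CrawlerSohuHotVideos().deal()
    CrawlerSohuRecommendVideos().deal()
    CrawlerSohuUserPageVideos().deal()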