crawler_sohu_videos.py
from __future__ import annotations

import time
import traceback

from pymysql.cursors import DictCursor
from tqdm import tqdm

from applications import log
from applications.const import SohuVideoCrawlerConst
from applications.db import DatabaseConnector
from applications.pipeline import scrape_video_entities_process
from applications.utils import Item
from applications.utils import str_to_md5
from applications.utils import insert_into_single_video_source_table
from cold_start.crawler.sohu import get_video_detail
from cold_start.crawler.sohu import get_hot_point_videos
from cold_start.crawler.sohu import get_recommendation_video_list
from cold_start.crawler.sohu import get_user_homepage_videos
from config import long_articles_config

const = SohuVideoCrawlerConst()
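
# Constants read from SohuVideoCrawlerConst throughout this module:
# const.PLATFORM, const.GET_RECOMMEND_INIT_STATUS, const.GET_RECOMMEND_SUCCESS_STATUS,
# const.GET_RECOMMEND_THRESHOLD_SCORE, const.AUDIT_SUCCESS_STATUS,
# const.VIDEO_NOT_BAD_STATUS and const.PAGE_LIST. Their concrete values are
# defined in applications.const, not here.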


class CrawlerSohuVideos:
    def __init__(self):
        self.db_client = DatabaseConnector(long_articles_config)
        self.db_client.connect()

    def crawler_each_video(self, video_data):
        """
        crawler each video data
        """
        video_item = Item()
        unique_id = f"{const.PLATFORM}-{video_data['id']}"

        # add info into item
        video_item.add("content_trace_id", "video{}".format(str_to_md5(unique_id)))
        video_item.add("url_unique_md5", video_data["id"])
        video_item.add("article_title", video_data["title"])
        video_item.add("out_account_id", video_data["authorId"])
        video_item.add("out_account_name", video_data["authorName"])
        video_item.add("publish_timestamp", video_data["postTime"] / 1000)
        video_item.add("platform", const.PLATFORM)
        video_item.add("article_url", video_data["videoUrl"])
        video_item.add("source_account", const.GET_RECOMMEND_INIT_STATUS)
        video_item.add("crawler_timestamp", int(time.time()))

        # check item before insert
        video_item.check(source="video")
        try:
            item_with_oss_path = scrape_video_entities_process(
                video_item=video_item.item, db_client=self.db_client
            )
            if item_with_oss_path:
                insert_into_single_video_source_table(
                    db_client=self.db_client, video_item=item_with_oss_path
                )
        except Exception as e:
            detail = {
                "video_item": video_item.item,
                "error": str(e),
                "traceback": traceback.format_exc(),
            }
            log(
                task="crawler_sohu_videos",
                function="crawler_each_video",
                message="crawler_sohu_videos failed",
                status="failed",
                data=detail,
            )
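
# crawler_each_video expects `video_data` shaped like the dict assembled in
# CrawlerSohuHotVideos.process_hot_video_obj below; the values here are
# illustrative only:
#   {
#       "id": 123456,                # Sohu video id, also stored as url_unique_md5
#       "title": "...",
#       "authorId": 654321,
#       "authorName": "...",
#       "postTime": 1700000000000,   # milliseconds; divided by 1000 before storage
#       "videoUrl": "https://...",
#   }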


class CrawlerSohuHotVideos(CrawlerSohuVideos):
    # process hot video obj to satisfy video item
    def process_hot_video_obj(self, video_obj):
        """
        process hot video obj
        """
        article_url = f"https://www.sohu.com{video_obj['url']}"
        video_detail_response = get_video_detail(article_url=article_url)
        item_obj = {
            "id": video_obj["id"],
            "title": video_obj["title"],
            "authorId": video_detail_response["account_id"],
            "authorName": video_detail_response["account_name"],
            "postTime": video_detail_response["publish_timestamp"],
            "videoUrl": video_detail_response["video_url"],
        }
        self.crawler_each_video(item_obj)

    def deal(self):
        """
        crawler sohu hot videos every day
        """
        hot_point_video_response = get_hot_point_videos()
        hot_point_video_list = hot_point_video_response["data"][
            "tpl-card-feed-pc-data"
        ]["list"]
        for video in tqdm(hot_point_video_list, desc="crawler sohu hot videos"):
            try:
                self.process_hot_video_obj(video)
            except Exception as e:
                log(
                    task="crawler_sohu_videos",
                    function="crawler_sohu_hot_videos",
                    message="crawler_sohu_hot_videos failed",
                    status="failed",
                    data={
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                        "video": video,
                    },
                )


class CrawlerSohuRecommendVideos(CrawlerSohuVideos):
    def fetch_seed_videos(self) -> list[dict]:
        """
        get seed videos from database
        """
        fetch_query = f"""
            select id, out_account_id, url_unique_md5, article_title, score
            from publish_single_video_source
            where platform = '{const.PLATFORM}'
                and source_account = {const.GET_RECOMMEND_INIT_STATUS}
                and score > {const.GET_RECOMMEND_THRESHOLD_SCORE}
                and audit_status = {const.AUDIT_SUCCESS_STATUS}
                and bad_status = {const.VIDEO_NOT_BAD_STATUS};
        """
        seed_videos = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)
        return seed_videos

    def get_each_video_recommendation(self, seed_video: dict) -> None:
        """
        get each video recommendation
        """
        author_id = seed_video["out_account_id"]
        article_id = seed_video["url_unique_md5"]
        outside_url = f"https://www.sohu.com/a/{article_id}_{author_id}"
        for page in const.PAGE_LIST:
            try:
                response = get_recommendation_video_list(
                    outside_url, author_id, article_id, page
                )
                if response:
                    video_list = response["data"]["recommendVideoFeed"]["list"]
                    for video in tqdm(video_list, desc=f"page: {page}"):
                        self.crawler_each_video(video)
            except Exception as e:
                log(
                    task="crawler_sohu_videos",
                    function="get_each_video_recommendation",
                    message="get_each_video_recommendation failed",
                    status="failed",
                    data={
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                        "page": page,
                    },
                )

    def update_seed_video_status(self, task_id: int) -> int:
        """
        update seed video status
        """
        update_query = """
            update publish_single_video_source
            set source_account = %s
            where id = %s and source_account = %s;
        """
        return self.db_client.save(
            query=update_query,
            params=(
                const.GET_RECOMMEND_SUCCESS_STATUS,
                task_id,
                const.GET_RECOMMEND_INIT_STATUS,
            ),
        )
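
    # Note: the update above only flips rows whose source_account is still
    # GET_RECOMMEND_INIT_STATUS, and fetch_seed_videos only selects rows in that
    # same state, so each seed video is expanded into recommendations at most once.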

    def deal(self):
        task_list = self.fetch_seed_videos()
        for task in tqdm(task_list):
            try:
                self.get_each_video_recommendation(task)
                self.update_seed_video_status(task_id=task["id"])
            except Exception as e:
                log(
                    task="crawler_sohu_videos",
                    function="crawler_sohu_recommend_videos",
                    message="crawler_sohu_recommend_videos failed",
                    status="failed",
                    data={
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                        "video": task,
                    },
                )


class CrawlerSohuUserPageVideos(CrawlerSohuVideos):
    """
    Articles on personal homepages account for a large share of content, but the
    account system has not been built yet, so this release does not crawl them;
    revisit later if needed.
    """

    def get_author_list(self):
        """
        get author list from database
        """
        return []

    def process_each_page(self, response: dict):
        """
        process each page
        """
        video_list = response["data"]["FeedSlideloadAuthor_2_0_pc_1655965929143_data2"][
            "list"
        ]
        for video in tqdm(video_list, desc="crawler sohu user page videos"):
            try:
                self.crawler_each_video(video)
            except Exception as e:
                log(
                    task="crawler_sohu_videos",
                    function="process_each_page",
                    message="crawler_sohu_user_videos failed",
                    status="failed",
                    data={
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                        "video": video,
                    },
                )

    def get_each_user_videos(self, author_id: int):
        """
        get each user videos
        """
        page_list = list(range(1, 2))
        for page in page_list:
            try:
                response = get_user_homepage_videos(author_id, page)
                self.process_each_page(response)
            except Exception as e:
                log(
                    task="crawler_sohu_videos",
                    function="get_each_user_videos",
                    message="crawler_sohu_user_videos failed",
                    status="failed",
                    data={
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                        "author_id": author_id,
                        "page": page,
                    },
                )

    def deal(self):
        author_list = self.get_author_list()
        for author_id in tqdm(author_list, desc="crawler sohu user videos"):
            try:
                self.get_each_user_videos(author_id)
            except Exception as e:
                log(
                    task="crawler_sohu_videos",
                    function="crawler_sohu_user_videos",
                    message="crawler_sohu_user_videos failed",
                    status="failed",
                    data={
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                        "author_id": author_id,
                    },
                )
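

# A minimal usage sketch, assuming these crawlers are driven by an external
# scheduler; the original file defines no entry point, so this __main__ guard
# is illustrative only.
if __name__ == "__main__":
    CrawlerSohuHotVideos().deal()
    CrawlerSohuRecommendVideos().deal()
    # CrawlerSohuUserPageVideos is effectively a no-op for now
    # (get_author_list returns an empty list), so running it is harmless.
    CrawlerSohuUserPageVideos().deal()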