spider_task.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. """
  2. @author: luojunhui
  3. """
  4. from typing import Dict, List
  5. from applications.const import new_content_id_task_const
  6. async def whether_downloaded_videos_exists(content_id, article_crawler_video_table, db_client):
  7. """
  8. 判断该文章是否存在历史匹配视频
  9. :param content_id:
  10. :param article_crawler_video_table: 爬虫表
  11. :param db_client
  12. :return:
  13. """
  14. sql = f"""
  15. SELECT id
  16. FROM {article_crawler_video_table}
  17. WHERE content_id = '{content_id}'
  18. AND download_status = {new_content_id_task_const.VIDEO_DOWNLOAD_SUCCESS_STATUS}
  19. AND is_illegal = {new_content_id_task_const.VIDEO_SAFE};
  20. """
  21. res_tuple = await db_client.async_select(sql)
  22. if len(res_tuple) >= new_content_id_task_const.MIN_MATCH_VIDEO_NUM:
  23. return True
  24. else:
  25. return False
  26. async def get_downloaded_videos(content_id, article_crawler_video_table, db_client) -> List[Dict]:
  27. """
  28. 获取已下载的视频
  29. :return:
  30. """
  31. sql = f"""
  32. SELECT platform, play_count, like_count, video_oss_path, cover_oss_path, user_id
  33. FROM {article_crawler_video_table}
  34. WHERE content_id = '{content_id}'
  35. AND download_status = {new_content_id_task_const.VIDEO_DOWNLOAD_SUCCESS_STATUS}
  36. AND is_illegal = {new_content_id_task_const.VIDEO_SAFE}
  37. ORDER BY score DESC;
  38. """
  39. res_tuple = await db_client.async_select(sql)
  40. return [
  41. {
  42. "platform": i[0],
  43. "play_count": i[1],
  44. "like_count": i[2],
  45. "video_oss_path": i[3],
  46. "cover_oss_path": i[4],
  47. "uid": i[5]
  48. }
  49. for i in res_tuple
  50. ]
  51. async def update_crawler_table_with_exist_content_id(root_content_id, content_id, trace_id, article_crawler_video_table, db_client):
  52. """
  53. 用root_content_id 查询出已经下载过的视频信息,用new_content_id更新
  54. """
  55. select_sql = f"""
  56. SELECT out_video_id, platform, video_title, play_count, like_count, share_count, publish_time, crawler_time,
  57. duration, video_url, cover_url, download_status, video_oss_path, cover_oss_path, user_id, score, is_illegal
  58. FROM {article_crawler_video_table}
  59. WHERE content_id = '{root_content_id}'
  60. AND download_status = {new_content_id_task_const.VIDEO_DOWNLOAD_SUCCESS_STATUS}
  61. AND is_illegal = {new_content_id_task_const.VIDEO_SAFE};
  62. """
  63. res_tuple = await db_client.async_select(select_sql)
  64. insert_list = [(trace_id, content_id) + row for row in res_tuple]
  65. insert_sql = f"""
  66. INSERT INTO {article_crawler_video_table}
  67. (trace_id, content_id, out_video_id, platform, video_title, play_count, like_count, share_count, publish_time, crawler_time,
  68. duration, video_url, cover_url, download_status, video_oss_path, cover_oss_path, user_id, score, is_illegal)
  69. VALUES
  70. (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  71. """
  72. await db_client.async_insert_many(sql=insert_sql, params_list=insert_list)