spider_task.py

  1. """
  2. @author: luojunhui
  3. """
  4. from typing import Dict, List
  5. from applications.const import new_content_id_task_const

async def whether_downloaded_videos_exists(content_id, article_crawler_video_table, db_client):
    """
    Check whether this article already has enough matched (downloaded) videos.
    :param content_id:
    :param article_crawler_video_table: crawler video table
    :param db_client:
    :return:
    """
    sql = f"""
        SELECT id
        FROM {article_crawler_video_table}
        WHERE content_id = '{content_id}'
        AND download_status = {new_content_id_task_const.VIDEO_DOWNLOAD_SUCCESS_STATUS};
    """
    res_tuple = await db_client.async_select(sql)
    # True once at least MIN_MATCH_VIDEO_NUM videos have been downloaded
    return len(res_tuple) >= new_content_id_task_const.MIN_MATCH_VIDEO_NUM

async def get_downloaded_videos(content_id, article_crawler_video_table, db_client) -> List[Dict]:
    """
    Fetch the successfully downloaded videos for this article, best score first.
    :return:
    """
    sql = f"""
        SELECT platform, play_count, like_count, video_oss_path, cover_oss_path, user_id
        FROM {article_crawler_video_table}
        WHERE content_id = '{content_id}'
        AND download_status = {new_content_id_task_const.VIDEO_DOWNLOAD_SUCCESS_STATUS}
        ORDER BY score DESC;
    """
    res_tuple = await db_client.async_select(sql)
    # Map each positional result row to a dict keyed by column name
    return [
        {
            "platform": row[0],
            "play_count": row[1],
            "like_count": row[2],
            "video_oss_path": row[3],
            "cover_oss_path": row[4],
            "uid": row[5]
        }
        for row in res_tuple
    ]

async def update_crawler_table_with_exist_content_id(root_content_id, content_id, trace_id, article_crawler_video_table, db_client):
    """
    Look up the videos already downloaded for root_content_id and insert
    copies of them under the new content_id.
    """
    select_sql = f"""
        SELECT out_video_id, platform, video_title, play_count, like_count, share_count, publish_time, crawler_time,
               duration, video_url, cover_url, download_status, video_oss_path, cover_oss_path, user_id, score, is_illegal
        FROM {article_crawler_video_table}
        WHERE content_id = '{root_content_id}'
        AND download_status = {new_content_id_task_const.VIDEO_DOWNLOAD_SUCCESS_STATUS}
        AND is_illegal = {new_content_id_task_const.VIDEO_SAFE};
    """
    res_tuple = await db_client.async_select(select_sql)
    if not res_tuple:
        return 0
    # Prefix each copied row with the new trace_id and content_id
    insert_list = [(trace_id, content_id) + row for row in res_tuple]
    insert_sql = f"""
        INSERT INTO {article_crawler_video_table}
        (trace_id, content_id, out_video_id, platform, video_title, play_count, like_count, share_count, publish_time, crawler_time,
        duration, video_url, cover_url, download_status, video_oss_path, cover_oss_path, user_id, score, is_illegal)
        VALUES
        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
    """
    affected_rows = await db_client.async_insert_many(sql=insert_sql, params_list=insert_list)
    return affected_rows
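
# Illustrative usage, not part of the original module: a minimal sketch of how
# these helpers might be chained in an async task. It assumes a db_client that
# exposes the async_select / async_insert_many methods used above; the table
# name and ids below are hypothetical.
async def example_flow(db_client):
    table = "article_crawler_videos"  # hypothetical table name
    content_id, root_content_id, trace_id = "cid-001", "root-001", "trace-001"

    if await whether_downloaded_videos_exists(content_id, table, db_client):
        # Enough videos already matched; reuse them directly
        videos = await get_downloaded_videos(content_id, table, db_client)
        print(f"found {len(videos)} downloaded videos")
    else:
        # Otherwise copy over the rows already downloaded for the root content id
        copied = await update_crawler_table_with_exist_content_id(
            root_content_id, content_id, trace_id, table, db_client
        )
        print(f"copied {copied} rows from root_content_id")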