"""
@author: luojunhui
"""
from typing import Dict, List

from applications.const import new_content_id_task_const
  6. async def whether_downloaded_videos_exists(content_id, article_crawler_video_table, db_client):
  7. """
  8. 判断该文章是否存在历史匹配视频
  9. :param content_id:
  10. :param article_crawler_video_table: 爬虫表
  11. :param db_client
  12. :return:
  13. """
  14. sql = f"""
  15. SELECT id
  16. FROM {article_crawler_video_table}
  17. WHERE content_id = '{content_id}'
  18. AND download_status = {new_content_id_task_const.VIDEO_DOWNLOAD_SUCCESS_STATUS}
  19. AND is_illegal = {new_content_id_task_const.VIDEO_SAFE};
  20. """
  21. res_tuple = await db_client.async_select(sql)
  22. if len(res_tuple) >= new_content_id_task_const.MIN_MATCH_VIDEO_NUM:
  23. return True
  24. else:
  25. return False
  26. async def get_downloaded_videos(content_id, article_crawler_video_table, db_client) -> List[Dict]:
  27. """
  28. 获取已下载的视频
  29. :return:
  30. """
  31. sql = f"""
  32. SELECT platform, play_count, like_count, video_oss_path, cover_oss_path, user_id
  33. FROM {article_crawler_video_table}
  34. WHERE content_id = '{content_id}'
  35. AND download_status = {new_content_id_task_const.VIDEO_DOWNLOAD_SUCCESS_STATUS}
  36. AND is_illegal = {new_content_id_task_const.VIDEO_SAFE}
  37. ORDER BY score DESC;
  38. """
  39. res_tuple = await db_client.async_select(sql)
  40. return [
  41. {
  42. "platform": i[0],
  43. "play_count": i[1],
  44. "like_count": i[2],
  45. "video_oss_path": i[3],
  46. "cover_oss_path": i[4],
  47. "uid": i[5]
  48. }
  49. for i in res_tuple
  50. ]
  51. async def update_crawler_table_with_exist_content_id(root_content_id, content_id, trace_id, article_crawler_video_table, db_client):
  52. """
  53. 用root_content_id 查询出已经下载过的视频信息,用new_content_id更新
  54. """
  55. select_sql = f"""
  56. SELECT *
  57. FROM {article_crawler_video_table}
  58. WHERE content_id = '{root_content_id}'
  59. AND download_status = {new_content_id_task_const.VIDEO_DOWNLOAD_SUCCESS_STATUS}
  60. AND is_illegal = {new_content_id_task_const.VIDEO_SAFE};
  61. """
  62. res_tuple = await db_client.async_select(select_sql)
  63. insert_list = [(content_id,) + row[2:-3] + (trace_id,) + row[-2:] for row in res_tuple]
  64. insert_sql = f"""
  65. INSERT INTO {article_crawler_video_table}
  66. (content_id, out_video_id, platform, video_title, play_count, like_count, share_count, publish_time, crawler_time, duration, video_url, cover_url, download_status, video_oss_path, cover_oss_path, user_id, trace_id, score, is_illegal)
  67. VALUES
  68. (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  69. """
  70. await db_client.async_insert_many(sql=insert_sql, params_list=insert_list)