crawler_pipeline.py 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. """
  2. @author: luojunhui
  3. """
  4. import os
  5. import json
  6. from applications import log
  7. from applications.utils import download_gzh_video
  8. from applications.utils import download_toutiao_video
  9. from applications.utils import upload_to_oss
  10. from config import apolloConfig
  # Configuration accessor built once when this module is imported.
  my_config = apolloConfig()
  # Module-level empty dict. It is a single shared mutable object, so code that
  # receives it must not mutate it.
  empty_dict = {}
  # Parsed at import time from the JSON text stored under the
  # "sensitive_word_list" config key; iterated by whether_title_sensitive.
  # NOTE(review): presumably a JSON array of strings - confirm against the config.
  sensitive_word_list = json.loads(my_config.getConfigValue("sensitive_word_list"))
  def whether_title_sensitive(title: str, sensitive_words=None) -> bool:
      """
      Report whether a title contains any sensitive word.

      :param title: title text to inspect; an empty or missing title is
          treated as not sensitive.
      :param sensitive_words: optional iterable of words to match against.
          When omitted, the module-level ``sensitive_word_list`` is used.
      :return: True if at least one non-empty word occurs as a substring of
          ``title``, otherwise False.
      """
      if not title:
          return False
      if sensitive_words is None:
          # Resolved at call time so the default always follows the module list.
          sensitive_words = sensitive_word_list
      # Skip empty entries: "" is a substring of every string, so a stray empty
      # word in the configuration would flag every title as sensitive.
      return any(word and word in title for word in sensitive_words)
  def whether_duplicate_video_title(video_title: str, db_client) -> bool:
      """
      Report whether a row with this title already exists.

      Runs a parameterized lookup on ``publish_single_video_source`` matching
      the ``article_title`` column.

      :param video_title: title to look up.
      :param db_client: object exposing ``fetch(query=..., params=...)``.
      :return: True when the lookup returns a truthy result, otherwise False.
      """
      # Deliberately a plain string rather than an f-string: the only
      # placeholder is the driver-level ``%s``, which is bound through ``params``.
      sql = """
          select id from publish_single_video_source
          where article_title = %s;
      """
      return bool(db_client.fetch(query=sql, params=(video_title,)))
  34. def scrape_video_entities_process(video_item, db_client) -> dict:
  35. """
  36. video crawler pipeline
  37. """
  38. article_url = video_item["article_url"]
  39. platform = video_item["platform"]
  40. video_title = video_item["article_title"]
  41. # whether title sensitive
  42. if whether_title_sensitive(video_title):
  43. print("title is sensitive")
  44. return empty_dict
  45. # whether duplicate video title
  46. if whether_duplicate_video_title(video_title, db_client):
  47. print("duplicate video title")
  48. return empty_dict
  49. # download video
  50. match platform:
  51. case "toutiao":
  52. video_path = download_toutiao_video(article_url)
  53. case "gzh":
  54. video_path = download_gzh_video(article_url)
  55. case "hksp":
  56. video_path = ""
  57. case "sph":
  58. video_path = ""
  59. case "sohu":
  60. video_path = download_toutiao_video(article_url)
  61. case _:
  62. return empty_dict
  63. if video_path:
  64. # upload video to oss
  65. oss_path = upload_to_oss(video_path)
  66. video_item["video_oss_path"] = oss_path
  67. os.remove(video_path)
  68. return video_item
  69. else:
  70. return empty_dict