# crawler_pipeline.py
"""
@author: luojunhui
"""
  4. import os
  5. import json
  6. from applications.utils import download_gzh_video
  7. from applications.utils import download_toutiao_video
  8. from applications.utils import upload_to_oss
  9. from config import apolloConfig
# Configuration client built from the project's `config.apolloConfig`
# (presumably an Apollo config-center wrapper — confirm against `config`).
my_config = apolloConfig()
# Shared "nothing to publish" value returned by scrape_video_entities_process
# whenever an item is rejected or cannot be downloaded.
empty_dict = {}
# Sensitive words, JSON-decoded from the "sensitive_word_list" config value and
# iterated by whether_title_sensitive (presumably a list of strings — verify
# against the stored config).
sensitive_word_list = json.loads(my_config.getConfigValue("sensitive_word_list"))
  13. def whether_title_sensitive(title: str) -> bool:
  14. """
  15. title sensitive words filter
  16. """
  17. for word in sensitive_word_list:
  18. if word in title:
  19. return True
  20. return False
  21. def whether_duplicate_video_title(video_title: str, db_client) -> bool:
  22. """
  23. whether duplicate video title
  24. """
  25. sql = f"""
  26. select id from publish_single_video_source
  27. where article_title = %s;
  28. """
  29. duplicate_id = db_client.fetch(query=sql, params=(video_title,))
  30. if duplicate_id:
  31. return True
  32. return False
  33. def scrape_video_entities_process(video_item, db_client) -> dict:
  34. """
  35. video crawler pipeline
  36. """
  37. video_title = video_item["article_title"]
  38. # whether title sensitive
  39. if whether_title_sensitive(video_title):
  40. return empty_dict
  41. # whether duplicate video title
  42. if whether_duplicate_video_title(video_title, db_client):
  43. return empty_dict
  44. # download video
  45. article_url = video_item["article_url"]
  46. platform = video_item["platform"]
  47. match platform:
  48. case "toutiao":
  49. video_path = download_toutiao_video(article_url)
  50. case "gzh":
  51. video_path = download_gzh_video(article_url)
  52. case "hksp":
  53. video_path = ""
  54. case "sph":
  55. video_path = ""
  56. case _:
  57. return empty_dict
  58. if video_path:
  59. # upload video to oss
  60. oss_path = upload_to_oss(video_path)
  61. video_item["video_oss_path"] = oss_path
  62. os.remove(video_path)
  63. return video_item
  64. else:
  65. return empty_dict