crawler_pipeline.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. """
  2. @author: luojunhui
  3. """
  4. import os
  5. import json
  6. from applications import log
  7. from applications.utils import download_sohu_video
  8. from applications.utils import download_gzh_video
  9. from applications.utils import download_toutiao_video
  10. from applications.utils import upload_to_oss
  11. from config import apolloConfig
# Config handle created from `apolloConfig`; read below to load the sensitive-word list.
my_config = apolloConfig()
# Shared empty dict. It is a single mutable object, so never modify it in place.
empty_dict = {}
# Parsed from the JSON stored under the "sensitive_word_list" config key at import time.
# Presumably a list of strings (each entry is used as a substring to search for in titles) — TODO confirm.
sensitive_word_list = json.loads(my_config.getConfigValue("sensitive_word_list"))
  15. def whether_title_sensitive(title: str) -> bool:
  16. """
  17. title sensitive words filter
  18. """
  19. for word in sensitive_word_list:
  20. if word in title:
  21. return True
  22. return False
  23. def whether_duplicate_video_title(video_title: str, db_client) -> bool:
  24. """
  25. whether duplicate video title
  26. """
  27. sql = f"""
  28. select id from publish_single_video_source
  29. where article_title = %s;
  30. """
  31. duplicate_id = db_client.fetch(query=sql, params=(video_title,))
  32. if duplicate_id:
  33. return True
  34. return False
  35. def scrape_video_entities_process(video_item, db_client) -> dict:
  36. """
  37. video crawler pipeline
  38. """
  39. article_url = video_item["article_url"]
  40. platform = video_item["platform"]
  41. video_title = video_item["article_title"]
  42. # whether title sensitive
  43. if whether_title_sensitive(video_title):
  44. return empty_dict
  45. # whether duplicate video title
  46. if whether_duplicate_video_title(video_title, db_client):
  47. return empty_dict
  48. # download video
  49. match platform:
  50. case "toutiao":
  51. video_path = download_toutiao_video(article_url)
  52. case "gzh":
  53. video_path = download_gzh_video(article_url)
  54. case "hksp":
  55. video_path = ""
  56. case "sph":
  57. video_path = ""
  58. case "sohu":
  59. video_path = download_sohu_video(article_url)
  60. case "piaoquan":
  61. return video_item
  62. case _:
  63. return empty_dict
  64. if video_path:
  65. # upload video to oss
  66. oss_path = upload_to_oss(video_path)
  67. video_item["video_oss_path"] = oss_path
  68. os.remove(video_path)
  69. return video_item
  70. else:
  71. return empty_dict
  72. def whether_duplicate_article_title(article_title: str, db_client) -> bool:
  73. """
  74. whether duplicate video title
  75. """
  76. sql = f"""
  77. select article_id from crawler_meta_article
  78. where title = %s;
  79. """
  80. duplicate_id = db_client.fetch(query=sql, params=(article_title,))
  81. if duplicate_id:
  82. return True
  83. return False
  84. def scrape_article_entities_process(article_item, db_client) -> dict:
  85. """
  86. article crawler pipeline
  87. """
  88. article_title = article_item["article_title"]
  89. if whether_title_sensitive(article_title):
  90. article_item['title_sensitive'] = 1
  91. return article_item
  92. if whether_duplicate_article_title(article_title, db_client):
  93. return empty_dict
  94. return article_item