# pipeline_dev.py (4.0 KB)
import re
import time
  3. class PiaoQuanPipelineTest:
  4. def __init__(self, platform, mode, rule_dict, env, item, trace_id):
  5. self.platform = platform
  6. self.mode = mode
  7. self.item = item
  8. self.rule_dict = rule_dict
  9. self.env = env
  10. self.trace_id = trace_id
  11. # 视频的发布时间限制, 属于是规则过滤
  12. def publish_time_flag(self):
  13. # 判断发布时间
  14. publish_time_stamp = self.item["publish_time_stamp"]
  15. update_time_stamp = self.item["update_time_stamp"]
  16. if self.platform == "gongzhonghao":
  17. if (
  18. int(time.time()) - publish_time_stamp
  19. > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
  20. ) and (
  21. int(time.time()) - update_time_stamp
  22. > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
  23. ):
  24. message = "发布时间超过{}天".format(
  25. int(self.rule_dict.get("period", {}).get("max", 1000))
  26. )
  27. print(message)
  28. return False
  29. else:
  30. if (
  31. int(time.time()) - publish_time_stamp
  32. > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
  33. ):
  34. message = "发布时间超过{}天".format(
  35. int(self.rule_dict.get("period", {}).get("max", 1000))
  36. )
  37. print(message)
  38. return False
  39. return True
  40. # 视频标题是否满足需求
  41. def title_flag(self):
  42. title = self.item["video_title"]
  43. cleaned_title = re.sub(r"[^\w]", " ", title)
  44. # 敏感词
  45. # 获取敏感词列表
  46. sensitive_words = []
  47. if any(word in cleaned_title for word in sensitive_words):
  48. message = "标题中包含敏感词"
  49. print(message)
  50. return False
  51. return True
  52. # 视频基础下载规则
  53. def download_rule_flag(self):
  54. for key in self.item:
  55. if self.rule_dict.get(key):
  56. max_value = (
  57. int(self.rule_dict[key]["max"])
  58. if int(self.rule_dict[key]["max"]) > 0
  59. else 999999999999999
  60. )
  61. if key == "peroid": # peroid是抓取周期天数
  62. continue
  63. else:
  64. flag = int(self.rule_dict[key]["min"]) <= int(self.item[key]) <= max_value
  65. if not flag:
  66. message = "{}: {} <= {} <= {}, {}".format(
  67. key,
  68. self.rule_dict[key]["min"],
  69. self.item[key],
  70. max_value,
  71. flag,
  72. )
  73. print(message)
  74. return flag
  75. else:
  76. continue
  77. return True
  78. # 按照某个具体平台来去重
  79. def repeat_video(self):
  80. # sql = f""" select * from crawler_video where platform="公众号" and out_video_id="{video_id}"; """
  81. # out_id = self.item["out_video_id"]
  82. # sql = f""" select * from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}"; """
  83. # repeat_video = MysqlHelper.get_values(
  84. # log_type=self.mode, crawler=self.platform, env=self.env, sql=sql, action=""
  85. # )
  86. # if repeat_video:
  87. # message = "重复的视频"
  88. # print(message)
  89. # return False
  90. return True
  91. def process_item(self):
  92. if not self.publish_time_flag():
  93. # 记录相关日志
  94. return False
  95. if not self.title_flag():
  96. # 记录相关日志
  97. return False
  98. if not self.repeat_video():
  99. # 记录相关日志
  100. return False
  101. if not self.download_rule_flag():
  102. # 记录相关日志
  103. return False
  104. return True