pipeline.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. import re
  2. import sys
  3. import os
  4. import time
  5. sys.path.append(os.getcwd())
  6. from application.common import MysqlHelper, AliyunLogger
  7. class PiaoQuanPipeline:
  8. """
  9. 爬虫管道——爬虫规则判断
  10. """
  11. def __init__(self, platform, mode, rule_dict, env, item, trace_id):
  12. self.platform = platform
  13. self.mode = mode
  14. self.item = item
  15. self.rule_dict = rule_dict
  16. self.env = env
  17. self.trace_id = trace_id
  18. self.mysql = MysqlHelper(env=env, mode=mode, platform=platform)
  19. self.aliyun_log = AliyunLogger(platform=platform, mode=mode, env=env)
  20. def publish_time_flag(self):
  21. """
  22. 判断发布时间是否过期
  23. :return: True or False
  24. """
  25. # 判断发布时间
  26. publish_time_stamp = self.item["publish_time_stamp"]
  27. update_time_stamp = self.item["update_time_stamp"]
  28. max_d = self.rule_dict.get("period", {}).get("max", 1000)
  29. min_d = self.rule_dict.get("period", {}).get("min", 1000)
  30. days = max_d if max_d > min_d else min_d
  31. if self.platform == "gongzhonghao":
  32. if (
  33. int(time.time()) - publish_time_stamp
  34. > 3600 * 24 * days
  35. ) and (
  36. int(time.time()) - update_time_stamp
  37. > 3600 * 24 * days
  38. ):
  39. self.aliyun_log.logging(
  40. code="2004",
  41. trace_id=self.trace_id,
  42. data=self.item,
  43. message="发布时间超过{}天".format(days),
  44. )
  45. return False
  46. else:
  47. if (
  48. int(time.time()) - publish_time_stamp
  49. > 3600 * 24 * days
  50. ):
  51. self.aliyun_log.logging(
  52. code="2004",
  53. trace_id=self.trace_id,
  54. data=self.item,
  55. message="发布时间超过{}天".format(days),
  56. )
  57. return False
  58. return True
  59. def title_flag(self):
  60. """
  61. 视频标题是否满足需求
  62. :return:
  63. """
  64. title = self.item["video_title"]
  65. cleaned_title = re.sub(r"[^\w]", " ", title)
  66. # 敏感词
  67. # 获取敏感词列表
  68. sensitive_words = []
  69. if any(word in cleaned_title for word in sensitive_words):
  70. self.aliyun_log.logging(
  71. code="2003",
  72. trace_id=self.trace_id,
  73. message="标题中包含敏感词",
  74. data=self.item,
  75. )
  76. return False
  77. return True
  78. def download_rule_flag(self):
  79. """
  80. 视频基础下载规则
  81. :return:
  82. """
  83. for key in self.item:
  84. if self.rule_dict.get(key):
  85. max_value = (
  86. int(self.rule_dict[key]["max"])
  87. if int(self.rule_dict[key]["max"]) > 0
  88. else 999999999999999
  89. )
  90. if key == "peroid": # peroid是抓取周期天数
  91. continue
  92. else:
  93. flag = int(self.rule_dict[key]["min"]) <= int(self.item[key]) <= max_value
  94. if not flag:
  95. self.aliyun_log.logging(
  96. code="2004",
  97. trace_id=self.trace_id,
  98. data=self.item,
  99. message="{}: {} <= {} <= {}, {}".format(
  100. key,
  101. self.rule_dict[key]["min"],
  102. self.item[key],
  103. max_value,
  104. flag,
  105. ),
  106. )
  107. return flag
  108. else:
  109. continue
  110. return True
  111. # 按照某个具体平台来去重
  112. def repeat_video(self):
  113. """
  114. 视频是否重复
  115. :return:
  116. """
  117. # sql = f""" select * from crawler_video where platform="公众号" and out_video_id="{video_id}"; """
  118. out_id = self.item["out_video_id"]
  119. sql = f""" select 1 from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}"; """
  120. repeat_video = self.mysql.select(sql=sql)
  121. # print(repeat_video)
  122. if repeat_video:
  123. self.aliyun_log.logging(
  124. code="2002",
  125. trace_id=self.trace_id,
  126. message="重复的视频",
  127. data=self.item,
  128. )
  129. return False
  130. return True
  131. def process_item(self):
  132. """
  133. 全规则判断,符合规则的数据则return True
  134. :return:
  135. """
  136. if not self.publish_time_flag():
  137. # 记录相关日志
  138. return False
  139. if not self.title_flag():
  140. # 记录相关日志
  141. return False
  142. if not self.repeat_video():
  143. # 记录相关日志
  144. return False
  145. if not self.download_rule_flag():
  146. # 记录相关日志
  147. return False
  148. return True