pipeline.py 7.2 KB


  1. import hashlib
  2. import re
  3. import sys
  4. import os
  5. import time
  6. sys.path.append(os.getcwd())
  7. from application.common import MysqlHelper, AliyunLogger
  8. from application.common.redis.pyredis import RedisClient
  9. class PiaoQuanPipeline(object):
  10. """
  11. 爬虫管道——爬虫规则判断
  12. """
  13. def __init__(self, platform, mode, rule_dict, env, item, trace_id, account=None):
  14. self.platform = platform
  15. self.mode = mode
  16. self.item = item
  17. self.rule_dict = rule_dict
  18. self.env = env
  19. self.trace_id = trace_id
  20. self.mysql = MysqlHelper(env=env, mode=mode, platform=platform)
  21. self.aliyun_log = AliyunLogger(platform=platform, mode=mode, env=env)
  22. self.account = account
  23. self.red = RedisClient()
  24. def publish_time_flag(self):
  25. """
  26. 判断发布时间是否过期
  27. :return: True or False
  28. """
  29. # 判断发布时间
  30. publish_time_stamp = self.item["publish_time_stamp"]
  31. update_time_stamp = self.item["update_time_stamp"]
  32. max_d = self.rule_dict.get("period", {}).get("max", 1000)
  33. min_d = self.rule_dict.get("period", {}).get("min", 1000)
  34. days = max_d if max_d > min_d else min_d
  35. if self.platform == "gongzhonghao":
  36. if (
  37. int(time.time()) - publish_time_stamp
  38. > 3600 * 24 * days
  39. ) and (
  40. int(time.time()) - update_time_stamp
  41. > 3600 * 24 * days
  42. ):
  43. self.aliyun_log.logging(
  44. code="2004",
  45. trace_id=self.trace_id,
  46. data=self.item,
  47. message="发布时间超过{}天".format(days),
  48. )
  49. return False
  50. else:
  51. if (
  52. int(time.time()) - publish_time_stamp
  53. > 3600 * 24 * days
  54. ):
  55. self.aliyun_log.logging(
  56. code="2004",
  57. trace_id=self.trace_id,
  58. data=self.item,
  59. message="发布时间超过{}天".format(days),
  60. )
  61. return False
  62. return True
  63. def title_flag(self):
  64. """
  65. 视频标题是否满足需求
  66. :return:
  67. """
  68. title = self.item["video_title"]
  69. cleaned_title = re.sub(r"[^\w]", " ", title)
  70. # 敏感词
  71. # 获取敏感词列表
  72. sensitive_words = []
  73. if any(word in cleaned_title for word in sensitive_words):
  74. self.aliyun_log.logging(
  75. code="2003",
  76. trace_id=self.trace_id,
  77. message="标题中包含敏感词",
  78. data=self.item,
  79. account=self.account
  80. )
  81. return False
  82. return True
  83. def download_rule_flag(self):
  84. """
  85. 视频基础下载规则
  86. :return:
  87. """
  88. for key in self.item:
  89. if self.rule_dict.get(key):
  90. max_value = (
  91. int(self.rule_dict[key]["max"])
  92. if int(self.rule_dict[key]["max"]) > 0
  93. else 999999999999999
  94. )
  95. if key == "peroid": # peroid是抓取周期天数
  96. continue
  97. else:
  98. flag = int(self.rule_dict[key]["min"]) <= int(self.item[key]) <= max_value
  99. if not flag:
  100. self.aliyun_log.logging(
  101. code="2004",
  102. trace_id=self.trace_id,
  103. data=self.item,
  104. message="{}: {} <= {} <= {}, {}".format(
  105. key,
  106. self.rule_dict[key]["min"],
  107. self.item[key],
  108. max_value,
  109. flag,
  110. ),
  111. account=self.account
  112. )
  113. return flag
  114. else:
  115. continue
  116. return True
  117. # 按照某个具体平台来去重
  118. def repeat_video(self):
  119. """
  120. 视频是否重复
  121. :return:
  122. """
  123. if self.platform == "zhuwanwufusunew" and self.mode == "recommend":
  124. return True
  125. if self.platform == "jixiangxingfu" and self.mode == "recommend":
  126. return True
  127. out_id = self.item["out_video_id"]
  128. # 判断加上标题去重
  129. if self.mode == "recommend" and self.platform == "yuannifuqichangzai":
  130. title = self.item["video_title"]
  131. sql = f""" select 1 from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}" and video_title="{title}"; """
  132. # 判断加上标题去重
  133. elif self.mode == "recommend" and self.platform == "zhufuhaoyunbaofu":
  134. title = self.item["video_title"]
  135. sql = f""" select 1 from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}" and video_title="{title}"; """
  136. else:
  137. sql = f""" select 1 from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}"; """
  138. repeat_video = self.mysql.select(sql=sql)
  139. if repeat_video:
  140. # 喜事多多平台 4 天去重一次
  141. if self.platform == "xishiduoduo":
  142. sql_2 = f"""select create_time from crawler_video where out_video_id="{out_id}";"""
  143. video_time = self.mysql.select(sql=sql_2)[0][0].timestamp()
  144. if int(time.time()) - video_time >= 86400 * 4:
  145. return True
  146. self.aliyun_log.logging(
  147. code="2002",
  148. trace_id=self.trace_id,
  149. message="重复的视频",
  150. data=self.item,
  151. account=self.account
  152. )
  153. return False
  154. return True
  155. def mq_exists(self):
  156. """
  157. 检测 mq 是否已经发送过了
  158. :return:
  159. """
  160. if self.red.connect():
  161. index_txt = "{}-{}".format(self.platform, self.item['video_id'])
  162. index_md5 = hashlib.md5(index_txt.encode())
  163. if self.red.select(index_md5):
  164. self.aliyun_log.logging(
  165. code="2007",
  166. trace_id=self.trace_id,
  167. message="该视频 mq 已经发送"
  168. )
  169. return False
  170. else:
  171. self.red.insert(index_md5, int(time.time()), 43200)
  172. return True
  173. else:
  174. return True
  175. def process_item(self):
  176. """
  177. 全规则判断,符合规则的数据则return True
  178. :return:
  179. """
  180. # 判断该 mq 是否已经发了
  181. if not self.mq_exists():
  182. return False
  183. if not self.publish_time_flag():
  184. # 记录相关日志
  185. return False
  186. if not self.title_flag():
  187. # 记录相关日志
  188. return False
  189. if not self.repeat_video():
  190. # 记录相关日志
  191. return False
  192. if not self.download_rule_flag():
  193. # 记录相关日志
  194. return False
  195. return True