# pipeline.py
  1. import hashlib
  2. import re
  3. import sys
  4. import os
  5. import time
  6. sys.path.append(os.getcwd())
  7. from application.common import MysqlHelper, AliyunLogger
  8. from application.common.redis.pyredis import RedisClient
  9. class PiaoQuanPipeline(object):
  10. """
  11. 爬虫管道——爬虫规则判断
  12. """
  13. def __init__(self, platform, mode, rule_dict, env, item, trace_id, account=None):
  14. self.platform = platform
  15. self.mode = mode
  16. self.item = item
  17. self.rule_dict = rule_dict
  18. self.env = env
  19. self.trace_id = trace_id
  20. self.mysql = MysqlHelper(env=env, mode=mode, platform=platform)
  21. self.aliyun_log = AliyunLogger(platform=platform, mode=mode, env=env)
  22. self.account = account
  23. self.red = RedisClient()
  24. def publish_time_flag(self):
  25. """
  26. 判断发布时间是否过期
  27. :return: True or False
  28. """
  29. # 判断发布时间
  30. publish_time_stamp = self.item["publish_time_stamp"]
  31. update_time_stamp = self.item["update_time_stamp"]
  32. max_d = self.rule_dict.get("period", {}).get("max", 1000)
  33. min_d = self.rule_dict.get("period", {}).get("min", 1000)
  34. days = max_d if max_d > min_d else min_d
  35. if self.platform == "gongzhonghao":
  36. if (
  37. int(time.time()) - publish_time_stamp
  38. > 3600 * 24 * days
  39. ) and (
  40. int(time.time()) - update_time_stamp
  41. > 3600 * 24 * days
  42. ):
  43. self.aliyun_log.logging(
  44. code="2004",
  45. trace_id=self.trace_id,
  46. data=self.item,
  47. message="发布时间超过{}天".format(days),
  48. )
  49. return False
  50. else:
  51. if (
  52. int(time.time()) - publish_time_stamp
  53. > 3600 * 24 * days
  54. ):
  55. self.aliyun_log.logging(
  56. code="2004",
  57. trace_id=self.trace_id,
  58. data=self.item,
  59. message="发布时间超过{}天".format(days),
  60. )
  61. return False
  62. return True
  63. def title_flag(self):
  64. """
  65. 视频标题是否满足需求
  66. :return:
  67. """
  68. title = self.item["video_title"]
  69. cleaned_title = re.sub(r"[^\w]", " ", title)
  70. # 敏感词
  71. # 获取敏感词列表
  72. sensitive_words = []
  73. if any(word in cleaned_title for word in sensitive_words):
  74. self.aliyun_log.logging(
  75. code="2003",
  76. trace_id=self.trace_id,
  77. message="标题中包含敏感词",
  78. data=self.item,
  79. account=self.account
  80. )
  81. return False
  82. return True
  83. def download_rule_flag(self):
  84. """
  85. 视频基础下载规则
  86. :return:
  87. """
  88. for key in self.item:
  89. if self.rule_dict.get(key):
  90. max_value = (
  91. int(self.rule_dict[key]["max"])
  92. if int(self.rule_dict[key]["max"]) > 0
  93. else 999999999999999
  94. )
  95. if key == "peroid": # peroid是抓取周期天数
  96. continue
  97. else:
  98. flag = int(self.rule_dict[key]["min"]) <= int(self.item[key]) <= max_value
  99. if not flag:
  100. self.aliyun_log.logging(
  101. code="2004",
  102. trace_id=self.trace_id,
  103. data=self.item,
  104. message="{}: {} <= {} <= {}, {}".format(
  105. key,
  106. self.rule_dict[key]["min"],
  107. self.item[key],
  108. max_value,
  109. flag,
  110. ),
  111. account=self.account
  112. )
  113. return flag
  114. else:
  115. continue
  116. return True
  117. # 按照某个具体平台来去重
  118. def repeat_video(self):
  119. """
  120. 视频是否重复
  121. :return:
  122. """
  123. if self.platform == "zhuwanwufusunew" and self.mode == "recommend":
  124. return True
  125. if self.platform == "jixiangxingfu" and self.mode == "recommend":
  126. return True
  127. out_id = self.item["out_video_id"]
  128. # 判断加上标题去重
  129. if self.mode == "recommend" and self.platform == "yuannifuqichangzai":
  130. title = self.item["video_title"]
  131. sql = f""" select 1 from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}" and video_title="{title}"; """
  132. # 判断加上标题去重
  133. elif self.mode == "recommend" and self.platform == "zhufuhaoyunbaofu":
  134. title = self.item["video_title"]
  135. sql = f""" select 1 from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}" and video_title="{title}"; """
  136. else:
  137. sql = f""" select 1 from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}"; """
  138. repeat_video = self.mysql.select(sql=sql)
  139. if repeat_video:
  140. # 喜事多多平台 4 天去重一次
  141. if self.platform == "xishiduoduo":
  142. sql_2 = f"""select create_time from crawler_video where out_video_id="{out_id}";"""
  143. video_time = self.mysql.select(sql=sql_2)[0][0].timestamp()
  144. if int(time.time()) - video_time >= 86400 * 4:
  145. return True
  146. self.aliyun_log.logging(
  147. code="2002",
  148. trace_id=self.trace_id,
  149. message="重复的视频",
  150. data=self.item,
  151. account=self.account
  152. )
  153. return False
  154. return True
  155. def mq_exists(self):
  156. """
  157. 检测 mq 是否已经发送过了
  158. :return:
  159. """
  160. if self.red.connect():
  161. index_txt = "{}-{}".format(self.platform, self.item['video_id'])
  162. index_md5 = hashlib.md5(index_txt.encode())
  163. if self.red.select(index_md5):
  164. self.aliyun_log.logging(
  165. code="2007",
  166. trace_id=self.trace_id,
  167. message="该视频 mq 已经发送"
  168. )
  169. return False
  170. else:
  171. self.red.insert(index_md5, int(time.time()), 43200)
  172. return True
  173. else:
  174. return True
  175. def process_item(self):
  176. """
  177. 全规则判断,符合规则的数据则return True
  178. :return:
  179. """
  180. # 判断该 mq 是否已经发了
  181. if not self.mq_exists():
  182. return False
  183. if not self.publish_time_flag():
  184. # 记录相关日志
  185. return False
  186. if not self.title_flag():
  187. # 记录相关日志
  188. return False
  189. if not self.repeat_video():
  190. # 记录相关日志
  191. return False
  192. if not self.download_rule_flag():
  193. # 记录相关日志
  194. return False
  195. return True