pipeline.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. import hashlib
  2. import re
  3. import sys
  4. import os
  5. import time
  6. sys.path.append(os.getcwd())
  7. from application.common import MysqlHelper, AliyunLogger
  8. from application.common.redis.pyredis import RedisClient
  9. class PiaoQuanPipeline(object):
  10. """
  11. 爬虫管道——爬虫规则判断
  12. """
  13. def __init__(self, platform, mode, rule_dict, env, item, trace_id, account=None):
  14. self.platform = platform
  15. self.mode = mode
  16. self.item = item
  17. self.rule_dict = rule_dict
  18. self.env = env
  19. self.trace_id = trace_id
  20. self.mysql = MysqlHelper(env=env, mode=mode, platform=platform)
  21. self.aliyun_log = AliyunLogger(platform=platform, mode=mode, env=env)
  22. self.account = account
  23. self.red = RedisClient()
  24. def publish_time_flag(self):
  25. """
  26. 判断发布时间是否过期
  27. :return: True or False
  28. """
  29. # 判断发布时间
  30. publish_time_stamp = self.item["publish_time_stamp"]
  31. update_time_stamp = self.item["update_time_stamp"]
  32. max_d = self.rule_dict.get("period", {}).get("max", 1000)
  33. min_d = self.rule_dict.get("period", {}).get("min", 1000)
  34. days = max_d if max_d > min_d else min_d
  35. if self.platform == "gongzhonghao":
  36. if (
  37. int(time.time()) - publish_time_stamp
  38. > 3600 * 24 * days
  39. ) and (
  40. int(time.time()) - update_time_stamp
  41. > 3600 * 24 * days
  42. ):
  43. self.aliyun_log.logging(
  44. code="2004",
  45. trace_id=self.trace_id,
  46. data=self.item,
  47. message="发布时间超过{}天".format(days),
  48. )
  49. return False
  50. else:
  51. if (
  52. int(time.time()) - publish_time_stamp
  53. > 3600 * 24 * days
  54. ):
  55. self.aliyun_log.logging(
  56. code="2004",
  57. trace_id=self.trace_id,
  58. data=self.item,
  59. message="发布时间超过{}天".format(days),
  60. )
  61. return False
  62. return True
  63. def title_flag(self):
  64. """
  65. 视频标题是否满足需求
  66. :return:
  67. """
  68. title = self.item["video_title"]
  69. cleaned_title = re.sub(r"[^\w]", " ", title)
  70. # 敏感词
  71. # 获取敏感词列表
  72. sensitive_words = []
  73. if any(word in cleaned_title for word in sensitive_words):
  74. self.aliyun_log.logging(
  75. code="2003",
  76. trace_id=self.trace_id,
  77. message="标题中包含敏感词",
  78. data=self.item,
  79. account=self.account
  80. )
  81. return False
  82. return True
  83. def download_rule_flag(self):
  84. """
  85. 视频基础下载规则
  86. :return:
  87. """
  88. for key in self.item:
  89. if self.rule_dict.get(key):
  90. max_value = (
  91. int(self.rule_dict[key]["max"])
  92. if int(self.rule_dict[key]["max"]) > 0
  93. else 999999999999999
  94. )
  95. if key == "peroid": # peroid是抓取周期天数
  96. continue
  97. else:
  98. flag = int(self.rule_dict[key]["min"]) <= int(self.item[key]) <= max_value
  99. if not flag:
  100. self.aliyun_log.logging(
  101. code="2004",
  102. trace_id=self.trace_id,
  103. data=self.item,
  104. message="{}: {} <= {} <= {}, {}".format(
  105. key,
  106. self.rule_dict[key]["min"],
  107. self.item[key],
  108. max_value,
  109. flag,
  110. ),
  111. account=self.account
  112. )
  113. return flag
  114. else:
  115. continue
  116. return True
  117. # 按照某个具体平台来去重
  118. def repeat_video(self):
  119. """
  120. 视频是否重复
  121. :return:
  122. """
  123. if self.platform == "zhuwanwufusunew" and self.mode == "recommend":
  124. return True
  125. if self.platform == "jixiangxingfu" and self.mode == "recommend":
  126. return True
  127. if self.platform == "yuannifuqichangzai" and self.mode == "recommend":
  128. return True
  129. if self.platform == "zuihaodesongni" and self.mode == "recommend":
  130. return True
  131. if self.platform == "tiantianjufuqi" and self.mode == "recommend":
  132. return True
  133. out_id = self.item["out_video_id"]
  134. # 判断加上标题去重
  135. if self.mode == "recommend" and self.platform == "zhufuhaoyunbaofu":
  136. title = self.item["video_title"]
  137. sql = f""" select 1 from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}" and video_title="{title}"; """
  138. else:
  139. sql = f""" select 1 from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}"; """
  140. repeat_video = self.mysql.select(sql=sql)
  141. if repeat_video:
  142. # 喜事多多平台 4 天去重一次
  143. if self.platform == "xishiduoduo":
  144. sql_2 = f"""select create_time from crawler_video where out_video_id="{out_id}";"""
  145. video_time = self.mysql.select(sql=sql_2)[0][0].timestamp()
  146. if int(time.time()) - video_time >= 86400 * 4:
  147. return True
  148. self.aliyun_log.logging(
  149. code="2002",
  150. trace_id=self.trace_id,
  151. message="重复的视频",
  152. data=self.item,
  153. account=self.account
  154. )
  155. return False
  156. return True
  157. def mq_exists(self):
  158. """
  159. 检测 mq 是否已经发送过了
  160. :return:
  161. """
  162. if self.red.connect():
  163. index_txt = "{}-{}".format(self.platform, self.item['video_id'])
  164. index_md5 = hashlib.md5(index_txt.encode()).hexdigest()
  165. if self.red.select(index_md5):
  166. self.aliyun_log.logging(
  167. code="2007",
  168. trace_id=self.trace_id,
  169. message="该视频 mq 已经发送"
  170. )
  171. return False
  172. else:
  173. self.red.insert(index_md5, int(time.time()), 43200)
  174. return True
  175. else:
  176. return True
  177. def process_item(self):
  178. """
  179. 全规则判断,符合规则的数据则return True
  180. :return:
  181. """
  182. # 判断该 mq 是否已经发了
  183. if not self.mq_exists():
  184. return False
  185. if not self.publish_time_flag():
  186. # 记录相关日志
  187. return False
  188. if not self.title_flag():
  189. # 记录相关日志
  190. return False
  191. if not self.repeat_video():
  192. # 记录相关日志
  193. return False
  194. if not self.download_rule_flag():
  195. # 记录相关日志
  196. return False
  197. return True