pipeline.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. import hashlib
  2. import re
  3. import sys
  4. import os
  5. import time
  6. sys.path.append(os.getcwd())
  7. from application.common import MysqlHelper, AliyunLogger
  8. # from application.common.redis.pyredis import RedisClient
  9. class PiaoQuanPipeline(object):
  10. """
  11. 爬虫管道——爬虫规则判断
  12. """
  13. def __init__(self, platform, mode, rule_dict, env, item, trace_id, account=None):
  14. self.platform = platform
  15. self.mode = mode
  16. self.item = item
  17. self.rule_dict = rule_dict
  18. self.env = env
  19. self.trace_id = trace_id
  20. self.mysql = MysqlHelper(env=env, mode=mode, platform=platform)
  21. self.aliyun_log = AliyunLogger(platform=platform, mode=mode, env=env)
  22. self.account = account
  23. # self.red = RedisClient()
  24. def publish_time_flag(self):
  25. """
  26. 判断发布时间是否过期
  27. :return: True or False
  28. """
  29. # 判断发布时间
  30. publish_time_stamp = self.item["publish_time_stamp"]
  31. update_time_stamp = self.item["update_time_stamp"]
  32. max_d = self.rule_dict.get("period", {}).get("max", 1000)
  33. min_d = self.rule_dict.get("period", {}).get("min", 1000)
  34. days = max_d if max_d > min_d else min_d
  35. if self.platform == "gongzhonghao":
  36. if (
  37. int(time.time()) - publish_time_stamp
  38. > 3600 * 24 * days
  39. ) and (
  40. int(time.time()) - update_time_stamp
  41. > 3600 * 24 * days
  42. ):
  43. self.aliyun_log.logging(
  44. code="2004",
  45. trace_id=self.trace_id,
  46. data=self.item,
  47. message="发布时间超过{}天".format(days),
  48. )
  49. return False
  50. else:
  51. if (
  52. int(time.time()) - publish_time_stamp
  53. > 3600 * 24 * days
  54. ):
  55. self.aliyun_log.logging(
  56. code="2004",
  57. trace_id=self.trace_id,
  58. data=self.item,
  59. message="发布时间超过{}天".format(days),
  60. )
  61. return False
  62. return True
  63. def title_flag(self):
  64. """
  65. 视频标题是否满足需求
  66. :return:
  67. """
  68. title = self.item["video_title"]
  69. cleaned_title = re.sub(r"[^\w]", " ", title)
  70. # 敏感词
  71. # 获取敏感词列表
  72. sensitive_words = []
  73. if any(word in cleaned_title for word in sensitive_words):
  74. self.aliyun_log.logging(
  75. code="2003",
  76. trace_id=self.trace_id,
  77. message="标题中包含敏感词",
  78. data=self.item,
  79. account=self.account
  80. )
  81. return False
  82. return True
  83. def download_rule_flag(self):
  84. """
  85. 视频基础下载规则
  86. :return:
  87. """
  88. for key in self.item:
  89. if self.rule_dict.get(key):
  90. max_value = (
  91. int(self.rule_dict[key]["max"])
  92. if int(self.rule_dict[key]["max"]) > 0
  93. else 999999999999999
  94. )
  95. if key == "peroid": # peroid是抓取周期天数
  96. continue
  97. else:
  98. flag = int(self.rule_dict[key]["min"]) <= int(self.item[key]) <= max_value
  99. if not flag:
  100. self.aliyun_log.logging(
  101. code="2004",
  102. trace_id=self.trace_id,
  103. data=self.item,
  104. message="{}: {} <= {} <= {}, {}".format(
  105. key,
  106. self.rule_dict[key]["min"],
  107. self.item[key],
  108. max_value,
  109. flag,
  110. ),
  111. account=self.account
  112. )
  113. return flag
  114. else:
  115. continue
  116. return True
  117. # 按照某个具体平台来去重
  118. def repeat_video(self):
  119. """
  120. 视频是否重复
  121. :return:
  122. """
  123. if self.platform == "zhuwanwufusunew" and self.mode == "recommend":
  124. return True
  125. if self.platform == "jixiangxingfu" and self.mode == "recommend":
  126. return True
  127. if self.platform == "yuannifuqichangzai" and self.mode == "recommend":
  128. return True
  129. if self.platform == "benshanzhufu" and self.mode == "recommend":
  130. return True
  131. if self.platform == "zuihaodesongni" and self.mode == "recommend":
  132. return True
  133. if self.platform == "tiantianjufuqi" and self.mode == "recommend":
  134. return True
  135. out_id = self.item["out_video_id"]
  136. # 判断加上标题去重
  137. if self.mode == "recommend" and self.platform == "zhufuhaoyunbaofu":
  138. title = self.item["video_title"]
  139. sql = f""" select 1 from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}" and video_title="{title}"; """
  140. else:
  141. sql = f""" select 1 from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}"; """
  142. repeat_video = self.mysql.select(sql=sql)
  143. if repeat_video:
  144. # 喜事多多平台 4 天去重一次
  145. if self.platform == "xishiduoduo":
  146. sql_2 = f"""select create_time from crawler_video where out_video_id="{out_id}";"""
  147. video_time = self.mysql.select(sql=sql_2)[0][0].timestamp()
  148. if int(time.time()) - video_time >= 86400 * 4:
  149. return True
  150. # 小年糕推荐流和祝福圈子推荐流 3 天去重一次
  151. elif self.platform == "xiaoniangaotuijianliu" or self.platform == "zhufuquanzituijianliu":
  152. sql_2 = f"""select create_time from crawler_video where out_video_id="{out_id}";"""
  153. video_time = self.mysql.select(sql=sql_2)[0][0].timestamp()
  154. if int(time.time()) - video_time >= 86400 * 3:
  155. return True
  156. self.aliyun_log.logging(
  157. code="2002",
  158. trace_id=self.trace_id,
  159. message="重复的视频",
  160. data=self.item,
  161. account=self.account
  162. )
  163. return False
  164. return True
  165. # def mq_exists(self):
  166. # """
  167. # 检测 mq 是否已经发送过了
  168. # :return:
  169. # """
  170. # if self.red.connect():
  171. # index_txt = "{}-{}".format(self.platform, self.item['video_id'])
  172. # index_md5 = hashlib.md5(index_txt.encode()).hexdigest()
  173. # if self.red.select(index_md5):
  174. # self.aliyun_log.logging(
  175. # code="2007",
  176. # trace_id=self.trace_id,
  177. # message="该视频 mq 已经发送"
  178. # )
  179. # return False
  180. # else:
  181. # self.red.insert(index_md5, int(time.time()), 43200)
  182. # return True
  183. # else:
  184. # return True
  185. def process_item(self):
  186. """
  187. 全规则判断,符合规则的数据则return True
  188. :return:
  189. """
  190. # 判断该 mq 是否已经发了
  191. # if not self.mq_exists():
  192. # return False
  193. if not self.publish_time_flag():
  194. # 记录相关日志
  195. return False
  196. if not self.title_flag():
  197. # 记录相关日志
  198. return False
  199. if not self.repeat_video():
  200. # 记录相关日志
  201. return False
  202. if not self.download_rule_flag():
  203. # 记录相关日志
  204. return False
  205. return True