pipeline.py 9.3 KB


  1. import hashlib
  2. import re
  3. import sys
  4. import os
  5. import time
  6. from application.common.feishu.feishu_utils import FeishuUtils
  7. sys.path.append(os.getcwd())
  8. from application.common import MysqlHelper, AliyunLogger
  9. # from application.common.redis.pyredis import RedisClient
  10. class PiaoQuanPipeline(object):
  11. """
  12. 爬虫管道——爬虫规则判断
  13. """
  14. def __init__(self, platform, mode, rule_dict, env, item, trace_id, account=None):
  15. self.platform = platform
  16. self.mode = mode
  17. self.item = item
  18. self.rule_dict = rule_dict
  19. self.env = env
  20. self.trace_id = trace_id
  21. self.mysql = MysqlHelper(env=env, mode=mode, platform=platform)
  22. self.aliyun_log = AliyunLogger(platform=platform, mode=mode, env=env)
  23. self.account = account
  24. # self.red = RedisClient()
  25. def publish_time_flag(self):
  26. """
  27. 判断发布时间是否过期
  28. :return: True or False
  29. """
  30. # 判断发布时间
  31. publish_time_stamp = self.item["publish_time_stamp"]
  32. update_time_stamp = self.item["update_time_stamp"]
  33. max_d = self.rule_dict.get("period", {}).get("max", 1000)
  34. min_d = self.rule_dict.get("period", {}).get("min", 1000)
  35. days = max_d if max_d > min_d else min_d
  36. if self.platform == "gongzhonghao":
  37. if (
  38. int(time.time()) - publish_time_stamp
  39. > 3600 * 24 * days
  40. ) and (
  41. int(time.time()) - update_time_stamp
  42. > 3600 * 24 * days
  43. ):
  44. self.aliyun_log.logging(
  45. code="2004",
  46. trace_id=self.trace_id,
  47. data=self.item,
  48. message="发布时间超过{}天".format(days),
  49. )
  50. return False
  51. else:
  52. if (
  53. int(time.time()) - publish_time_stamp
  54. > 3600 * 24 * days
  55. ):
  56. self.aliyun_log.logging(
  57. code="2004",
  58. trace_id=self.trace_id,
  59. data=self.item,
  60. message="发布时间超过{}天".format(days),
  61. )
  62. return False
  63. return True
  64. def title_flag(self):
  65. """
  66. 视频标题是否满足需求
  67. :return:
  68. """
  69. title = self.item["video_title"]
  70. cleaned_title = re.sub(r"[^\w]", " ", title)
  71. # 敏感词
  72. # 获取敏感词列表
  73. sensitive_words = []
  74. if any(word in cleaned_title for word in sensitive_words):
  75. self.aliyun_log.logging(
  76. code="2003",
  77. trace_id=self.trace_id,
  78. message="标题中包含敏感词",
  79. data=self.item,
  80. account=self.account
  81. )
  82. return False
  83. return True
  84. def download_rule_flag(self):
  85. """
  86. 视频基础下载规则
  87. :return:
  88. """
  89. for key in self.item:
  90. if self.rule_dict.get(key):
  91. max_value = (
  92. int(self.rule_dict[key]["max"])
  93. if int(self.rule_dict[key]["max"]) > 0
  94. else 999999999999999
  95. )
  96. if key == "peroid": # peroid是抓取周期天数
  97. continue
  98. else:
  99. flag = int(self.rule_dict[key]["min"]) <= int(self.item[key]) <= max_value
  100. if not flag:
  101. self.aliyun_log.logging(
  102. code="2004",
  103. trace_id=self.trace_id,
  104. data=self.item,
  105. message="{}: {} <= {} <= {}, {}".format(
  106. key,
  107. self.rule_dict[key]["min"],
  108. self.item[key],
  109. max_value,
  110. flag,
  111. ),
  112. account=self.account
  113. )
  114. return flag
  115. else:
  116. continue
  117. return True
  118. def feishu_list(self):
  119. summary = FeishuUtils.get_values_batch("KsoMsyP2ghleM9tzBfmcEEXBnXg", "letS93")
  120. for row in summary[1:]:
  121. channel = row[0]
  122. day_count = row[1]
  123. if channel:
  124. if channel == self.platform:
  125. return day_count
  126. else:
  127. return None
  128. return None
  129. # 按照某个具体平台来去重
  130. def repeat_video(self):
  131. """
  132. 视频是否重复
  133. :return:
  134. """
  135. out_id = self.item["out_video_id"]
  136. day_count = self.feishu_list()
  137. if day_count:
  138. sql_2 = f"""select create_time from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}" AND create_time >= DATE_SUB(NOW(), INTERVAL {int(day_count)} DAY);"""
  139. repeat_video = self.mysql.select(sql=sql_2)
  140. if repeat_video:
  141. self.aliyun_log.logging(
  142. code="2002",
  143. trace_id=self.trace_id,
  144. message="重复的视频",
  145. data=self.item,
  146. account=self.account
  147. )
  148. return False
  149. else:
  150. return True
  151. if self.platform == "zhufuniannianshunxinjixiang" or self.platform == "weiquanshipin" or self.platform == "piaoquangushi" or self.platform == "lepaoledong" or self.platform == "zhufukuaizhuan" or self.platform == "linglingkuailezhufu" or self.platform == "lepaoledongdijie":
  152. return True
  153. if self.platform == "jierizhufuhuakaifugui" or self.platform == "yuannifuqimanman" or self.platform == "haoyunzhufuduo":
  154. return True
  155. if self.platform == "zhuwanwufusunew" and self.mode == "recommend":
  156. return True
  157. if self.platform == "jixiangxingfu" and self.mode == "recommend":
  158. return True
  159. if self.platform == "yuannifuqichangzai" and self.mode == "recommend":
  160. return True
  161. if self.platform == "benshanzhufu" and self.mode == "recommend":
  162. return True
  163. if self.platform == "zuihaodesongni" and self.mode == "recommend":
  164. return True
  165. if self.platform == "tiantianjufuqi" and self.mode == "recommend":
  166. return True
  167. # 判断加上标题去重
  168. if self.mode == "recommend" and self.platform == "zhufuhaoyunbaofu":
  169. title = self.item["video_title"]
  170. sql = f""" select 1 from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}" and video_title="{title}"; """
  171. else:
  172. sql = f""" select 1 from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}"; """
  173. repeat_video = self.mysql.select(sql=sql)
  174. if repeat_video:
  175. # 喜事多多平台 4 天去重一次
  176. if self.platform == "xishiduoduo":
  177. sql_2 = f"""select create_time from crawler_video where out_video_id="{out_id}";"""
  178. video_time = self.mysql.select(sql=sql_2)[0][0].timestamp()
  179. if int(time.time()) - video_time >= 86400 * 4:
  180. return True
  181. # 小年糕推荐流和祝福圈子推荐流 3 天去重一次
  182. elif self.platform == "xiaoniangaotuijianliu" or self.platform == "zhufuquanzituijianliu":
  183. sql_2 = f"""select create_time from crawler_video where out_video_id="{out_id}";"""
  184. video_time = self.mysql.select(sql=sql_2)[0][0].timestamp()
  185. if int(time.time()) - video_time >= 86400 * 3:
  186. return True
  187. self.aliyun_log.logging(
  188. code="2002",
  189. trace_id=self.trace_id,
  190. message="重复的视频",
  191. data=self.item,
  192. account=self.account
  193. )
  194. return False
  195. return True
  196. # def mq_exists(self):
  197. # """
  198. # 检测 mq 是否已经发送过了
  199. # :return:
  200. # """
  201. # if self.red.connect():
  202. # index_txt = "{}-{}".format(self.platform, self.item['video_id'])
  203. # index_md5 = hashlib.md5(index_txt.encode()).hexdigest()
  204. # if self.red.select(index_md5):
  205. # self.aliyun_log.logging(
  206. # code="2007",
  207. # trace_id=self.trace_id,
  208. # message="该视频 mq 已经发送"
  209. # )
  210. # return False
  211. # else:
  212. # self.red.insert(index_md5, int(time.time()), 43200)
  213. # return True
  214. # else:
  215. # return True
  216. def process_item(self):
  217. """
  218. 全规则判断,符合规则的数据则return True
  219. :return:
  220. """
  221. # 判断该 mq 是否已经发了
  222. # if not self.mq_exists():
  223. # return False
  224. if not self.publish_time_flag():
  225. # 记录相关日志
  226. return False
  227. if not self.title_flag():
  228. # 记录相关日志
  229. return False
  230. if not self.repeat_video():
  231. # 记录相关日志
  232. return False
  233. if not self.download_rule_flag():
  234. # 记录相关日志
  235. return False
  236. return True