pipeline.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. import re
  2. import sys
  3. import os
  4. import time
  5. from application.config.common.feishu.feishu_utils import FeishuUtils
  6. sys.path.append(os.getcwd())
  7. from datetime import datetime
  8. from application.config.common import MysqlHelper, AliyunLogger
  9. from application.config.common import RedisClient
  10. class PiaoQuanPipeline(object):
  11. """
  12. 爬虫管道——爬虫规则判断
  13. """
  14. def __init__(self, platform, mode, rule_dict, env, item, trace_id, account=None):
  15. self.platform = platform
  16. self.mode = mode
  17. self.item = item
  18. self.rule_dict = rule_dict
  19. self.env = env
  20. self.trace_id = trace_id
  21. self.mysql = MysqlHelper(env=env, mode=mode, platform=platform)
  22. self.aliyun_log = AliyunLogger(platform=platform, mode=mode, env=env)
  23. self.account = account
  24. self.red = RedisClient()
  25. def feishu_time_list(self):
  26. summary = FeishuUtils.get_values_batch("KsoMsyP2ghleM9tzBfmcEEXBnXg", "RuLK77")
  27. for row in summary[1:]:
  28. channel = row[0]
  29. day_count = row[1]
  30. if channel:
  31. if channel == self.platform:
  32. return day_count
  33. else:
  34. return None
  35. return None
  36. def publish_time_flag(self):
  37. """
  38. 判断发布时间是否过期
  39. :return: True or False
  40. """
  41. # 判断发布时间
  42. publish_time_stamp = self.item["publish_time_stamp"]
  43. update_time_stamp = self.item["update_time_stamp"]
  44. max_d = self.rule_dict.get("period", {}).get("max", 1000)
  45. min_d = self.rule_dict.get("period", {}).get("min", 1000)
  46. days = max_d if max_d > min_d else min_d
  47. days_time = self.feishu_time_list()
  48. if days_time:
  49. days = int(days_time)
  50. if self.platform == "gongzhonghao":
  51. if (
  52. int(time.time()) - publish_time_stamp
  53. > 3600 * 24 * days
  54. ) and (
  55. int(time.time()) - update_time_stamp
  56. > 3600 * 24 * days
  57. ):
  58. self.aliyun_log.logging(
  59. code="2004",
  60. trace_id=self.trace_id,
  61. data=self.item,
  62. message="发布时间超过{}天".format(days),
  63. )
  64. return False
  65. else:
  66. if days == 0:
  67. publish_time_stamp = int(time.time()) # 示例时间戳
  68. is_today = datetime.fromtimestamp(publish_time_stamp).date() == datetime.today().date()
  69. if not is_today:
  70. return False
  71. elif (
  72. int(time.time()) - publish_time_stamp
  73. > 3600 * 24 * days
  74. ):
  75. self.aliyun_log.logging(
  76. code="2004",
  77. trace_id=self.trace_id,
  78. data=self.item,
  79. message="发布时间超过{}天".format(days),
  80. )
  81. return False
  82. return True
  83. def title_flag(self):
  84. """
  85. 视频标题是否满足需求
  86. :return:
  87. """
  88. title = self.item["video_title"]
  89. cleaned_title = re.sub(r"[^\w]", " ", title)
  90. # 敏感词
  91. # 获取敏感词列表
  92. sensitive_words = []
  93. if any(word in cleaned_title for word in sensitive_words):
  94. self.aliyun_log.logging(
  95. code="2003",
  96. trace_id=self.trace_id,
  97. message="标题中包含敏感词",
  98. data=self.item,
  99. account=self.account
  100. )
  101. return False
  102. return True
  103. def download_rule_flag(self):
  104. """
  105. 视频基础下载规则
  106. :return:
  107. """
  108. for key in self.item:
  109. if self.rule_dict.get(key):
  110. max_value = (
  111. int(self.rule_dict[key]["max"])
  112. if int(self.rule_dict[key]["max"]) > 0
  113. else 999999999999999
  114. )
  115. if key == "peroid": # peroid是抓取周期天数
  116. continue
  117. else:
  118. flag = int(self.rule_dict[key]["min"]) <= int(self.item[key]) <= max_value
  119. if not flag:
  120. self.aliyun_log.logging(
  121. code="2004",
  122. trace_id=self.trace_id,
  123. data=self.item,
  124. message="{}: {} <= {} <= {}, {}".format(
  125. key,
  126. self.rule_dict[key]["min"],
  127. self.item[key],
  128. max_value,
  129. flag,
  130. ),
  131. account=self.account
  132. )
  133. return flag
  134. else:
  135. continue
  136. return True
  137. def feishu_list(self):
  138. summary = FeishuUtils.get_values_batch("KsoMsyP2ghleM9tzBfmcEEXBnXg", "letS93")
  139. for row in summary[1:]:
  140. channel = row[0]
  141. day_count = row[1]
  142. if channel:
  143. if channel == self.platform:
  144. return day_count
  145. else:
  146. return None
  147. return None
  148. # 按照某个具体平台来去重
  149. def repeat_video(self):
  150. """
  151. 视频是否重复
  152. :return:
  153. """
  154. out_id = self.item["out_video_id"]
  155. day_count = self.feishu_list()
  156. if day_count:
  157. sql_2 = f"""select create_time from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}" AND create_time >= DATE_SUB(NOW(), INTERVAL {int(day_count)} DAY);"""
  158. repeat_video = self.mysql.select(sql=sql_2)
  159. if repeat_video:
  160. self.aliyun_log.logging(
  161. code="2002",
  162. trace_id=self.trace_id,
  163. message="重复的视频",
  164. data=self.item,
  165. account=self.account
  166. )
  167. return False
  168. else:
  169. return True
  170. if self.platform == "zhufuniannianshunxinjixiang" or self.platform == "weiquanshipin" or self.platform == "piaoquangushi" or self.platform == "lepaoledong" or self.platform == "zhufukuaizhuan" or self.platform == "linglingkuailezhufu" or self.platform == "lepaoledongdijie":
  171. return True
  172. if self.platform == "jierizhufuhuakaifugui" or self.platform == "yuannifuqimanman" or self.platform == "haoyunzhufuduo" or self.platform == "quzhuan" or self.platform == "zhufudewenhou" or self.platform == "jierizhufuxingfujixiang" or self.platform == "haoyoushipin" or self.platform == "xinshiquan" or self.platform == "laonianshenghuokuaile" or self.platform == "laonianquan":
  173. return True
  174. if self.platform == "zhuwanwufusunew" and self.mode == "recommend":
  175. return True
  176. if self.platform == "jixiangxingfu" and self.mode == "recommend":
  177. return True
  178. if self.platform == "yuannifuqichangzai" and self.mode == "recommend":
  179. return True
  180. if self.platform == "benshanzhufu" and self.mode == "recommend":
  181. return True
  182. if self.platform == "zuihaodesongni" and self.mode == "recommend":
  183. return True
  184. if self.platform == "tiantianjufuqi" and self.mode == "recommend":
  185. return True
  186. # 判断加上标题去重
  187. if self.mode == "recommend" and self.platform == "zhufuhaoyunbaofu":
  188. title = self.item["video_title"]
  189. sql = f""" select 1 from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}" and video_title="{title}"; """
  190. else:
  191. sql = f""" select 1 from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}"; """
  192. repeat_video = self.mysql.select(sql=sql)
  193. if repeat_video:
  194. # 喜事多多平台 4 天去重一次
  195. if self.platform == "xishiduoduo":
  196. sql_2 = f"""select create_time from crawler_video where out_video_id="{out_id}";"""
  197. video_time = self.mysql.select(sql=sql_2)[0][0].timestamp()
  198. if int(time.time()) - video_time >= 86400 * 4:
  199. return True
  200. # 小年糕推荐流和祝福圈子推荐流 3 天去重一次
  201. elif self.platform == "xiaoniangaotuijianliu" or self.platform == "zhufuquanzituijianliu":
  202. sql_2 = f"""select create_time from crawler_video where out_video_id="{out_id}";"""
  203. video_time = self.mysql.select(sql=sql_2)[0][0].timestamp()
  204. if int(time.time()) - video_time >= 86400 * 3:
  205. return True
  206. self.aliyun_log.logging(
  207. code="2002",
  208. trace_id=self.trace_id,
  209. message="重复的视频",
  210. data=self.item,
  211. account=self.account
  212. )
  213. return False
  214. return True
  215. # def mq_exists(self):
  216. # """
  217. # 检测 mq 是否已经发送过了
  218. # :return:
  219. # """
  220. # if self.red.connect():
  221. # index_txt = "{}-{}".format(self.platform, self.item['video_id'])
  222. # index_md5 = hashlib.md5(index_txt.encode()).hexdigest()
  223. # if self.red.select(index_md5):
  224. # self.aliyun_log.logging(
  225. # code="2007",
  226. # trace_id=self.trace_id,
  227. # message="该视频 mq 已经发送"
  228. # )
  229. # return False
  230. # else:
  231. # self.red.insert(index_md5, int(time.time()), 43200)
  232. # return True
  233. # else:
  234. # return True
  235. def process_item(self):
  236. """
  237. 全规则判断,符合规则的数据则return True
  238. :return:
  239. """
  240. # 判断该 mq 是否已经发了
  241. # if not self.mq_exists():
  242. # return False
  243. if not self.publish_time_flag():
  244. # 记录相关日志
  245. return False
  246. if not self.title_flag():
  247. # 记录相关日志
  248. return False
  249. if not self.repeat_video():
  250. # 记录相关日志
  251. return False
  252. if not self.download_rule_flag():
  253. # 记录相关日志
  254. return False
  255. return True