# pipeline.py
# NOTE: hashlib is only referenced by the commented-out mq_exists() code in
# this file.
import hashlib
import re
import sys
import os
import time
from application.common.feishu.feishu_utils import FeishuUtils
# Append the current working directory to the module search path.
# NOTE(review): presumably this is what lets the `application` package resolve
# when the crawler is launched from the project root; the FeishuUtils import
# above already runs before this line takes effect -- confirm the order.
sys.path.append(os.getcwd())
from datetime import datetime
from application.common import MysqlHelper, AliyunLogger
# Redis client import, disabled together with the mq_exists() check.
# from application.common.redis.pyredis import RedisClient
  11. class PiaoQuanPipeline(object):
  12. """
  13. 爬虫管道——爬虫规则判断
  14. """
  15. def __init__(self, platform, mode, rule_dict, env, item, trace_id, account=None):
  16. self.platform = platform
  17. self.mode = mode
  18. self.item = item
  19. self.rule_dict = rule_dict
  20. self.env = env
  21. self.trace_id = trace_id
  22. self.mysql = MysqlHelper(env=env, mode=mode, platform=platform)
  23. self.aliyun_log = AliyunLogger(platform=platform, mode=mode, env=env)
  24. self.account = account
  25. # self.red = RedisClient()
  26. def feishu_time_list(self):
  27. summary = FeishuUtils.get_values_batch("KsoMsyP2ghleM9tzBfmcEEXBnXg", "RuLK77")
  28. for row in summary[1:]:
  29. channel = row[0]
  30. day_count = row[1]
  31. if channel:
  32. if channel == self.platform:
  33. return day_count
  34. else:
  35. return None
  36. return None
  37. def publish_time_flag(self):
  38. """
  39. 判断发布时间是否过期
  40. :return: True or False
  41. """
  42. # 判断发布时间
  43. publish_time_stamp = self.item["publish_time_stamp"]
  44. update_time_stamp = self.item["update_time_stamp"]
  45. max_d = self.rule_dict.get("period", {}).get("max", 1000)
  46. min_d = self.rule_dict.get("period", {}).get("min", 1000)
  47. days = max_d if max_d > min_d else min_d
  48. days_time = self.feishu_time_list()
  49. if days_time:
  50. days = int(days_time)
  51. if self.platform == "gongzhonghao":
  52. if (
  53. int(time.time()) - publish_time_stamp
  54. > 3600 * 24 * days
  55. ) and (
  56. int(time.time()) - update_time_stamp
  57. > 3600 * 24 * days
  58. ):
  59. self.aliyun_log.logging(
  60. code="2004",
  61. trace_id=self.trace_id,
  62. data=self.item,
  63. message="发布时间超过{}天".format(days),
  64. )
  65. return False
  66. else:
  67. if days == 0:
  68. publish_time_stamp = int(time.time()) # 示例时间戳
  69. is_today = datetime.fromtimestamp(publish_time_stamp).date() == datetime.today().date()
  70. if not is_today:
  71. return False
  72. elif (
  73. int(time.time()) - publish_time_stamp
  74. > 3600 * 24 * days
  75. ):
  76. self.aliyun_log.logging(
  77. code="2004",
  78. trace_id=self.trace_id,
  79. data=self.item,
  80. message="发布时间超过{}天".format(days),
  81. )
  82. return False
  83. return True
  84. def title_flag(self):
  85. """
  86. 视频标题是否满足需求
  87. :return:
  88. """
  89. title = self.item["video_title"]
  90. cleaned_title = re.sub(r"[^\w]", " ", title)
  91. # 敏感词
  92. # 获取敏感词列表
  93. sensitive_words = []
  94. if any(word in cleaned_title for word in sensitive_words):
  95. self.aliyun_log.logging(
  96. code="2003",
  97. trace_id=self.trace_id,
  98. message="标题中包含敏感词",
  99. data=self.item,
  100. account=self.account
  101. )
  102. return False
  103. return True
  104. def download_rule_flag(self):
  105. """
  106. 视频基础下载规则
  107. :return:
  108. """
  109. for key in self.item:
  110. if self.rule_dict.get(key):
  111. max_value = (
  112. int(self.rule_dict[key]["max"])
  113. if int(self.rule_dict[key]["max"]) > 0
  114. else 999999999999999
  115. )
  116. if key == "peroid": # peroid是抓取周期天数
  117. continue
  118. else:
  119. flag = int(self.rule_dict[key]["min"]) <= int(self.item[key]) <= max_value
  120. if not flag:
  121. self.aliyun_log.logging(
  122. code="2004",
  123. trace_id=self.trace_id,
  124. data=self.item,
  125. message="{}: {} <= {} <= {}, {}".format(
  126. key,
  127. self.rule_dict[key]["min"],
  128. self.item[key],
  129. max_value,
  130. flag,
  131. ),
  132. account=self.account
  133. )
  134. return flag
  135. else:
  136. continue
  137. return True
  138. def feishu_list(self):
  139. summary = FeishuUtils.get_values_batch("KsoMsyP2ghleM9tzBfmcEEXBnXg", "letS93")
  140. for row in summary[1:]:
  141. channel = row[0]
  142. day_count = row[1]
  143. if channel:
  144. if channel == self.platform:
  145. return day_count
  146. else:
  147. return None
  148. return None
  149. # 按照某个具体平台来去重
  150. def repeat_video(self):
  151. """
  152. 视频是否重复
  153. :return:
  154. """
  155. out_id = self.item["out_video_id"]
  156. day_count = self.feishu_list()
  157. if day_count:
  158. sql_2 = f"""select create_time from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}" AND create_time >= DATE_SUB(NOW(), INTERVAL {int(day_count)} DAY);"""
  159. repeat_video = self.mysql.select(sql=sql_2)
  160. if repeat_video:
  161. self.aliyun_log.logging(
  162. code="2002",
  163. trace_id=self.trace_id,
  164. message="重复的视频",
  165. data=self.item,
  166. account=self.account
  167. )
  168. return False
  169. else:
  170. return True
  171. if self.platform == "zhufuniannianshunxinjixiang" or self.platform == "weiquanshipin" or self.platform == "piaoquangushi" or self.platform == "lepaoledong" or self.platform == "zhufukuaizhuan" or self.platform == "linglingkuailezhufu" or self.platform == "lepaoledongdijie":
  172. return True
  173. if self.platform == "jierizhufuhuakaifugui" or self.platform == "yuannifuqimanman" or self.platform == "haoyunzhufuduo" or self.platform == "quzhuan" or self.platform == "zhufudewenhou" or self.platform == "jierizhufuxingfujixiang" or self.platform == "haoyoushipin" or self.platform == "xinshiquan" or self.platform == "laonianshenghuokuaile" or self.platform == "laonianquan":
  174. return True
  175. if self.platform == "zhuwanwufusunew" and self.mode == "recommend":
  176. return True
  177. if self.platform == "jixiangxingfu" and self.mode == "recommend":
  178. return True
  179. if self.platform == "yuannifuqichangzai" and self.mode == "recommend":
  180. return True
  181. if self.platform == "benshanzhufu" and self.mode == "recommend":
  182. return True
  183. if self.platform == "zuihaodesongni" and self.mode == "recommend":
  184. return True
  185. if self.platform == "tiantianjufuqi" and self.mode == "recommend":
  186. return True
  187. # 判断加上标题去重
  188. if self.mode == "recommend" and self.platform == "zhufuhaoyunbaofu":
  189. title = self.item["video_title"]
  190. sql = f""" select 1 from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}" and video_title="{title}"; """
  191. else:
  192. sql = f""" select 1 from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}"; """
  193. repeat_video = self.mysql.select(sql=sql)
  194. if repeat_video:
  195. # 喜事多多平台 4 天去重一次
  196. if self.platform == "xishiduoduo":
  197. sql_2 = f"""select create_time from crawler_video where out_video_id="{out_id}";"""
  198. video_time = self.mysql.select(sql=sql_2)[0][0].timestamp()
  199. if int(time.time()) - video_time >= 86400 * 4:
  200. return True
  201. # 小年糕推荐流和祝福圈子推荐流 3 天去重一次
  202. elif self.platform == "xiaoniangaotuijianliu" or self.platform == "zhufuquanzituijianliu":
  203. sql_2 = f"""select create_time from crawler_video where out_video_id="{out_id}";"""
  204. video_time = self.mysql.select(sql=sql_2)[0][0].timestamp()
  205. if int(time.time()) - video_time >= 86400 * 3:
  206. return True
  207. self.aliyun_log.logging(
  208. code="2002",
  209. trace_id=self.trace_id,
  210. message="重复的视频",
  211. data=self.item,
  212. account=self.account
  213. )
  214. return False
  215. return True
  216. # def mq_exists(self):
  217. # """
  218. # 检测 mq 是否已经发送过了
  219. # :return:
  220. # """
  221. # if self.red.connect():
  222. # index_txt = "{}-{}".format(self.platform, self.item['video_id'])
  223. # index_md5 = hashlib.md5(index_txt.encode()).hexdigest()
  224. # if self.red.select(index_md5):
  225. # self.aliyun_log.logging(
  226. # code="2007",
  227. # trace_id=self.trace_id,
  228. # message="该视频 mq 已经发送"
  229. # )
  230. # return False
  231. # else:
  232. # self.red.insert(index_md5, int(time.time()), 43200)
  233. # return True
  234. # else:
  235. # return True
  236. def process_item(self):
  237. """
  238. 全规则判断,符合规则的数据则return True
  239. :return:
  240. """
  241. # 判断该 mq 是否已经发了
  242. # if not self.mq_exists():
  243. # return False
  244. if not self.publish_time_flag():
  245. # 记录相关日志
  246. return False
  247. if not self.title_flag():
  248. # 记录相关日志
  249. return False
  250. if not self.repeat_video():
  251. # 记录相关日志
  252. return False
  253. if not self.download_rule_flag():
  254. # 记录相关日志
  255. return False
  256. return True