pipeline.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. import re
  2. import sys
  3. import os
  4. import time
  5. sys.path.append(os.getcwd())
  6. from application.common import MysqlHelper, AliyunLogger
  class PiaoQuanPipeline(object):
      """Crawler pipeline that decides whether a crawled video item passes the crawl rules.

      An item is accepted by :meth:`process_item` only when it passes, in order,
      the publish-time check, the title check, the duplicate check and the
      per-field min/max download rules. Every rejection is reported through
      ``self.aliyun_log.logging``.
      """

      _SECONDS_PER_DAY = 3600 * 24
      # Upper bound used when a rule's "max" is 0 or negative (treated as "no limit").
      _NO_UPPER_LIMIT = 999999999999999
      # Rule keys holding the crawl period in days. They are enforced by
      # publish_time_flag, not by the min/max download rules. "peroid" is the
      # historical misspelling, still accepted for backward compatibility.
      _PERIOD_RULE_KEYS = ("period", "peroid")
      # Platforms whose "recommend" mode skips the duplicate check entirely.
      _NO_DEDUP_RECOMMEND_PLATFORMS = ("zhuwanwufusunew", "jixiangxingfu")
      # Platforms whose "recommend" mode de-duplicates on out_video_id AND title.
      _TITLE_DEDUP_RECOMMEND_PLATFORMS = ("yuannifuqichangzai", "zhufuhaoyunbaofu")
      # Sensitive words rejected by title_flag. Empty by default, so every title
      # passes; override on a subclass or instance to enable filtering.
      SENSITIVE_WORDS = ()
      # Matches every character that is not a (Unicode) word character.
      _NON_WORD_RE = re.compile(r"[^\w]")
      # Backslash escapes for text embedded in a quoted MySQL string literal.
      # NOTE(review): assumes the server does not run with NO_BACKSLASH_ESCAPES.
      _SQL_ESCAPES = str.maketrans(
          {
              "\0": "\\0",
              "\n": "\\n",
              "\r": "\\r",
              "\x1a": "\\Z",
              '"': '\\"',
              "'": "\\'",
              "\\": "\\\\",
          }
      )

      def __init__(self, platform, mode, rule_dict, env, item, trace_id, account=None):
          """Store the item, its rules and the helpers used to check them.

          :param platform: platform name; also used in the de-duplication SQL.
          :param mode: crawl mode, e.g. ``"recommend"``.
          :param rule_dict: maps a field name to a dict with ``"min"``/``"max"``;
              the ``"period"`` entry is the allowed age in days.
          :param env: environment name forwarded to the MySQL and log helpers.
          :param item: the crawled video as a dict. The checks read
              ``publish_time_stamp``, ``update_time_stamp`` (only for the
              ``gongzhonghao`` platform), ``video_title`` and ``out_video_id``.
          :param trace_id: identifier attached to every log record.
          :param account: optional account attached to rejection log records.
          """
          self.platform = platform
          self.mode = mode
          self.item = item
          self.rule_dict = rule_dict
          self.env = env
          self.trace_id = trace_id
          self.mysql = MysqlHelper(env=env, mode=mode, platform=platform)
          self.aliyun_log = AliyunLogger(platform=platform, mode=mode, env=env)
          self.account = account

      def _sql_escape(self, value):
          """Return ``value`` as text that is safe inside a quoted MySQL string literal."""
          return str(value).translate(self._SQL_ESCAPES)

      def publish_time_flag(self):
          """Check that the video is not older than the configured period.

          The limit in days is the larger of ``rule_dict["period"]["max"]`` and
          ``rule_dict["period"]["min"]`` (each defaults to 1000). For the
          ``gongzhonghao`` platform the item is rejected only when both its
          publish time and its update time are older than the limit.

          :return: True when the item is recent enough, False otherwise.
          """
          period_rule = self.rule_dict.get("period", {})
          max_d = period_rule.get("max", 1000)
          min_d = period_rule.get("min", 1000)
          days = max_d if max_d > min_d else min_d

          now = int(time.time())
          max_age = self._SECONDS_PER_DAY * days
          expired = now - self.item["publish_time_stamp"] > max_age
          # update_time_stamp is only consulted for gongzhonghao, so items from
          # other platforms do not need to carry that key.
          if expired and self.platform == "gongzhonghao":
              expired = now - self.item["update_time_stamp"] > max_age

          if expired:
              self.aliyun_log.logging(
                  code="2004",
                  trace_id=self.trace_id,
                  data=self.item,
                  message="发布时间超过{}天".format(days),
                  account=self.account,
              )
              return False
          return True

      def title_flag(self):
          """Check that the video title contains none of ``SENSITIVE_WORDS``.

          Non-word characters in the title are replaced by spaces before the
          substring match. With the default empty word list every title passes.

          :return: True when the title is acceptable, False otherwise.
          """
          cleaned_title = self._NON_WORD_RE.sub(" ", self.item["video_title"])
          if any(word in cleaned_title for word in self.SENSITIVE_WORDS):
              self.aliyun_log.logging(
                  code="2003",
                  trace_id=self.trace_id,
                  message="标题中包含敏感词",
                  data=self.item,
                  account=self.account,
              )
              return False
          return True

      def download_rule_flag(self):
          """Check every item field that has a rule against that rule's min/max.

          Fields without a (truthy) entry in ``rule_dict`` are ignored, and so is
          the crawl-period rule, which :meth:`publish_time_flag` enforces. A
          ``"max"`` of 0 or less means "no upper limit".

          :return: False on the first field outside its range, True otherwise.
          """
          for key in self.item:
              rule = self.rule_dict.get(key)
              if not rule or key in self._PERIOD_RULE_KEYS:
                  continue

              max_value = int(rule["max"])
              if max_value <= 0:
                  max_value = self._NO_UPPER_LIMIT

              if int(rule["min"]) <= int(self.item[key]) <= max_value:
                  continue

              self.aliyun_log.logging(
                  code="2004",
                  trace_id=self.trace_id,
                  data=self.item,
                  message="{}: {} <= {} <= {}, {}".format(
                      key,
                      rule["min"],
                      self.item[key],
                      max_value,
                      False,
                  ),
                  account=self.account,
              )
              return False
          return True

      # De-duplication is done per platform.
      def repeat_video(self):
          """Check that the video has not been crawled before.

          * Platforms in ``_NO_DEDUP_RECOMMEND_PLATFORMS`` skip the check in
            "recommend" mode.
          * Platforms in ``_TITLE_DEDUP_RECOMMEND_PLATFORMS`` match on
            ``out_video_id`` and ``video_title`` in "recommend" mode.
          * Everything else matches on platform and ``out_video_id``.
          * For ``xishiduoduo`` a known video is accepted again once its stored
            ``create_time`` is at least 4 days old.

          Values interpolated into the SQL are escaped, because ids and titles
          come from crawled data and may contain quotes or backslashes.

          :return: True when the item is not a duplicate, False otherwise.
          """
          recommend = self.mode == "recommend"
          if recommend and self.platform in self._NO_DEDUP_RECOMMEND_PLATFORMS:
              return True

          platform = self._sql_escape(self.platform)
          out_id = self._sql_escape(self.item["out_video_id"])
          if recommend and self.platform in self._TITLE_DEDUP_RECOMMEND_PLATFORMS:
              title = self._sql_escape(self.item["video_title"])
              sql = f""" select 1 from crawler_video where platform = "{platform}" and out_video_id="{out_id}" and video_title="{title}"; """
          else:
              sql = f""" select 1 from crawler_video where platform = "{platform}" and out_video_id="{out_id}"; """

          if not self.mysql.select(sql=sql):
              return True

          # xishiduoduo: a video may be crawled again every 4 days.
          if self.platform == "xishiduoduo":
              # NOTE(review): this lookup is not filtered by platform and uses the
              # first returned row - confirm that is intended.
              sql_2 = f"""select create_time from crawler_video where out_video_id="{out_id}";"""
              video_time = self.mysql.select(sql=sql_2)[0][0].timestamp()
              if int(time.time()) - video_time >= 86400 * 4:
                  return True

          self.aliyun_log.logging(
              code="2002",
              trace_id=self.trace_id,
              message="重复的视频",
              data=self.item,
              account=self.account,
          )
          return False

      def process_item(self):
          """Run every rule check in order, stopping at the first failure.

          Each failing check writes its own log record.

          :return: True when the item passes all rules, False otherwise.
          """
          checks = (
              self.publish_time_flag,
              self.title_flag,
              self.repeat_video,
              self.download_rule_flag,
          )
          return all(check() for check in checks)
  170. # if __name__ == '__main__':
  171. # sql_2 = f"""select create_time from crawler_video where video_id='18940470';"""
  172. # Mysql = MysqlHelper(platform="xishiduoduo", mode="recommend")
  173. # video_time = Mysql.select(sql=sql_2)
  174. # print(video_time)
  175. # print(video_time[0])
  176. # print(video_time[0][0])
  177. # print(type(video_time[0][0]))
  178. # print(video_time[0][0].timestamp())