pipeline.py 9.7 KB


  1. import re
  2. import time
  3. from .aliyun_log import AliyunLogger
  4. from common.scheduling_db import MysqlHelper
  class PiaoQuanPipeline:
      """Decide whether a crawled video item should be kept.

      The item is run through four checks: publish-time window, title,
      duplicate lookup and numeric download rules.  Every check returns
      ``True`` when the item passes and ``False`` when it is rejected; each
      rejection is reported through ``AliyunLogger.logging`` together with the
      item and the trace id.

      Args:
          platform: Platform name; also used as the ``platform`` column value
              in the duplicate lookup.
          mode: Crawl mode, forwarded to the logger and to ``MysqlHelper``.
          rule_dict: Mapping of rule name -> ``{"min": ..., "max": ...}``.
          env: Environment name, forwarded to the logger and to ``MysqlHelper``.
          item: The crawled video as a dict.
          trace_id: Identifier attached to every log record.
      """

      # Platforms whose items only expire when BOTH the publish time and the
      # update time fall outside the allowed window.
      _UPDATE_TIME_PLATFORMS = ("gongzhonghao", "gongzhongxinhao")
      # Rule keys that describe the crawl window rather than a numeric item
      # field.  "peroid" is the historical misspelling and is still honoured.
      _PERIOD_KEYS = ("period", "peroid")
      # Upper bound used when a rule's "max" is zero or negative.
      _UNBOUNDED = 999999999999999

      def __init__(self, platform, mode, rule_dict, env, item, trace_id):
          self.platform = platform
          self.mode = mode
          self.item = item
          self.rule_dict = rule_dict
          self.env = env
          self.trace_id = trace_id

      def _log_rejection(self, code, message):
          """Report one rejected item to AliyunLogger with the shared context."""
          AliyunLogger.logging(
              code=code,
              trace_id=self.trace_id,
              platform=self.platform,
              mode=self.mode,
              env=self.env,
              data=self.item,
              message=message,
          )

      @staticmethod
      def _sql_escape(value):
          """Escape *value* for use inside a double-quoted SQL string literal.

          Backslashes and double quotes are backslash-escaped.
          NOTE(review): this assumes MySQL's default sql_mode; a parameterized
          query is preferable if ``MysqlHelper`` supports one (not visible here).
          """
          return str(value).replace("\\", "\\\\").replace('"', '\\"')

      def publish_time_flag(self):
          """Return ``False`` when the item is older than the allowed window.

          The window, in days, is the larger of ``period.max`` and
          ``period.min`` from ``rule_dict`` (each defaults to 1000 when absent).
          For the platforms in ``_UPDATE_TIME_PLATFORMS`` the item is rejected
          only if both ``publish_time_stamp`` and ``update_time_stamp`` are
          older than the window; for any other platform only
          ``publish_time_stamp`` is considered.

          Raises:
              KeyError: if ``publish_time_stamp`` is missing, or if
                  ``update_time_stamp`` is missing when it is needed.
          """
          period = self.rule_dict.get("period", {})
          # max(min, max) keeps the original tie-breaking: "min" wins on a tie.
          days = max(period.get("min", 1000), period.get("max", 1000))
          window = 3600 * 24 * days
          now = int(time.time())

          expired = now - self.item["publish_time_stamp"] > window
          if self.platform in self._UPDATE_TIME_PLATFORMS:
              # A recently updated item stays valid even when it was first
              # published long ago.  The update time is only read when the
              # publish time alone would reject the item.
              expired = expired and now - self.item["update_time_stamp"] > window

          if expired:
              self._log_rejection("2004", "发布时间超过{}天".format(days))
              return False
          return True

      def title_flag(self):
          """Return ``False`` when the title contains a sensitive word.

          Non-word characters in the title are replaced by spaces before the
          lookup.  The word list is empty in this file, so the check currently
          never rejects an item.
          """
          cleaned_title = re.sub(r"[^\w]", " ", self.item["video_title"])
          sensitive_words = []
          if any(word in cleaned_title for word in sensitive_words):
              self._log_rejection("2003", "标题中包含敏感词")
              return False
          return True

      def download_rule_flag(self):
          """Check every item field that has a rule against its min/max range.

          For each item key with a truthy entry in ``rule_dict`` the item value
          must satisfy ``min <= value <= max``; a ``max`` of zero or less means
          "no upper bound".  The crawl-period rule is skipped because it is
          not a numeric property of the item.

          Returns:
              ``False`` at the first field outside its range, else ``True``.
          """
          for key in self.item:
              rule = self.rule_dict.get(key)
              if not rule or key in self._PERIOD_KEYS:
                  continue
              upper = int(rule["max"])
              if upper <= 0:
                  upper = self._UNBOUNDED
              flag = int(rule["min"]) <= int(self.item[key]) <= upper
              if not flag:
                  self._log_rejection(
                      "2004",
                      "{}: {} <= {} <= {}, {}".format(
                          key, rule["min"], self.item[key], upper, flag
                      ),
                  )
                  return flag
          return True

      def repeat_video(self):
          """Return ``False`` when this platform already stored the video.

          Looks up ``crawler_video`` by platform and ``out_video_id`` through
          ``MysqlHelper.get_values``; any truthy result counts as a duplicate.
          """
          platform = self._sql_escape(self.platform)
          out_id = self._sql_escape(self.item["out_video_id"])
          sql = f""" select * from crawler_video where platform = "{platform}" and out_video_id="{out_id}"; """
          repeat_video = MysqlHelper.get_values(
              log_type=self.mode, crawler=self.platform, env=self.env, sql=sql, action=""
          )
          if repeat_video:
              self._log_rejection("2002", "重复的视频")
              return False
          return True

      def process_item(self):
          """Run all checks in order and return ``True`` only if every one passes.

          Evaluation stops at the first failing check, so the database lookup
          in ``repeat_video`` is skipped for items already rejected by the
          publish-time or title checks.  Each check logs its own rejection.
          """
          checks = (
              self.publish_time_flag,
              self.title_flag,
              self.repeat_video,
              self.download_rule_flag,
          )
          return all(check() for check in checks)
  class PiaoQuanPipelineTest:
      """Debug variant of the item filter that prints rejections to stdout.

      It runs a publish-time, title, duplicate and download-rule check on one
      crawled item.  Every check returns ``True`` when the item passes and
      ``False`` when it is rejected; the rejection reason is written with
      ``print`` rather than sent to a logger.

      Args:
          platform: Platform name; also used as the ``platform`` column value
              in the duplicate lookup.
          mode: Crawl mode, forwarded to ``MysqlHelper``.
          rule_dict: Mapping of rule name -> ``{"min": ..., "max": ...}``.
          env: Environment name, forwarded to ``MysqlHelper``.
          item: The crawled video as a dict.
          trace_id: Stored on the instance; not used by any check in this class.
      """

      # Rule keys that describe the crawl window rather than a numeric item
      # field.  "peroid" is the historical misspelling and is still honoured.
      _PERIOD_KEYS = ("period", "peroid")
      # Upper bound used when a rule's "max" is zero or negative.
      _UNBOUNDED = 999999999999999

      def __init__(self, platform, mode, rule_dict, env, item, trace_id):
          self.platform = platform
          self.mode = mode
          self.item = item
          self.rule_dict = rule_dict
          self.env = env
          self.trace_id = trace_id

      @staticmethod
      def _sql_escape(value):
          """Escape *value* for use inside a double-quoted SQL string literal.

          Backslashes and double quotes are backslash-escaped.
          NOTE(review): this assumes MySQL's default sql_mode; a parameterized
          query is preferable if ``MysqlHelper`` supports one (not visible here).
          """
          return str(value).replace("\\", "\\\\").replace('"', '\\"')

      def publish_time_flag(self):
          """Return ``False`` when the item is older than ``period.max`` days.

          ``period.max`` defaults to 1000 when absent.  For platform
          ``"gongzhonghao"`` the item is rejected only if both
          ``publish_time_stamp`` and ``update_time_stamp`` are older than the
          window; for any other platform only ``publish_time_stamp`` counts.

          Raises:
              KeyError: if ``publish_time_stamp`` is missing, or if
                  ``update_time_stamp`` is missing when it is needed.
          """
          days = int(self.rule_dict.get("period", {}).get("max", 1000))
          window = 3600 * 24 * days
          now = int(time.time())

          expired = now - self.item["publish_time_stamp"] > window
          if self.platform == "gongzhonghao":
              # The update time is only read when the publish time alone
              # would reject the item.
              expired = expired and now - self.item["update_time_stamp"] > window

          if expired:
              print("发布时间超过{}天".format(days))
              return False
          return True

      def title_flag(self):
          """Return ``False`` when the title contains a sensitive word.

          Non-word characters in the title are replaced by spaces before the
          lookup.  The word list is empty in this file, so the check currently
          never rejects an item.
          """
          cleaned_title = re.sub(r"[^\w]", " ", self.item["video_title"])
          sensitive_words = []
          if any(word in cleaned_title for word in sensitive_words):
              print("标题中包含敏感词")
              return False
          return True

      def download_rule_flag(self):
          """Check every item field that has a rule against its min/max range.

          For each item key with a truthy entry in ``rule_dict`` the item value
          must satisfy ``min <= value <= max``; a ``max`` of zero or less means
          "no upper bound".  The crawl-period rule is skipped because it is
          not a numeric property of the item.

          Returns:
              ``False`` at the first field outside its range, else ``True``.
          """
          for key in self.item:
              rule = self.rule_dict.get(key)
              if not rule or key in self._PERIOD_KEYS:
                  continue
              upper = int(rule["max"])
              if upper <= 0:
                  upper = self._UNBOUNDED
              flag = int(rule["min"]) <= int(self.item[key]) <= upper
              if not flag:
                  print(
                      "{}: {} <= {} <= {}, {}".format(
                          key, rule["min"], self.item[key], upper, flag
                      )
                  )
                  return flag
          return True

      def repeat_video(self):
          """Return ``False`` when this platform already stored the video.

          Looks up ``crawler_video`` by platform and ``out_video_id`` through
          ``MysqlHelper.get_values``; any truthy result counts as a duplicate.
          """
          platform = self._sql_escape(self.platform)
          out_id = self._sql_escape(self.item["out_video_id"])
          sql = f""" select * from crawler_video where platform = "{platform}" and out_video_id="{out_id}"; """
          repeat_video = MysqlHelper.get_values(
              log_type=self.mode, crawler=self.platform, env=self.env, sql=sql, action=""
          )
          if repeat_video:
              print("重复的视频")
              return False
          return True

      def process_item(self):
          """Run all checks in order and return ``True`` only if every one passes.

          Evaluation stops at the first failing check, so the database lookup
          in ``repeat_video`` is skipped for items already rejected by the
          publish-time or title checks.  Each check prints its own reason.
          """
          checks = (
              self.publish_time_flag,
              self.title_flag,
              self.repeat_video,
              self.download_rule_flag,
          )
          return all(check() for check in checks)