pipeline.py 9.6 KB


  1. import re
  2. import time
  3. from .aliyun_log import AliyunLogger
  4. from common.scheduling_db import MysqlHelper
  5. class PiaoQuanPipeline:
  6. """
  7. pipeline code
  8. """
  9. def __init__(self, platform, mode, rule_dict, env, item, trace_id):
  10. self.platform = platform
  11. self.mode = mode
  12. self.item = item
  13. self.rule_dict = rule_dict
  14. self.env = env
  15. self.trace_id = trace_id
  16. def publish_time_flag(self):
  17. """
  18. 发布时间判断
  19. """
  20. publish_time_stamp = self.item["publish_time_stamp"]
  21. update_time_stamp = self.item["update_time_stamp"]
  22. max_d = self.rule_dict.get("period", {}).get("max", 1000)
  23. min_d = self.rule_dict.get("period", {}).get("min", 1000)
  24. days = max_d if max_d > min_d else min_d
  25. if self.platform == "gongzhonghao" or self.platform == "gongzhongxinhao":
  26. if (int(time.time()) - publish_time_stamp > 3600 * 24 * days) and (
  27. int(time.time()) - update_time_stamp > 3600 * 24 * days
  28. ):
  29. AliyunLogger.logging(
  30. code="2004",
  31. trace_id=self.trace_id,
  32. platform=self.platform,
  33. mode=self.mode,
  34. env=self.env,
  35. data=self.item,
  36. message="发布时间超过{}天".format(days),
  37. )
  38. return False
  39. else:
  40. if int(time.time()) - publish_time_stamp > 3600 * 24 * days:
  41. AliyunLogger.logging(
  42. code="2004",
  43. trace_id=self.trace_id,
  44. platform=self.platform,
  45. mode=self.mode,
  46. env=self.env,
  47. data=self.item,
  48. message="发布时间超过{}天".format(days),
  49. )
  50. return False
  51. return True
  52. def title_flag(self):
  53. """
  54. 视频标题是否满足需求
  55. """
  56. title = self.item["video_title"]
  57. cleaned_title = re.sub(r"[^\w]", " ", title)
  58. # 敏感词
  59. # 获取敏感词列表
  60. sensitive_words = []
  61. if any(word in cleaned_title for word in sensitive_words):
  62. AliyunLogger.logging(
  63. code="2003",
  64. trace_id=self.trace_id,
  65. platform=self.platform,
  66. mode=self.mode,
  67. env=self.env,
  68. message="标题中包含敏感词",
  69. data=self.item,
  70. )
  71. return False
  72. return True
  73. def download_rule_flag(self):
  74. """
  75. 视频基础下载规则
  76. """
  77. for key in self.item:
  78. if self.rule_dict.get(key):
  79. max_value = (
  80. int(self.rule_dict[key]["max"])
  81. if int(self.rule_dict[key]["max"]) > 0
  82. else 999999999999999
  83. )
  84. if key == "peroid": # peroid是抓取周期天数
  85. continue
  86. else:
  87. flag = (
  88. int(self.rule_dict[key]["min"])
  89. <= int(self.item[key])
  90. <= max_value
  91. )
  92. if not flag:
  93. AliyunLogger.logging(
  94. code="2004",
  95. trace_id=self.trace_id,
  96. platform=self.platform,
  97. mode=self.mode,
  98. env=self.env,
  99. data=self.item,
  100. message="{}: {} <= {} <= {}, {}".format(
  101. key,
  102. self.rule_dict[key]["min"],
  103. self.item[key],
  104. max_value,
  105. flag,
  106. ),
  107. )
  108. return flag
  109. else:
  110. continue
  111. return True
  112. def repeat_video(self):
  113. """
  114. 按照某个具体平台来去重
  115. """
  116. # sql = f""" select * from crawler_video where platform="公众号" and out_video_id="{video_id}"; """
  117. if self.platform == "jingdianfuqiwang" and self.mode == "recommend":
  118. return True
  119. if self.platform == "zhujinshanjinmei" and self.mode == "recommend":
  120. return True
  121. if self.platform == "zhubaisuizhihao" and self.mode == "recommend":
  122. return True
  123. out_id = self.item["out_video_id"]
  124. sql = f""" select * from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}"; """
  125. repeat_video = MysqlHelper.get_values(
  126. log_type=self.mode, crawler=self.platform, env=self.env, sql=sql, action=""
  127. )
  128. if repeat_video:
  129. AliyunLogger.logging(
  130. code="2002",
  131. trace_id=self.trace_id,
  132. platform=self.platform,
  133. mode=self.mode,
  134. env=self.env,
  135. message="重复的视频",
  136. data=self.item,
  137. )
  138. return False
  139. return True
  140. def process_item(self):
  141. """
  142. 判断是否符合规则
  143. """
  144. if not self.publish_time_flag():
  145. # 记录相关日志
  146. return False
  147. if not self.title_flag():
  148. # 记录相关日志
  149. return False
  150. if not self.repeat_video():
  151. # 记录相关日志
  152. return False
  153. if not self.download_rule_flag():
  154. # 记录相关日志
  155. return False
  156. return True
  157. class PiaoQuanPipelineTest:
  158. def __init__(self, platform, mode, rule_dict, env, item, trace_id):
  159. self.platform = platform
  160. self.mode = mode
  161. self.item = item
  162. self.rule_dict = rule_dict
  163. self.env = env
  164. self.trace_id = trace_id
  165. # 视频的发布时间限制, 属于是规则过滤
  166. def publish_time_flag(self):
  167. # 判断发布时间
  168. publish_time_stamp = self.item["publish_time_stamp"]
  169. update_time_stamp = self.item["update_time_stamp"]
  170. if self.platform == "gongzhonghao" or self.platform == "gongzhongxinhao":
  171. if (
  172. int(time.time()) - publish_time_stamp
  173. > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1))
  174. ) and (
  175. int(time.time()) - update_time_stamp
  176. > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1))
  177. ):
  178. message = "发布时间超过{}天".format(
  179. int(self.rule_dict.get("period", {}).get("max", 1))
  180. )
  181. print(message)
  182. return False
  183. else:
  184. if int(time.time()) - publish_time_stamp > 3600 * 24 * int(
  185. self.rule_dict.get("period", {}).get("max", 1000)
  186. ):
  187. message = "发布时间超过{}天".format(
  188. int(self.rule_dict.get("period", {}).get("max", 1000))
  189. )
  190. print(message)
  191. return False
  192. return True
  193. # 视频标题是否满足需求
  194. def title_flag(self):
  195. title = self.item["video_title"]
  196. cleaned_title = re.sub(r"[^\w]", " ", title)
  197. # 敏感词
  198. # 获取敏感词列表
  199. sensitive_words = []
  200. if any(word in cleaned_title for word in sensitive_words):
  201. message = "标题中包含敏感词"
  202. print(message)
  203. return False
  204. return True
  205. # 视频基础下载规则
  206. def download_rule_flag(self):
  207. for key in self.item:
  208. if self.rule_dict.get(key):
  209. max_value = (
  210. int(self.rule_dict[key]["max"])
  211. if int(self.rule_dict[key]["max"]) > 0
  212. else 999999999999999
  213. )
  214. if key == "peroid": # peroid是抓取周期天数
  215. continue
  216. else:
  217. flag = (
  218. int(self.rule_dict[key]["min"])
  219. <= int(self.item[key])
  220. <= max_value
  221. )
  222. if not flag:
  223. message = "{}: {} <= {} <= {}, {}".format(
  224. key,
  225. self.rule_dict[key]["min"],
  226. self.item[key],
  227. max_value,
  228. flag,
  229. )
  230. print(message)
  231. return flag
  232. else:
  233. continue
  234. return True
  235. # 按照某个具体平台来去重
  236. def repeat_video(self):
  237. # sql = f""" select * from crawler_video where platform="公众号" and out_video_id="{video_id}"; """
  238. out_id = self.item["out_video_id"]
  239. sql = f""" select * from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}"; """
  240. repeat_video = MysqlHelper.get_values(
  241. log_type=self.mode, crawler=self.platform, env=self.env, sql=sql, action=""
  242. )
  243. if repeat_video:
  244. message = "重复的视频"
  245. print(message)
  246. return False
  247. return True
  248. def process_item(self):
  249. if not self.publish_time_flag():
  250. # 记录相关日志
  251. return False
  252. if not self.title_flag():
  253. # 记录相关日志
  254. return False
  255. if not self.repeat_video():
  256. # 记录相关日志
  257. return False
  258. if not self.download_rule_flag():
  259. # 记录相关日志
  260. return False
  261. return True