pipeline.py

import re
import time

from .aliyun_log import AliyunLogger
from common.scheduling_db import MysqlHelper


class PiaoQuanPipeline:
    def __init__(self, platform, mode, rule_dict, env, item, trace_id):
        self.platform = platform
        self.mode = mode
        self.item = item
        self.rule_dict = rule_dict
        self.env = env
        self.trace_id = trace_id

    # Publish-time limit for the video; this is part of the rule filtering
    def publish_time_flag(self):
        # Check the publish time against the configured "period" (in days)
        publish_time_stamp = self.item["publish_time_stamp"]
        update_time_stamp = self.item["update_time_stamp"]
        if self.platform == "gongzhonghao":
            # For gongzhonghao, both the publish time and the update time must be
            # older than the limit before the item is dropped
            if (
                int(time.time()) - publish_time_stamp
                > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
            ) and (
                int(time.time()) - update_time_stamp
                > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
            ):
                AliyunLogger.logging(
                    code="2004",
                    trace_id=self.trace_id,
                    platform=self.platform,
                    mode=self.mode,
                    env=self.env,
                    data=self.item,
                    message="发布时间超过{}天".format(
                        int(self.rule_dict.get("period", {}).get("max", 1000))
                    ),
                )
                return False
        else:
            # For all other platforms only the publish time is checked
            if (
                int(time.time()) - publish_time_stamp
                > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
            ):
                AliyunLogger.logging(
                    code="2004",
                    trace_id=self.trace_id,
                    platform=self.platform,
                    mode=self.mode,
                    env=self.env,
                    data=self.item,
                    message="发布时间超过{}天".format(
                        int(self.rule_dict.get("period", {}).get("max", 1000))
                    ),
                )
                return False
        return True
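    # Example: with rule_dict = {"period": {"max": 3}} (illustrative), a gongzhonghao
    # item is only dropped when both its publish time and its update time are more
    # than 3 days old; items from other platforms are dropped on publish time alone.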

    # Does the video title meet the requirements?
    def title_flag(self):
        title = self.item["video_title"]
        cleaned_title = re.sub(r"[^\w]", " ", title)
        # Sensitive words
        # Fetch the sensitive-word list (currently empty, so no title is filtered out here)
        sensitive_words = []
        if any(word in cleaned_title for word in sensitive_words):
            AliyunLogger.logging(
                code="2003",
                trace_id=self.trace_id,
                platform=self.platform,
                mode=self.mode,
                env=self.env,
                message="标题中包含敏感词",
                data=self.item,
            )
            return False
        return True

    # Basic download rules for the video
    def download_rule_flag(self):
        # Format video_dict: publish_time_stamp
        # if self.item.get("publish_time_stamp"):
        #     self.item["publish_time"] = self.item["publish_time_stamp"] * 1000
        # # Format video_dict: period
        # if (
        #     self.item.get("publish_time")
        #     and self.item.get("period", "noperiod") == "noperiod"
        # ):
        #     self.item["period"] = int(
        #         (int(time.time() * 1000) - self.item["publish_time"])
        #         / (3600 * 24 * 1000)
        #     )
        # Normalise rule_dict: a configured max of 0 is treated as "no upper bound"
        for key in self.item:
            if self.rule_dict.get(key):
                max_value = (
                    int(self.rule_dict[key]["max"])
                    if int(self.rule_dict[key]["max"]) > 0
                    else 999999999999999
                )
                # The crawl period (in days) is handled by publish_time_flag, so skip it here.
                # "peroid" preserves the original misspelling in case old rules still use it.
                if key in ("period", "peroid"):
                    continue
                # flag = 0 <= int(self.item[key]) <= max_value
                # if not flag:
                #     AliyunLogger.logging(
                #         code="2004",
                #         trace_id=self.trace_id,
                #         platform=self.platform,
                #         mode=self.mode,
                #         env=self.env,
                #         data=self.item,
                #         message="{}: 0 <= {} <= {}, {}".format(
                #             key, self.item[key], max_value, flag
                #         ),
                #     )
                #     return flag
                else:
                    flag = int(self.rule_dict[key]["min"]) <= int(self.item[key]) <= max_value
                    if not flag:
                        AliyunLogger.logging(
                            code="2004",
                            trace_id=self.trace_id,
                            platform=self.platform,
                            mode=self.mode,
                            env=self.env,
                            data=self.item,
                            message="{}: {} <= {} <= {}, {}".format(
                                key,
                                self.rule_dict[key]["min"],
                                self.item[key],
                                max_value,
                                flag,
                            ),
                        )
                        return flag
            else:
                continue
        return True
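    # Illustrative shape of the rule_dict that download_rule_flag expects (the keys
    # and values below are made up, not a real scheduling config): each key maps to
    # integer "min"/"max" bounds, and a max of 0 means "no upper bound", e.g.
    #     {"duration": {"min": 20, "max": 0}, "play_cnt": {"min": 500, "max": 0}}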

    # Deduplicate against videos already crawled for this specific platform
    def repeat_video(self):
        # sql = f""" select * from crawler_video where platform="公众号" and out_video_id="{video_id}"; """
        out_id = self.item["out_video_id"]
        sql = f""" select * from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}"; """
        repeat_video = MysqlHelper.get_values(
            log_type=self.mode, crawler=self.platform, env=self.env, sql=sql, action=""
        )
        if repeat_video:
            AliyunLogger.logging(
                code="2002",
                trace_id=self.trace_id,
                platform=self.platform,
                mode=self.mode,
                env=self.env,
                message="重复的视频",
                data=self.item,
            )
            return False
        return True
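    # With platform "gongzhonghao" and out_video_id "xyz123" (illustrative values),
    # the dedup query above expands to:
    #     select * from crawler_video where platform = "gongzhonghao" and out_video_id="xyz123";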

    def process_item(self):
        if not self.publish_time_flag():
            # Record the relevant logs
            return False
        if not self.title_flag():
            # Record the relevant logs
            return False
        if not self.repeat_video():
            # Record the relevant logs
            return False
        if not self.download_rule_flag():
            # Record the relevant logs
            return False
        return True


# Test/debug variant of PiaoQuanPipeline: same checks, but prints messages instead of
# writing to the Aliyun logger
class PiaoQuanPipelineTest:
    def __init__(self, platform, mode, rule_dict, env, item, trace_id):
        self.platform = platform
        self.mode = mode
        self.item = item
        self.rule_dict = rule_dict
        self.env = env
        self.trace_id = trace_id

    # Publish-time limit for the video; this is part of the rule filtering
    def publish_time_flag(self):
        # Check the publish time against the configured "period" (in days)
        publish_time_stamp = self.item["publish_time_stamp"]
        update_time_stamp = self.item["update_time_stamp"]
        if self.platform == "gongzhonghao":
            if (
                int(time.time()) - publish_time_stamp
                > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
            ) and (
                int(time.time()) - update_time_stamp
                > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
            ):
                message = "发布时间超过{}天".format(
                    int(self.rule_dict.get("period", {}).get("max", 1000))
                )
                print(message)
                return False
        else:
            if (
                int(time.time()) - publish_time_stamp
                > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
            ):
                message = "发布时间超过{}天".format(
                    int(self.rule_dict.get("period", {}).get("max", 1000))
                )
                print(message)
                return False
        return True

    # Does the video title meet the requirements?
    def title_flag(self):
        title = self.item["video_title"]
        cleaned_title = re.sub(r"[^\w]", " ", title)
        # Sensitive words
        # Fetch the sensitive-word list (currently empty, so no title is filtered out here)
        sensitive_words = []
        if any(word in cleaned_title for word in sensitive_words):
            message = "标题中包含敏感词"
            print(message)
            return False
        return True

    # Basic download rules for the video
    def download_rule_flag(self):
        for key in self.item:
            if self.rule_dict.get(key):
                # A configured max of 0 is treated as "no upper bound"
                max_value = (
                    int(self.rule_dict[key]["max"])
                    if int(self.rule_dict[key]["max"]) > 0
                    else 999999999999999
                )
                # The crawl period (in days) is handled by publish_time_flag, so skip it here.
                # "peroid" preserves the original misspelling in case old rules still use it.
                if key in ("period", "peroid"):
                    continue
                else:
                    flag = int(self.rule_dict[key]["min"]) <= int(self.item[key]) <= max_value
                    if not flag:
                        message = "{}: {} <= {} <= {}, {}".format(
                            key,
                            self.rule_dict[key]["min"],
                            self.item[key],
                            max_value,
                            flag,
                        )
                        print(message)
                        return flag
            else:
                continue
        return True

    # Deduplicate against videos already crawled for this specific platform
    def repeat_video(self):
        # sql = f""" select * from crawler_video where platform="公众号" and out_video_id="{video_id}"; """
        out_id = self.item["out_video_id"]
        sql = f""" select * from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}"; """
        repeat_video = MysqlHelper.get_values(
            log_type=self.mode, crawler=self.platform, env=self.env, sql=sql, action=""
        )
        if repeat_video:
            message = "重复的视频"
            print(message)
            return False
        return True

    def process_item(self):
        if not self.publish_time_flag():
            # Record the relevant logs
            return False
        if not self.title_flag():
            # Record the relevant logs
            return False
        if not self.repeat_video():
            # Record the relevant logs
            return False
        if not self.download_rule_flag():
            # Record the relevant logs
            return False
        return True
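

if __name__ == "__main__":
    # Minimal smoke-test sketch. The item and rule values below are illustrative only;
    # real items come from the crawlers and real rules come from the scheduling config.
    # Note: the relative import of AliyunLogger above means this module has to be run
    # from inside its package rather than as a standalone script.
    now = int(time.time())
    demo_item = {
        "video_title": "测试视频标题",
        "publish_time_stamp": now - 3600 * 24 * 2,  # published two days ago
        "update_time_stamp": now - 3600 * 24,       # updated one day ago
        "out_video_id": "demo_0001",
        "duration": 60,
    }
    demo_rules = {
        "period": {"min": 0, "max": 15},    # only keep videos from the last 15 days
        "duration": {"min": 20, "max": 0},  # max == 0 means "no upper bound"
    }
    test_pipeline = PiaoQuanPipelineTest(
        platform="gongzhonghao",
        mode="recommend",       # illustrative mode name
        rule_dict=demo_rules,
        env="dev",
        item=demo_item,
        trace_id="demo-trace-id",
    )
    print("publish_time_flag:", test_pipeline.publish_time_flag())
    print("title_flag:", test_pipeline.title_flag())
    print("download_rule_flag:", test_pipeline.download_rule_flag())
    # process_item() is not exercised here because repeat_video() queries MySQL through
    # MysqlHelper and therefore needs the scheduling database to be reachable.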