pipeline.py 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. import re
  2. import time
  3. from .aliyun_log import AliyunLogger
  4. from common.scheduling_db import MysqlHelper
  5. class PiaoQuanPipeline:
  6. """
  7. pipeline code
  8. """
  9. def __init__(self, platform, mode, rule_dict, env, item, trace_id):
  10. self.platform = platform
  11. self.mode = mode
  12. self.item = item
  13. self.rule_dict = rule_dict
  14. self.env = env
  15. self.trace_id = trace_id
  16. def publish_time_flag(self):
  17. """
  18. 发布时间判断
  19. """
  20. publish_time_stamp = self.item["publish_time_stamp"]
  21. update_time_stamp = self.item["update_time_stamp"]
  22. max_d = self.rule_dict.get("period", {}).get("max", 1000)
  23. min_d = self.rule_dict.get("period", {}).get("min", 1000)
  24. days = max_d if max_d > min_d else min_d
  25. if self.platform == "gongzhonghao" or self.platform == "gongzhongxinhao":
  26. if (int(time.time()) - publish_time_stamp > 3600 * 24 * days) and (
  27. int(time.time()) - update_time_stamp > 3600 * 24 * days
  28. ):
  29. AliyunLogger.logging(
  30. code="2004",
  31. trace_id=self.trace_id,
  32. platform=self.platform,
  33. mode=self.mode,
  34. env=self.env,
  35. data=self.item,
  36. message="发布时间超过{}天".format(days),
  37. )
  38. return False
  39. else:
  40. if int(time.time()) - publish_time_stamp > 3600 * 24 * days:
  41. AliyunLogger.logging(
  42. code="2004",
  43. trace_id=self.trace_id,
  44. platform=self.platform,
  45. mode=self.mode,
  46. env=self.env,
  47. data=self.item,
  48. message="发布时间超过{}天".format(days),
  49. )
  50. return False
  51. return True
  52. def title_flag(self):
  53. """
  54. 视频标题是否满足需求
  55. """
  56. title = self.item["video_title"]
  57. cleaned_title = re.sub(r"[^\w]", " ", title)
  58. # 敏感词
  59. # 获取敏感词列表
  60. sensitive_words = []
  61. if any(word in cleaned_title for word in sensitive_words):
  62. AliyunLogger.logging(
  63. code="2003",
  64. trace_id=self.trace_id,
  65. platform=self.platform,
  66. mode=self.mode,
  67. env=self.env,
  68. message="标题中包含敏感词",
  69. data=self.item,
  70. )
  71. return False
  72. return True
  73. def download_rule_flag(self):
  74. """
  75. 视频基础下载规则
  76. """
  77. for key in self.item:
  78. if self.rule_dict.get(key):
  79. max_value = (
  80. int(self.rule_dict[key]["max"])
  81. if int(self.rule_dict[key]["max"]) > 0
  82. else 999999999999999
  83. )
  84. if key == "peroid": # peroid是抓取周期天数
  85. continue
  86. else:
  87. flag = (
  88. int(self.rule_dict[key]["min"])
  89. <= int(self.item[key])
  90. <= max_value
  91. )
  92. if not flag:
  93. AliyunLogger.logging(
  94. code="2004",
  95. trace_id=self.trace_id,
  96. platform=self.platform,
  97. mode=self.mode,
  98. env=self.env,
  99. data=self.item,
  100. message="{}: {} <= {} <= {}, {}".format(
  101. key,
  102. self.rule_dict[key]["min"],
  103. self.item[key],
  104. max_value,
  105. flag,
  106. ),
  107. )
  108. return flag
  109. else:
  110. continue
  111. return True
  112. def repeat_video(self):
  113. """
  114. 按照某个具体平台来去重
  115. """
  116. # sql = f""" select * from crawler_video where platform="公众号" and out_video_id="{video_id}"; """
  117. if self.platform == "jingdianfuqiwang" and self.mode == "recommend":
  118. return True
  119. out_id = self.item["out_video_id"]
  120. sql = f""" select * from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}"; """
  121. repeat_video = MysqlHelper.get_values(
  122. log_type=self.mode, crawler=self.platform, env=self.env, sql=sql, action=""
  123. )
  124. if repeat_video:
  125. AliyunLogger.logging(
  126. code="2002",
  127. trace_id=self.trace_id,
  128. platform=self.platform,
  129. mode=self.mode,
  130. env=self.env,
  131. message="重复的视频",
  132. data=self.item,
  133. )
  134. return False
  135. return True
  136. def process_item(self):
  137. """
  138. 判断是否符合规则
  139. """
  140. if not self.publish_time_flag():
  141. # 记录相关日志
  142. return False
  143. if not self.title_flag():
  144. # 记录相关日志
  145. return False
  146. if not self.repeat_video():
  147. # 记录相关日志
  148. return False
  149. if not self.download_rule_flag():
  150. # 记录相关日志
  151. return False
  152. return True
  153. class PiaoQuanPipelineTest:
  154. def __init__(self, platform, mode, rule_dict, env, item, trace_id):
  155. self.platform = platform
  156. self.mode = mode
  157. self.item = item
  158. self.rule_dict = rule_dict
  159. self.env = env
  160. self.trace_id = trace_id
  161. # 视频的发布时间限制, 属于是规则过滤
  162. def publish_time_flag(self):
  163. # 判断发布时间
  164. publish_time_stamp = self.item["publish_time_stamp"]
  165. update_time_stamp = self.item["update_time_stamp"]
  166. if self.platform == "gongzhonghao" or self.platform == "gongzhongxinhao":
  167. if (
  168. int(time.time()) - publish_time_stamp
  169. > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1))
  170. ) and (
  171. int(time.time()) - update_time_stamp
  172. > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1))
  173. ):
  174. message = "发布时间超过{}天".format(
  175. int(self.rule_dict.get("period", {}).get("max", 1))
  176. )
  177. print(message)
  178. return False
  179. else:
  180. if int(time.time()) - publish_time_stamp > 3600 * 24 * int(
  181. self.rule_dict.get("period", {}).get("max", 1000)
  182. ):
  183. message = "发布时间超过{}天".format(
  184. int(self.rule_dict.get("period", {}).get("max", 1000))
  185. )
  186. print(message)
  187. return False
  188. return True
  189. # 视频标题是否满足需求
  190. def title_flag(self):
  191. title = self.item["video_title"]
  192. cleaned_title = re.sub(r"[^\w]", " ", title)
  193. # 敏感词
  194. # 获取敏感词列表
  195. sensitive_words = []
  196. if any(word in cleaned_title for word in sensitive_words):
  197. message = "标题中包含敏感词"
  198. print(message)
  199. return False
  200. return True
  201. # 视频基础下载规则
  202. def download_rule_flag(self):
  203. for key in self.item:
  204. if self.rule_dict.get(key):
  205. max_value = (
  206. int(self.rule_dict[key]["max"])
  207. if int(self.rule_dict[key]["max"]) > 0
  208. else 999999999999999
  209. )
  210. if key == "peroid": # peroid是抓取周期天数
  211. continue
  212. else:
  213. flag = (
  214. int(self.rule_dict[key]["min"])
  215. <= int(self.item[key])
  216. <= max_value
  217. )
  218. if not flag:
  219. message = "{}: {} <= {} <= {}, {}".format(
  220. key,
  221. self.rule_dict[key]["min"],
  222. self.item[key],
  223. max_value,
  224. flag,
  225. )
  226. print(message)
  227. return flag
  228. else:
  229. continue
  230. return True
  231. # 按照某个具体平台来去重
  232. def repeat_video(self):
  233. # sql = f""" select * from crawler_video where platform="公众号" and out_video_id="{video_id}"; """
  234. out_id = self.item["out_video_id"]
  235. sql = f""" select * from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}"; """
  236. repeat_video = MysqlHelper.get_values(
  237. log_type=self.mode, crawler=self.platform, env=self.env, sql=sql, action=""
  238. )
  239. if repeat_video:
  240. message = "重复的视频"
  241. print(message)
  242. return False
  243. return True
  244. def process_item(self):
  245. if not self.publish_time_flag():
  246. # 记录相关日志
  247. return False
  248. if not self.title_flag():
  249. # 记录相关日志
  250. return False
  251. if not self.repeat_video():
  252. # 记录相关日志
  253. return False
  254. if not self.download_rule_flag():
  255. # 记录相关日志
  256. return False
  257. return True