pipeline.py

import re
import time

from .aliyun_log import AliyunLogger
from common.scheduling_db import MysqlHelper


class PiaoQuanPipeline:
    """
    Filtering pipeline for crawled video items: checks publish time, title,
    duplicates and download rules before an item is accepted.
    """

    def __init__(self, platform, mode, rule_dict, env, item, trace_id):
        self.platform = platform
        self.mode = mode
        self.item = item
        self.rule_dict = rule_dict
        self.env = env
        self.trace_id = trace_id

    def publish_time_flag(self):
        """
        Check whether the publish time falls inside the allowed window.
        """
        publish_time_stamp = self.item["publish_time_stamp"]
        update_time_stamp = self.item["update_time_stamp"]
        max_d = self.rule_dict.get("period", {}).get("max", 1000)
        min_d = self.rule_dict.get("period", {}).get("min", 1000)
        days = max_d if max_d > min_d else min_d
        if self.platform == "gongzhonghao" or self.platform == "gongzhongxinhao":
            # For these platforms an item is only rejected when both the publish
            # time and the update time are older than the window.
            if (int(time.time()) - publish_time_stamp > 3600 * 24 * days) and (
                int(time.time()) - update_time_stamp > 3600 * 24 * days
            ):
                AliyunLogger.logging(
                    code="2004",
                    trace_id=self.trace_id,
                    platform=self.platform,
                    mode=self.mode,
                    env=self.env,
                    data=self.item,
                    message="发布时间超过{}天".format(days),
                )
                return False
        else:
            if int(time.time()) - publish_time_stamp > 3600 * 24 * days:
                AliyunLogger.logging(
                    code="2004",
                    trace_id=self.trace_id,
                    platform=self.platform,
                    mode=self.mode,
                    env=self.env,
                    data=self.item,
                    message="发布时间超过{}天".format(days),
                )
                return False
        return True

    def title_flag(self):
        """
        Check whether the video title is acceptable (contains no sensitive words).
        """
        title = self.item["video_title"]
        cleaned_title = re.sub(r"[^\w]", " ", title)
        # Sensitive words
        # Fetch the sensitive-word list (left empty in the original source)
        sensitive_words = []
        if any(word in cleaned_title for word in sensitive_words):
            AliyunLogger.logging(
                code="2003",
                trace_id=self.trace_id,
                platform=self.platform,
                mode=self.mode,
                env=self.env,
                message="标题中包含敏感词",
                data=self.item,
            )
            return False
        return True

    def download_rule_flag(self):
        """
        Basic download rules: every field present in both the item and
        rule_dict must fall inside its [min, max] range.
        """
        for key in self.item:
            if self.rule_dict.get(key):
                if key == "peroid":  # "peroid" (sic) is the crawl period in days, handled by publish_time_flag
                    continue
                max_value = (
                    int(self.rule_dict[key]["max"])
                    if int(self.rule_dict[key]["max"]) > 0
                    else 999999999999999  # max <= 0 means "no upper bound"
                )
                flag = (
                    int(self.rule_dict[key]["min"])
                    <= int(self.item[key])
                    <= max_value
                )
                if not flag:
                    AliyunLogger.logging(
                        code="2004",
                        trace_id=self.trace_id,
                        platform=self.platform,
                        mode=self.mode,
                        env=self.env,
                        data=self.item,
                        message="{}: {} <= {} <= {}, {}".format(
                            key,
                            self.rule_dict[key]["min"],
                            self.item[key],
                            max_value,
                            flag,
                        ),
                    )
                    return flag
        return True

    def repeat_video(self):
        """
        Deduplicate against videos already stored for this platform.
        """
        # sql = f""" select * from crawler_video where platform="公众号" and out_video_id="{video_id}"; """
        # These platforms skip the duplicate check in recommend mode.
        if self.platform == "jingdianfuqiwang" and self.mode == "recommend":
            return True
        if self.platform == "zhujinshanjinmei" and self.mode == "recommend":
            return True
        out_id = self.item["out_video_id"]
        sql = f""" select * from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}"; """
        repeat_video = MysqlHelper.get_values(
            log_type=self.mode, crawler=self.platform, env=self.env, sql=sql, action=""
        )
        if repeat_video:
            AliyunLogger.logging(
                code="2002",
                trace_id=self.trace_id,
                platform=self.platform,
                mode=self.mode,
                env=self.env,
                message="重复的视频",
                data=self.item,
            )
            return False
        return True

    def process_item(self):
        """
        Run all checks; return True only if the item satisfies every rule.
        """
        if not self.publish_time_flag():
            # The corresponding log entry has already been written by the check
            return False
        if not self.title_flag():
            # The corresponding log entry has already been written by the check
            return False
        if not self.repeat_video():
            # The corresponding log entry has already been written by the check
            return False
        if not self.download_rule_flag():
            # The corresponding log entry has already been written by the check
            return False
        return True


class PiaoQuanPipelineTest:
    """
    Test variant of PiaoQuanPipeline: the same checks, but results are printed
    instead of being sent to the Aliyun logger.
    """

    def __init__(self, platform, mode, rule_dict, env, item, trace_id):
        self.platform = platform
        self.mode = mode
        self.item = item
        self.rule_dict = rule_dict
        self.env = env
        self.trace_id = trace_id

    # Publish-time limit for the video; part of the rule filtering
    def publish_time_flag(self):
        # Check the publish time
        publish_time_stamp = self.item["publish_time_stamp"]
        update_time_stamp = self.item["update_time_stamp"]
        if self.platform == "gongzhonghao" or self.platform == "gongzhongxinhao":
            if (
                int(time.time()) - publish_time_stamp
                > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1))
            ) and (
                int(time.time()) - update_time_stamp
                > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1))
            ):
                message = "发布时间超过{}天".format(
                    int(self.rule_dict.get("period", {}).get("max", 1))
                )
                print(message)
                return False
        else:
            if int(time.time()) - publish_time_stamp > 3600 * 24 * int(
                self.rule_dict.get("period", {}).get("max", 1000)
            ):
                message = "发布时间超过{}天".format(
                    int(self.rule_dict.get("period", {}).get("max", 1000))
                )
                print(message)
                return False
        return True

    # Whether the video title is acceptable
    def title_flag(self):
        title = self.item["video_title"]
        cleaned_title = re.sub(r"[^\w]", " ", title)
        # Sensitive words
        # Fetch the sensitive-word list (left empty in the original source)
        sensitive_words = []
        if any(word in cleaned_title for word in sensitive_words):
            message = "标题中包含敏感词"
            print(message)
            return False
        return True

    # Basic download rules for a video
    def download_rule_flag(self):
        for key in self.item:
            if self.rule_dict.get(key):
                if key == "peroid":  # "peroid" (sic) is the crawl period in days, handled by publish_time_flag
                    continue
                max_value = (
                    int(self.rule_dict[key]["max"])
                    if int(self.rule_dict[key]["max"]) > 0
                    else 999999999999999  # max <= 0 means "no upper bound"
                )
                flag = (
                    int(self.rule_dict[key]["min"])
                    <= int(self.item[key])
                    <= max_value
                )
                if not flag:
                    message = "{}: {} <= {} <= {}, {}".format(
                        key,
                        self.rule_dict[key]["min"],
                        self.item[key],
                        max_value,
                        flag,
                    )
                    print(message)
                    return flag
        return True

    # Deduplicate against videos already stored for this platform
    def repeat_video(self):
        # sql = f""" select * from crawler_video where platform="公众号" and out_video_id="{video_id}"; """
        out_id = self.item["out_video_id"]
        sql = f""" select * from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}"; """
        repeat_video = MysqlHelper.get_values(
            log_type=self.mode, crawler=self.platform, env=self.env, sql=sql, action=""
        )
        if repeat_video:
            message = "重复的视频"
            print(message)
            return False
        return True

    def process_item(self):
        if not self.publish_time_flag():
            # The failure has already been printed by the check above
            return False
        if not self.title_flag():
            # The failure has already been printed by the check above
            return False
        if not self.repeat_video():
            # The failure has already been printed by the check above
            return False
        if not self.download_rule_flag():
            # The failure has already been printed by the check above
            return False
        return True
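

# A minimal usage sketch (not part of the original module): it shows how a caller
# might run the test pipeline against a single crawled item. The field names that
# the checks read (video_title, publish_time_stamp, update_time_stamp,
# out_video_id) come from the code above; the concrete values, the "play_cnt"
# metric and the rule thresholds below are hypothetical and only illustrate the
# expected shapes. Note that repeat_video still queries MySQL through
# MysqlHelper, so running this requires a reachable database, and because of the
# relative import at the top the file has to be executed as a module inside its
# package rather than as a standalone script.
if __name__ == "__main__":
    example_item = {
        "video_title": "example video title",
        "publish_time_stamp": int(time.time()) - 3600,  # published one hour ago
        "update_time_stamp": int(time.time()),
        "out_video_id": "example-id-123",
        "play_cnt": 1200,
    }
    example_rules = {
        "period": {"min": 0, "max": 30},      # crawl window in days
        "play_cnt": {"min": 1000, "max": 0},  # max <= 0 means "no upper bound"
    }
    pipeline = PiaoQuanPipelineTest(
        platform="gongzhonghao",
        mode="recommend",
        rule_dict=example_rules,
        env="dev",
        item=example_item,
        trace_id="trace-0001",
    )
    # Prints True if the item passes every check, otherwise the failing check
    # prints its reason and process_item returns False.
    print("accepted:", pipeline.process_item())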