pipeline.py

import re
import time

from .aliyun_log import AliyunLogger
from common.scheduling_db import MysqlHelper


class PiaoQuanPipeline:
    def __init__(self, platform, mode, rule_dict, env, item, trace_id):
        self.platform = platform
        self.mode = mode
        self.item = item
        self.rule_dict = rule_dict
        self.env = env
        self.trace_id = trace_id
    # Publish-time limit on videos; this is a rule-based filter
    def publish_time_flag(self):
        # Check the publish time against the configured period (in days)
        publish_time_stamp = self.item["publish_time_stamp"]
        update_time_stamp = self.item["update_time_stamp"]
        if self.platform == "gongzhonghao":
            if (
                int(time.time()) - publish_time_stamp
                > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
            ) and (
                int(time.time()) - update_time_stamp
                > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
            ):
                AliyunLogger.logging(
                    code="2004",
                    trace_id=self.trace_id,
                    platform=self.platform,
                    mode=self.mode,
                    env=self.env,
                    data=self.item,
                    message="发布时间超过{}天".format(
                        int(self.rule_dict.get("period", {}).get("max", 1000))
                    ),
                )
                return False
        else:
            if (
                int(time.time()) - publish_time_stamp
                > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
            ):
                AliyunLogger.logging(
                    code="2004",
                    trace_id=self.trace_id,
                    platform=self.platform,
                    mode=self.mode,
                    env=self.env,
                    data=self.item,
                    message="发布时间超过{}天".format(
                        int(self.rule_dict.get("period", {}).get("max", 1000))
                    ),
                )
                return False
        return True
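    # Example (hypothetical values): with rule_dict = {"period": {"max": 3}}, an item
    # published more than 3 days ago is rejected; for "gongzhonghao" it is only
    # rejected when the update time is also older than 3 days.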
    # Whether the video title meets the requirements
    def title_flag(self):
        title = self.item["video_title"]
        cleaned_title = re.sub(r"[^\w]", " ", title)
        # Sensitive words: fetch the sensitive-word list (left empty here)
        sensitive_words = []
        if any(word in cleaned_title for word in sensitive_words):
            AliyunLogger.logging(
                code="2003",
                trace_id=self.trace_id,
                platform=self.platform,
                mode=self.mode,
                env=self.env,
                message="标题中包含敏感词",
                data=self.item,
            )
            return False
        return True
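    # Example: re.sub(r"[^\w]", " ", "标题!测试") yields "标题 测试", so the
    # sensitive-word check runs against a title stripped of punctuation.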
    # Basic download rules for videos
    def download_rule_flag(self):
        for key in self.item:
            if self.rule_dict.get(key):
                max_value = (
                    int(self.rule_dict[key]["max"])
                    if int(self.rule_dict[key]["max"]) > 0
                    else 999999999999999
                )
                if key == "peroid":  # "peroid" is the crawl period in days
                    continue
                else:
                    flag = int(self.rule_dict[key]["min"]) <= int(self.item[key]) <= max_value
                    if not flag:
                        AliyunLogger.logging(
                            code="2004",
                            trace_id=self.trace_id,
                            platform=self.platform,
                            mode=self.mode,
                            env=self.env,
                            data=self.item,
                            message="{}: {} <= {} <= {}, {}".format(
                                key,
                                self.rule_dict[key]["min"],
                                self.item[key],
                                max_value,
                                flag,
                            ),
                        )
                        return flag
            else:
                continue
        return True
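    # Example (hypothetical values): with rule_dict = {"duration": {"min": 30, "max": 0}},
    # an item whose "duration" field is below 30 is rejected; a non-positive "max"
    # means there is no upper bound.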
    # Deduplicate against videos already crawled for this platform
    def repeat_video(self):
        # sql = f""" select * from crawler_video where platform="公众号" and out_video_id="{video_id}"; """
        out_id = self.item["out_video_id"]
        sql = f""" select * from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}"; """
        repeat_video = MysqlHelper.get_values(
            log_type=self.mode, crawler=self.platform, env=self.env, sql=sql, action=""
        )
        if repeat_video:
            AliyunLogger.logging(
                code="2002",
                trace_id=self.trace_id,
                platform=self.platform,
                mode=self.mode,
                env=self.env,
                message="重复的视频",
                data=self.item,
            )
            return False
        return True
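    # A video counts as a duplicate when crawler_video already holds a row with the
    # same platform and out_video_id.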
    def process_item(self):
        if not self.publish_time_flag():
            # Record the relevant log
            return False
        if not self.title_flag():
            # Record the relevant log
            return False
        if not self.repeat_video():
            # Record the relevant log
            return False
        if not self.download_rule_flag():
            # Record the relevant log
            return False
        return True
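    # Fields the item dict is expected to carry for these checks: publish_time_stamp,
    # update_time_stamp, video_title, out_video_id, plus any metric keys that also
    # appear in rule_dict (each rule entry is a dict with "min" and "max").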
class PiaoQuanPipelineTest:
    def __init__(self, platform, mode, rule_dict, env, item, trace_id):
        self.platform = platform
        self.mode = mode
        self.item = item
        self.rule_dict = rule_dict
        self.env = env
        self.trace_id = trace_id
    # Publish-time limit on videos; this is a rule-based filter
    def publish_time_flag(self):
        # Check the publish time against the configured period (in days)
        publish_time_stamp = self.item["publish_time_stamp"]
        update_time_stamp = self.item["update_time_stamp"]
        if self.platform == "gongzhonghao":
            if (
                int(time.time()) - publish_time_stamp
                > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
            ) and (
                int(time.time()) - update_time_stamp
                > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
            ):
                message = "发布时间超过{}天".format(
                    int(self.rule_dict.get("period", {}).get("max", 1000))
                )
                print(message)
                return False
        else:
            if (
                int(time.time()) - publish_time_stamp
                > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
            ):
                message = "发布时间超过{}天".format(
                    int(self.rule_dict.get("period", {}).get("max", 1000))
                )
                print(message)
                return False
        return True
    # Whether the video title meets the requirements
    def title_flag(self):
        title = self.item["video_title"]
        cleaned_title = re.sub(r"[^\w]", " ", title)
        # Sensitive words: fetch the sensitive-word list (left empty here)
        sensitive_words = []
        if any(word in cleaned_title for word in sensitive_words):
            message = "标题中包含敏感词"
            print(message)
            return False
        return True
    # Basic download rules for videos
    def download_rule_flag(self):
        for key in self.item:
            if self.rule_dict.get(key):
                max_value = (
                    int(self.rule_dict[key]["max"])
                    if int(self.rule_dict[key]["max"]) > 0
                    else 999999999999999
                )
                if key == "peroid":  # "peroid" is the crawl period in days
                    continue
                else:
                    flag = int(self.rule_dict[key]["min"]) <= int(self.item[key]) <= max_value
                    if not flag:
                        message = "{}: {} <= {} <= {}, {}".format(
                            key,
                            self.rule_dict[key]["min"],
                            self.item[key],
                            max_value,
                            flag,
                        )
                        print(message)
                        return flag
            else:
                continue
        return True
    # Deduplicate against videos already crawled for this platform
    def repeat_video(self):
        # sql = f""" select * from crawler_video where platform="公众号" and out_video_id="{video_id}"; """
        out_id = self.item["out_video_id"]
        sql = f""" select * from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}"; """
        repeat_video = MysqlHelper.get_values(
            log_type=self.mode, crawler=self.platform, env=self.env, sql=sql, action=""
        )
        if repeat_video:
            message = "重复的视频"
            print(message)
            return False
        return True
    def process_item(self):
        if not self.publish_time_flag():
            # Record the relevant log
            return False
        if not self.title_flag():
            # Record the relevant log
            return False
        if not self.repeat_video():
            # Record the relevant log
            return False
        if not self.download_rule_flag():
            # Record the relevant log
            return False
        return True
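# Minimal usage sketch (hypothetical values). Because of the relative import above,
# run it as a module (python -m <package>.pipeline); the deduplication step also
# needs a MySQL instance reachable through MysqlHelper, so use a dev database.
if __name__ == "__main__":
    example_item = {
        "publish_time_stamp": int(time.time()) - 3600,  # published one hour ago
        "update_time_stamp": int(time.time()),
        "video_title": "示例标题",
        "out_video_id": "demo_0001",
        "play_cnt": 1200,  # hypothetical metric matched against rule_dict
    }
    example_rules = {
        "period": {"min": 0, "max": 3},       # crawl window in days
        "play_cnt": {"min": 1000, "max": 0},  # max <= 0 means "no upper bound"
    }
    pipeline = PiaoQuanPipelineTest(
        platform="gongzhonghao",
        mode="recommend",  # hypothetical mode name
        rule_dict=example_rules,
        env="dev",
        item=example_item,
        trace_id="trace-demo",
    )
    print(pipeline.process_item())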