pipeline.py

import re
import time

from .aliyun_log import AliyunLogger
from common.scheduling_db import MysqlHelper


class PiaoQuanPipeline:
    def __init__(self, platform, mode, rule_dict, env, item, trace_id):
        self.platform = platform
        self.mode = mode
        self.item = item
        self.rule_dict = rule_dict
        self.env = env
        self.trace_id = trace_id

    # Publish-time limit for a video; this is a rule-based filter
    def publish_time_flag(self):
        # Check the publish time
        publish_time_stamp = self.item["publish_time_stamp"]
        update_time_stamp = self.item["update_time_stamp"]
        max_d = self.rule_dict.get("period", {}).get("max", 1000)
        min_d = self.rule_dict.get("period", {}).get("min", 1000)
        days = max_d if max_d > min_d else min_d
        if self.platform == "gongzhonghao" or self.platform == "gongzhongxinhao":
            # Official-account platforms: reject only when both the publish time
            # and the update time fall outside the window
            if (
                int(time.time()) - publish_time_stamp > 3600 * 24 * days
            ) and (
                int(time.time()) - update_time_stamp > 3600 * 24 * days
            ):
                AliyunLogger.logging(
                    code="2004",
                    trace_id=self.trace_id,
                    platform=self.platform,
                    mode=self.mode,
                    env=self.env,
                    data=self.item,
                    message="发布时间超过{}天".format(days),
                )
                return False
        else:
            if int(time.time()) - publish_time_stamp > 3600 * 24 * days:
                AliyunLogger.logging(
                    code="2004",
                    trace_id=self.trace_id,
                    platform=self.platform,
                    mode=self.mode,
                    env=self.env,
                    data=self.item,
                    message="发布时间超过{}天".format(days),
                )
                return False
        return True

    # Check whether the video title meets the requirements
    def title_flag(self):
        title = self.item["video_title"]
        cleaned_title = re.sub(r"[^\w]", " ", title)
        # Sensitive words
        # Fetch the sensitive-word list (left empty here, so no title is rejected yet)
        sensitive_words = []
        if any(word in cleaned_title for word in sensitive_words):
            AliyunLogger.logging(
                code="2003",
                trace_id=self.trace_id,
                platform=self.platform,
                mode=self.mode,
                env=self.env,
                message="标题中包含敏感词",
                data=self.item,
            )
            return False
        return True

    # Basic download rules for a video
    def download_rule_flag(self):
        for key in self.item:
            if self.rule_dict.get(key):
                if key == "period":  # "period" is the crawl window in days; it is handled in publish_time_flag
                    continue
                max_value = (
                    int(self.rule_dict[key]["max"])
                    if int(self.rule_dict[key]["max"]) > 0
                    else 999999999999999
                )
                flag = int(self.rule_dict[key]["min"]) <= int(self.item[key]) <= max_value
                if not flag:
                    AliyunLogger.logging(
                        code="2004",
                        trace_id=self.trace_id,
                        platform=self.platform,
                        mode=self.mode,
                        env=self.env,
                        data=self.item,
                        message="{}: {} <= {} <= {}, {}".format(
                            key,
                            self.rule_dict[key]["min"],
                            self.item[key],
                            max_value,
                            flag,
                        ),
                    )
                    return flag
        return True

    # Deduplicate per platform
    def repeat_video(self):
        # sql = f""" select * from crawler_video where platform="公众号" and out_video_id="{video_id}"; """
        if self.platform == "jingdianfuqiwang" and self.mode == "recommend":
            # Skip deduplication for this platform/mode combination
            return True
        out_id = self.item["out_video_id"]
        sql = f""" select * from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}"; """
        repeat_video = MysqlHelper.get_values(
            log_type=self.mode, crawler=self.platform, env=self.env, sql=sql, action=""
        )
        if repeat_video:
            AliyunLogger.logging(
                code="2002",
                trace_id=self.trace_id,
                platform=self.platform,
                mode=self.mode,
                env=self.env,
                message="重复的视频",
                data=self.item,
            )
            return False
        return True

    def process_item(self):
        if not self.publish_time_flag():
            # Record the relevant logs
            return False
        if not self.title_flag():
            # Record the relevant logs
            return False
        if not self.repeat_video():
            # Record the relevant logs
            return False
        if not self.download_rule_flag():
            # Record the relevant logs
            return False
        return True


class PiaoQuanPipelineTest:
    def __init__(self, platform, mode, rule_dict, env, item, trace_id):
        self.platform = platform
        self.mode = mode
        self.item = item
        self.rule_dict = rule_dict
        self.env = env
        self.trace_id = trace_id

    # Publish-time limit for a video; this is a rule-based filter
    def publish_time_flag(self):
        # Check the publish time
        publish_time_stamp = self.item["publish_time_stamp"]
        update_time_stamp = self.item["update_time_stamp"]
        if self.platform == "gongzhonghao" or self.platform == "gongzhongxinhao":
            if (
                int(time.time()) - publish_time_stamp
                > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1))
            ) and (
                int(time.time()) - update_time_stamp
                > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1))
            ):
                message = "发布时间超过{}天".format(
                    int(self.rule_dict.get("period", {}).get("max", 1))
                )
                print(message)
                return False
        else:
            if (
                int(time.time()) - publish_time_stamp
                > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
            ):
                message = "发布时间超过{}天".format(
                    int(self.rule_dict.get("period", {}).get("max", 1000))
                )
                print(message)
                return False
        return True

    # Check whether the video title meets the requirements
    def title_flag(self):
        title = self.item["video_title"]
        cleaned_title = re.sub(r"[^\w]", " ", title)
        # Sensitive words
        # Fetch the sensitive-word list (left empty here, so no title is rejected yet)
        sensitive_words = []
        if any(word in cleaned_title for word in sensitive_words):
            message = "标题中包含敏感词"
            print(message)
            return False
        return True

    # Basic download rules for a video
    def download_rule_flag(self):
        for key in self.item:
            if self.rule_dict.get(key):
                if key == "period":  # "period" is the crawl window in days; it is handled in publish_time_flag
                    continue
                max_value = (
                    int(self.rule_dict[key]["max"])
                    if int(self.rule_dict[key]["max"]) > 0
                    else 999999999999999
                )
                flag = int(self.rule_dict[key]["min"]) <= int(self.item[key]) <= max_value
                if not flag:
                    message = "{}: {} <= {} <= {}, {}".format(
                        key,
                        self.rule_dict[key]["min"],
                        self.item[key],
                        max_value,
                        flag,
                    )
                    print(message)
                    return flag
        return True

    # Deduplicate per platform
    def repeat_video(self):
        # sql = f""" select * from crawler_video where platform="公众号" and out_video_id="{video_id}"; """
        out_id = self.item["out_video_id"]
        sql = f""" select * from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}"; """
        repeat_video = MysqlHelper.get_values(
            log_type=self.mode, crawler=self.platform, env=self.env, sql=sql, action=""
        )
        if repeat_video:
            message = "重复的视频"
            print(message)
            return False
        return True

    def process_item(self):
        if not self.publish_time_flag():
            # Record the relevant logs
            return False
        if not self.title_flag():
            # Record the relevant logs
            return False
        if not self.repeat_video():
            # Record the relevant logs
            return False
        if not self.download_rule_flag():
            # Record the relevant logs
            return False
        return True
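

# A minimal local sketch of how the pipeline is typically driven. The field names in
# sample_item and the thresholds in sample_rules are illustrative assumptions, not
# values taken from this repo. repeat_video() is not called here because it needs a
# reachable MySQL instance via MysqlHelper, and the relative import of AliyunLogger
# means this block should be run in package context (python -m ...).
if __name__ == "__main__":
    sample_item = {
        "video_title": "a sample title for local testing",
        "publish_time_stamp": int(time.time()) - 3600,
        "update_time_stamp": int(time.time()),
        "play_cnt": 1200,  # assumed metric key; any key present in both item and rule_dict is checked
        "out_video_id": "demo-id",
    }
    sample_rules = {
        "period": {"min": 0, "max": 30},
        "play_cnt": {"min": 100, "max": 0},  # max <= 0 is treated as "no upper bound"
    }
    test_pipeline = PiaoQuanPipelineTest(
        platform="gongzhonghao",
        mode="recommend",
        rule_dict=sample_rules,
        env="dev",
        item=sample_item,
        trace_id="local-debug",
    )
    print("publish_time_flag:", test_pipeline.publish_time_flag())
    print("title_flag:", test_pipeline.title_flag())
    print("download_rule_flag:", test_pipeline.download_rule_flag())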