# pipeline.py (6.5 KB)

  1. import re
  2. import time
  3. from .aliyun_log import AliyunLogger
  4. from common.scheduling_db import MysqlHelper
  5. class PiaoQuanPipeline:
  6. def __init__(self, platform, mode, rule_dict, env, item, trace_id):
  7. self.platform = platform
  8. self.mode = mode
  9. self.item = item
  10. self.rule_dict = rule_dict
  11. self.env = env
  12. self.trace_id = trace_id
  13. # 视频的发布时间限制, 属于是规则过滤
  14. def publish_time_flag(self):
  15. # 判断发布时间
  16. publish_time_stamp = self.item["publish_time_stamp"]
  17. update_time_stamp = self.item["update_time_stamp"]
  18. if self.platform == "gongzhonghao":
  19. if (
  20. int(time.time()) - publish_time_stamp
  21. > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
  22. ) and (
  23. int(time.time()) - update_time_stamp
  24. > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
  25. ):
  26. AliyunLogger.logging(
  27. code="2004",
  28. trace_id=self.trace_id,
  29. platform=self.platform,
  30. mode=self.mode,
  31. env=self.env,
  32. message="发布时间超过{}天".format(
  33. int(self.rule_dict.get("period", {}).get("max", 1000))
  34. ),
  35. )
  36. return False
  37. else:
  38. if (
  39. int(time.time()) - publish_time_stamp
  40. > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
  41. ):
  42. AliyunLogger.logging(
  43. code="2004",
  44. trace_id=self.trace_id,
  45. platform=self.platform,
  46. mode=self.mode,
  47. env=self.env,
  48. message="发布时间超过{}天".format(
  49. int(self.rule_dict.get("period", {}).get("max", 1000))
  50. ),
  51. )
  52. return False
  53. return True
  54. # 视频标题是否满足需求
  55. def title_flag(self):
  56. title = self.item["video_title"]
  57. cleaned_title = re.sub(r"[^\w]", " ", title)
  58. # 敏感词
  59. # 获取敏感词列表
  60. sensitive_words = []
  61. if any(word in cleaned_title for word in sensitive_words):
  62. AliyunLogger.logging(
  63. code="2003",
  64. trace_id=self.trace_id,
  65. platform=self.platform,
  66. mode=self.mode,
  67. env=self.env,
  68. message="标题中包含敏感词",
  69. data=self.item,
  70. )
  71. return False
  72. return True
  73. # 视频基础下载规则
  74. def download_rule_flag(self):
  75. # 格式化 video_dict:publish_time_stamp
  76. # if self.item.get("publish_time_stamp"):
  77. # self.item["publish_time"] = self.item["publish_time_stamp"] * 1000
  78. # # 格式化 video_dict:period
  79. # if (
  80. # self.item.get("publish_time")
  81. # and self.item.get("period", "noperiod") == "noperiod"
  82. # ):
  83. # self.item["period"] = int(
  84. # (int(time.time() * 1000) - self.item["publish_time"])
  85. # / (3600 * 24 * 1000)
  86. # )
  87. # 格式化 rule_dict 最大值取值为 0 的问题
  88. for key in self.item:
  89. if self.rule_dict.get(key):
  90. max_value = (
  91. int(self.rule_dict[key]["max"])
  92. if int(self.rule_dict[key]["max"]) > 0
  93. else 999999999999999
  94. )
  95. if key == "peroid": # peroid是抓取周期天数
  96. continue
  97. # flag = 0 <= int(self.item[key]) <= max_value
  98. # if not flag:
  99. # AliyunLogger.logging(
  100. # code="2004",
  101. # trace_id=self.trace_id,
  102. # platform=self.platform,
  103. # mode=self.mode,
  104. # env=self.env,
  105. # data=self.item,
  106. # message="{}: 0 <= {} <= {}, {}".format(
  107. # key, self.item[key], max_value, flag
  108. # ),
  109. # )
  110. # return flag
  111. else:
  112. flag = int(self.rule_dict[key]["min"]) <= int(self.item[key]) <= max_value
  113. if not flag:
  114. AliyunLogger.logging(
  115. code="2004",
  116. trace_id=self.trace_id,
  117. platform=self.platform,
  118. mode=self.mode,
  119. env=self.env,
  120. data=self.item,
  121. message="{}: {} <= {} <= {}, {}".format(
  122. key,
  123. self.rule_dict[key]["min"],
  124. self.item[key],
  125. max_value,
  126. flag,
  127. ),
  128. )
  129. return flag
  130. else:
  131. continue
  132. return True
  133. # 按照某个具体平台来去重
  134. def repeat_video(self):
  135. # sql = f""" select * from crawler_video where platform="公众号" and out_video_id="{video_id}"; """
  136. out_id = self.item["out_video_id"]
  137. sql = f""" select * from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}"; """
  138. repeat_video = MysqlHelper.get_values(
  139. log_type=self.mode, crawler=self.platform, env=self.env, sql=sql, action=""
  140. )
  141. if repeat_video:
  142. AliyunLogger.logging(
  143. code="2002",
  144. trace_id=self.trace_id,
  145. platform=self.platform,
  146. mode=self.mode,
  147. env=self.env,
  148. message="重复的视频",
  149. data=self.item,
  150. )
  151. return False
  152. return True
  153. def process_item(self):
  154. if not self.publish_time_flag():
  155. # 记录相关日志
  156. return False
  157. if not self.title_flag():
  158. # 记录相关日志
  159. return False
  160. if not self.repeat_video():
  161. # 记录相关日志
  162. return False
  163. if not self.download_rule_flag():
  164. # 记录相关日志
  165. return False
  166. return True