pipeline.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. import re
  2. import time
  3. from .aliyun_log import AliyunLogger
  4. from common.scheduling_db import MysqlHelper
  5. class PiaoQuanPipeline:
  6. def __init__(self, platform, mode, rule_dict, env, item, trace_id):
  7. self.platform = platform
  8. self.mode = mode
  9. self.item = item
  10. self.rule_dict = rule_dict
  11. self.env = env
  12. self.trace_id = trace_id
  13. # 视频的发布时间限制, 属于是规则过滤
  14. def publish_time_flag(self):
  15. # 判断发布时间
  16. publish_time_stamp = self.item["publish_time_stamp"]
  17. update_time_stamp = self.item["update_time_stamp"]
  18. if (
  19. int(time.time()) - publish_time_stamp
  20. > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
  21. ) and (
  22. int(time.time()) - update_time_stamp
  23. > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
  24. ):
  25. AliyunLogger.logging(
  26. code="2004",
  27. trace_id=self.trace_id,
  28. platform=self.platform,
  29. mode=self.mode,
  30. env=self.env,
  31. message="发布时间超过{}天".format(
  32. int(self.rule_dict.get("period", {}).get("max", 1000))
  33. ),
  34. )
  35. return False
  36. return True
  37. # 视频标题是否满足需求
  38. def title_flag(self):
  39. title = self.item["video_title"]
  40. cleaned_title = re.sub(r"[^\w]", " ", title)
  41. # 敏感词
  42. # 获取敏感词列表
  43. sensitive_words = []
  44. if any(word in cleaned_title for word in sensitive_words):
  45. AliyunLogger.logging(
  46. code="2003",
  47. trace_id=self.trace_id,
  48. platform=self.platform,
  49. mode=self.mode,
  50. env=self.env,
  51. message="标题中包含敏感词",
  52. data=self.item,
  53. )
  54. return False
  55. return True
  56. # 视频基础下载规则
  57. def download_rule_flag(self):
  58. # 格式化 video_dict:publish_time_stamp
  59. if self.item.get("publish_time_stamp"):
  60. self.item["publish_time"] = self.item["publish_time_stamp"] * 1000
  61. # 格式化 video_dict:period
  62. if (
  63. self.item.get("publish_time")
  64. and self.item.get("period", "noperiod") == "noperiod"
  65. ):
  66. self.item["period"] = int(
  67. (int(time.time() * 1000) - self.item["publish_time"])
  68. / (3600 * 24 * 1000)
  69. )
  70. # 格式化 rule_dict 最大值取值为 0 的问题
  71. for key in self.item:
  72. if self.rule_dict.get(key):
  73. max_value = (
  74. int(self.rule_dict[key]["max"])
  75. if int(self.rule_dict[key]["max"]) > 0
  76. else 999999999999999
  77. )
  78. if key == "peroid":
  79. flag = 0 <= int(self.item[key]) <= max_value
  80. AliyunLogger.logging(
  81. code="2004",
  82. trace_id=self.trace_id,
  83. platform=self.platform,
  84. mode=self.mode,
  85. env=self.env,
  86. data=self.item,
  87. message="{}: 0 <= {} <= {}, {}".format(
  88. key, self.item[key], max_value, flag
  89. ),
  90. )
  91. if not flag:
  92. return flag
  93. else:
  94. flag = int(self.rule_dict[key]["min"]) <= int(self.item[key]) <= max_value
  95. AliyunLogger.logging(
  96. code="2004",
  97. trace_id=self.trace_id,
  98. platform=self.platform,
  99. mode=self.mode,
  100. env=self.env,
  101. data=self.item,
  102. message="{}: {} <= {} <= {}, {}".format(
  103. key,
  104. self.rule_dict[key]["min"],
  105. self.item[key],
  106. max_value,
  107. flag,
  108. ),
  109. )
  110. if not flag:
  111. return flag
  112. else:
  113. continue
  114. return True
  115. # 按照某个具体平台来去重
  116. def repeat_video(self):
  117. # sql = f""" select * from crawler_video where platform="公众号" and out_video_id="{video_id}"; """
  118. out_id = self.item["out_video_id"]
  119. sql = f""" select * from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}"; """
  120. repeat_video = MysqlHelper.get_values(
  121. log_type=self.mode, crawler=self.platform, env=self.env, sql=sql, action=""
  122. )
  123. if repeat_video:
  124. AliyunLogger.logging(
  125. code="2002",
  126. trace_id=self.trace_id,
  127. platform=self.platform,
  128. mode=self.mode,
  129. env=self.env,
  130. message="重复的视频",
  131. data=self.item,
  132. )
  133. return False
  134. return True
  135. def process_item(self):
  136. if not self.publish_time_flag():
  137. # 记录相关日志
  138. return False
  139. if not self.title_flag():
  140. # 记录相关日志
  141. return False
  142. if not self.repeat_video():
  143. # 记录相关日志
  144. return False
  145. if not self.download_rule_flag():
  146. # 记录相关日志
  147. return False
  148. return True