pipeline.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. import re
  2. import time
  3. from .aliyun_log import AliyunLogger
  4. from common.scheduling_db import MysqlHelper
  5. class PiaoQuanPipeline:
  6. def __init__(self, platform, mode, rule_dict, env, item, trace_id):
  7. self.platform = platform
  8. self.mode = mode
  9. self.item = item
  10. self.rule_dict = rule_dict
  11. self.env = env
  12. self.trace_id = trace_id
  13. # 视频的发布时间限制, 属于是规则过滤
  14. def publish_time_flag(self):
  15. # 判断发布时间
  16. publish_time_stamp = self.item["publish_time_stamp"]
  17. update_time_stamp = self.item["update_time_stamp"]
  18. if (
  19. int(time.time()) - publish_time_stamp
  20. > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
  21. ) and (
  22. int(time.time()) - update_time_stamp
  23. > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
  24. ):
  25. AliyunLogger.logging(
  26. code="2004",
  27. trace_id=self.trace_id,
  28. platform=self.platform,
  29. mode=self.mode,
  30. env=self.env,
  31. message="发布时间超过{}天".format(
  32. int(self.rule_dict.get("period", {}).get("max", 1000))
  33. ),
  34. )
  35. return False
  36. return True
  37. # 视频标题是否满足需求
  38. def title_flag(self):
  39. title = self.item["video_title"]
  40. cleaned_title = re.sub(r"[^\w]", " ", title)
  41. # 敏感词
  42. # 获取敏感词列表
  43. sensitive_words = []
  44. if any(word in cleaned_title for word in sensitive_words):
  45. AliyunLogger.logging(
  46. code="2003",
  47. trace_id=self.trace_id,
  48. platform=self.platform,
  49. mode=self.mode,
  50. env=self.env,
  51. message="标题中包含敏感词",
  52. data=self.item,
  53. )
  54. return False
  55. return True
  56. # 视频基础下载规则
  57. def download_rule_flag(self):
  58. # 格式化 video_dict:publish_time_stamp
  59. if self.item.get("publish_time_stamp"):
  60. self.item["publish_time"] = self.item["publish_time_stamp"] * 1000
  61. # 格式化 video_dict:period
  62. if (
  63. self.item.get("publish_time")
  64. and self.item.get("period", "noperiod") == "noperiod"
  65. ):
  66. self.item["period"] = int(
  67. (int(time.time() * 1000) - self.item["publish_time"])
  68. / (3600 * 24 * 1000)
  69. )
  70. # 格式化 rule_dict 最大值取值为 0 的问题
  71. for key in self.item:
  72. if self.rule_dict.get(key):
  73. max_value = (
  74. int(self.rule_dict[key]["max"])
  75. if int(self.rule_dict[key]["max"]) > 0
  76. else 999999999999999
  77. )
  78. if key == "peroid":
  79. flag = 0 <= int(self.item[key]) <= max_value
  80. AliyunLogger.logging(
  81. code="2004",
  82. trace_id=self.trace_id,
  83. platform=self.platform,
  84. mode=self.mode,
  85. env=self.env,
  86. data=self.item,
  87. message="{}: 0 <= {} <= {}, {}".format(
  88. key, self.item[key], max_value, flag
  89. ),
  90. )
  91. if not flag:
  92. return flag
  93. else:
  94. flag = int(self.rule_dict[key]["min"]) <= int(
  95. self.item[key] <= max_value
  96. )
  97. AliyunLogger.logging(
  98. code="2004",
  99. trace_id=self.trace_id,
  100. platform=self.platform,
  101. mode=self.mode,
  102. env=self.env,
  103. data=self.item,
  104. message="{}: {} <= {} <= {}, {}".format(
  105. key,
  106. self.rule_dict[key]["min"],
  107. self.item[key],
  108. max_value,
  109. flag,
  110. ),
  111. )
  112. if not flag:
  113. return flag
  114. else:
  115. continue
  116. return True
  117. # 按照某个具体平台来去重
  118. def repeat_video(self):
  119. # sql = f""" select * from crawler_video where platform="公众号" and out_video_id="{video_id}"; """
  120. out_id = self.item["out_video_id"]
  121. sql = f""" select * from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}"; """
  122. repeat_video = MysqlHelper.get_values(
  123. log_type=self.mode, crawler=self.platform, env=self.env, sql=sql, action=""
  124. )
  125. if repeat_video:
  126. AliyunLogger.logging(
  127. code="2002",
  128. trace_id=self.trace_id,
  129. platform=self.platform,
  130. mode=self.mode,
  131. env=self.env,
  132. message="重复的视频",
  133. data=self.item,
  134. )
  135. print("video_repeat")
  136. return False
  137. return True
  138. def process_item(self):
  139. if not self.publish_time_flag():
  140. # 记录相关日志
  141. return False
  142. if not self.title_flag():
  143. # 记录相关日志
  144. return False
  145. if not self.repeat_video():
  146. # 记录相关日志
  147. return False
  148. if not self.download_rule_flag():
  149. # 记录相关日志
  150. return False
  151. return True