pipeline.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. import re
  2. import time
  3. from .aliyun_log import AliyunLogger
  4. from common.scheduling_db import MysqlHelper
  5. class PiaoQuanPipeline:
  6. def __init__(self, platform, mode, rule_dict, env, item, trace_id):
  7. self.platform = platform
  8. self.mode = mode
  9. self.item = item
  10. self.rule_dict = rule_dict
  11. self.env = env
  12. self.trace_id = trace_id
  13. # 视频的发布时间限制, 属于是规则过滤
  14. def publish_time_flag(self):
  15. # 判断发布时间
  16. publish_time_stamp = self.item["publish_time_stamp"]
  17. update_time_stamp = self.item["update_time_stamp"]
  18. if (
  19. int(time.time()) - publish_time_stamp
  20. > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
  21. ) and (
  22. int(time.time()) - update_time_stamp
  23. > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
  24. ):
  25. AliyunLogger.logging(
  26. code="2004",
  27. trace_id=self.trace_id,
  28. platform=self.platform,
  29. mode=self.mode,
  30. env=self.env,
  31. message="发布时间超过{}天".format(
  32. int(self.rule_dict.get("period", {}).get("max", 1000))
  33. ),
  34. )
  35. return False
  36. return True
  37. # 视频标题是否满足需求
  38. def title_flag(self):
  39. title = self.item["video_title"]
  40. cleaned_title = re.sub(r"[^\w]", " ", title)
  41. # 敏感词
  42. # 获取敏感词列表
  43. sensitive_words = []
  44. if any(word in cleaned_title for word in sensitive_words):
  45. AliyunLogger.logging(
  46. code="2003",
  47. trace_id=self.trace_id,
  48. platform=self.platform,
  49. mode=self.mode,
  50. env=self.env,
  51. message="标题中包含敏感词",
  52. data=self.item,
  53. )
  54. return False
  55. return True
  56. # 视频基础下载规则
  57. def download_rule_flag(self):
  58. # 格式化 video_dict:publish_time_stamp
  59. # if self.item.get("publish_time_stamp"):
  60. # self.item["publish_time"] = self.item["publish_time_stamp"] * 1000
  61. # # 格式化 video_dict:period
  62. # if (
  63. # self.item.get("publish_time")
  64. # and self.item.get("period", "noperiod") == "noperiod"
  65. # ):
  66. # self.item["period"] = int(
  67. # (int(time.time() * 1000) - self.item["publish_time"])
  68. # / (3600 * 24 * 1000)
  69. # )
  70. # 格式化 rule_dict 最大值取值为 0 的问题
  71. for key in self.item:
  72. if self.rule_dict.get(key):
  73. max_value = (
  74. int(self.rule_dict[key]["max"])
  75. if int(self.rule_dict[key]["max"]) > 0
  76. else 999999999999999
  77. )
  78. if key == "peroid": # peroid是抓取周期天数
  79. continue
  80. # flag = 0 <= int(self.item[key]) <= max_value
  81. # if not flag:
  82. # AliyunLogger.logging(
  83. # code="2004",
  84. # trace_id=self.trace_id,
  85. # platform=self.platform,
  86. # mode=self.mode,
  87. # env=self.env,
  88. # data=self.item,
  89. # message="{}: 0 <= {} <= {}, {}".format(
  90. # key, self.item[key], max_value, flag
  91. # ),
  92. # )
  93. # return flag
  94. else:
  95. flag = int(self.rule_dict[key]["min"]) <= int(self.item[key]) <= max_value
  96. if not flag:
  97. AliyunLogger.logging(
  98. code="2004",
  99. trace_id=self.trace_id,
  100. platform=self.platform,
  101. mode=self.mode,
  102. env=self.env,
  103. data=self.item,
  104. message="{}: {} <= {} <= {}, {}".format(
  105. key,
  106. self.rule_dict[key]["min"],
  107. self.item[key],
  108. max_value,
  109. flag,
  110. ),
  111. )
  112. return flag
  113. else:
  114. continue
  115. return True
  116. # 按照某个具体平台来去重
  117. def repeat_video(self):
  118. # sql = f""" select * from crawler_video where platform="公众号" and out_video_id="{video_id}"; """
  119. out_id = self.item["out_video_id"]
  120. sql = f""" select * from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}"; """
  121. repeat_video = MysqlHelper.get_values(
  122. log_type=self.mode, crawler=self.platform, env=self.env, sql=sql, action=""
  123. )
  124. if repeat_video:
  125. AliyunLogger.logging(
  126. code="2002",
  127. trace_id=self.trace_id,
  128. platform=self.platform,
  129. mode=self.mode,
  130. env=self.env,
  131. message="重复的视频",
  132. data=self.item,
  133. )
  134. return False
  135. return True
  136. def process_item(self):
  137. if not self.publish_time_flag():
  138. # 记录相关日志
  139. return False
  140. if not self.title_flag():
  141. # 记录相关日志
  142. return False
  143. if not self.repeat_video():
  144. # 记录相关日志
  145. return False
  146. if not self.download_rule_flag():
  147. # 记录相关日志
  148. return False
  149. return True