pipeline.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. import re
  2. import time
  3. from .aliyun_log import AliyunLogger
  4. from common.scheduling_db import MysqlHelper
  5. class PiaoQuanPipeline:
  6. def __init__(self, platform, mode, rule_dict, env, item):
  7. self.platform = platform
  8. self.mode = mode
  9. self.item = item
  10. self.rule_dict = rule_dict
  11. self.env = env
  12. # 视频的发布时间限制
  13. def publish_time_flag(self):
  14. # 判断发布时间
  15. publish_time_stamp = self.item["publish_time_stamp"]
  16. update_time_stamp = self.item["update_time_stamp"]
  17. if (
  18. int(time.time()) - publish_time_stamp
  19. > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
  20. ) and (
  21. int(time.time()) - update_time_stamp
  22. > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
  23. ):
  24. AliyunLogger.logging(
  25. code="2004",
  26. platform=self.platform,
  27. mode=self.mode,
  28. env=self.env,
  29. message="发布时间超过{}天".format(
  30. int(self.rule_dict.get("period", {}).get("max", 1000))
  31. ),
  32. )
  33. return False
  34. return True
  35. # 视频标题是否满足需求
  36. def title_flag(self):
  37. title = self.item["video_title"]
  38. cleaned_title = re.sub(r"[^\w]", " ", title)
  39. # 敏感词
  40. # 获取敏感词列表
  41. sensitive_words = []
  42. if any(word in cleaned_title for word in sensitive_words):
  43. AliyunLogger.logging(
  44. code="2004",
  45. platform=self.platform,
  46. mode=self.mode,
  47. env=self.env,
  48. message="标题中包含敏感词",
  49. data=self.item,
  50. )
  51. return False
  52. return True
  53. # 视频基础下载规则
  54. def download_rule_flag(self):
  55. # 格式化 video_dict:publish_time_stamp
  56. if self.item.get("publish_time_stamp"):
  57. self.item["publish_time"] = self.item["publish_time_stamp"] * 1000
  58. # 格式化 video_dict:period
  59. if (
  60. self.item.get("publish_time")
  61. and self.item.get("period", "noperiod") == "noperiod"
  62. ):
  63. self.item["period"] = int(
  64. (int(time.time() * 1000) - self.item["publish_time"])
  65. / (3600 * 24 * 1000)
  66. )
  67. # 格式化 rule_dict 最大值取值为 0 的问题
  68. for key in self.item:
  69. if self.rule_dict.get(key):
  70. max_value = (
  71. int(self.rule_dict[key]["max"])
  72. if int(self.rule_dict[key]["max"]) > 0
  73. else 999999999999999
  74. )
  75. if key == "peroid":
  76. flag = 0 <= int(self.item[key]) <= max_value
  77. AliyunLogger.logging(
  78. code="2004",
  79. platform=self.platform,
  80. mode=self.mode,
  81. env=self.env,
  82. data=self.item,
  83. message="{}: 0 <= {} <= {}, {}".format(
  84. key, self.item[key], max_value, flag
  85. ),
  86. )
  87. if not flag:
  88. return flag
  89. else:
  90. flag = int(self.rule_dict[key]["min"]) <= int(
  91. self.item[key] <= max_value
  92. )
  93. AliyunLogger.logging(
  94. code="2004",
  95. platform=self.platform,
  96. mode=self.mode,
  97. env=self.env,
  98. data=self.item,
  99. message="{}: {} <= {} <= {}, {}".format(
  100. key,
  101. self.rule_dict[key]["min"],
  102. self.item[key],
  103. max_value,
  104. flag,
  105. ),
  106. )
  107. if not flag:
  108. return flag
  109. else:
  110. continue
  111. return True
  112. # 按照某个具体平台来去重
  113. def repeat_video(self):
  114. # sql = f""" select * from crawler_video where platform="公众号" and out_video_id="{video_id}"; """
  115. out_id = self.item["out_video_id"]
  116. sql = f""" select * from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}"; """
  117. repeat_video = MysqlHelper.get_values(
  118. log_type=self.mode, crawler=self.platform, env=self.env, sql=sql, action=""
  119. )
  120. if repeat_video:
  121. AliyunLogger.logging(
  122. code="2002",
  123. platform=self.platform,
  124. mode=self.mode,
  125. env=self.env,
  126. message="重复的视频",
  127. data=self.item,
  128. )
  129. return False
  130. return True
  131. def process_item(self):
  132. if not self.publish_time_flag():
  133. # 记录相关日志
  134. return False
  135. if not self.title_flag():
  136. # 记录相关日志
  137. return False
  138. if not self.repeat_video():
  139. # 记录相关日志
  140. return False
  141. if not self.download_rule_flag():
  142. # 记录相关日志
  143. return False
  144. AliyunLogger.logging(
  145. code="1002",
  146. platform=self.platform,
  147. mode=self.mode,
  148. env=self.env,
  149. data=self.item,
  150. message="该视频符合抓取条件,准备发往 ETL",
  151. )
  152. return True