pipeline.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. import re
  2. import sys
  3. import os
  4. import time
  5. sys.path.append(os.getcwd())
  6. from application.common import MysqlHelper, AliyunLogger
  7. class PiaoQuanPipeline:
  8. def __init__(self, platform, mode, rule_dict, env, item, trace_id):
  9. self.platform = platform
  10. self.mode = mode
  11. self.item = item
  12. self.rule_dict = rule_dict
  13. self.env = env
  14. self.trace_id = trace_id
  15. self.mysql = MysqlHelper(env=env,mode=mode, platform=platform)
  16. self.aliyun_log = AliyunLogger(platform=platform, mode=mode, env=env)
  17. # 视频的发布时间限制, 属于是规则过滤
  18. def publish_time_flag(self):
  19. # 判断发布时间
  20. publish_time_stamp = self.item["publish_time_stamp"]
  21. update_time_stamp = self.item["update_time_stamp"]
  22. max_d = self.rule_dict.get("period", {}).get("max", 1000)
  23. min_d = self.rule_dict.get("period", {}).get("min", 1000)
  24. days = max_d if max_d > min_d else min_d
  25. if self.platform == "gongzhonghao":
  26. if (
  27. int(time.time()) - publish_time_stamp
  28. > 3600 * 24 * days
  29. ) and (
  30. int(time.time()) - update_time_stamp
  31. > 3600 * 24 * days
  32. ):
  33. self.aliyun_log.logging(
  34. code="2004",
  35. trace_id=self.trace_id,
  36. data=self.item,
  37. message="发布时间超过{}天".format(days),
  38. )
  39. return False
  40. else:
  41. if (
  42. int(time.time()) - publish_time_stamp
  43. > 3600 * 24 * days
  44. ):
  45. self.aliyun_log.logging(
  46. code="2004",
  47. trace_id=self.trace_id,
  48. data=self.item,
  49. message="发布时间超过{}天".format(days),
  50. )
  51. return False
  52. return True
  53. # 视频标题是否满足需求
  54. def title_flag(self):
  55. title = self.item["video_title"]
  56. cleaned_title = re.sub(r"[^\w]", " ", title)
  57. # 敏感词
  58. # 获取敏感词列表
  59. sensitive_words = []
  60. if any(word in cleaned_title for word in sensitive_words):
  61. self.aliyun_log.logging(
  62. code="2003",
  63. trace_id=self.trace_id,
  64. message="标题中包含敏感词",
  65. data=self.item,
  66. )
  67. return False
  68. return True
  69. # 视频基础下载规则
  70. def download_rule_flag(self):
  71. for key in self.item:
  72. if self.rule_dict.get(key):
  73. max_value = (
  74. int(self.rule_dict[key]["max"])
  75. if int(self.rule_dict[key]["max"]) > 0
  76. else 999999999999999
  77. )
  78. if key == "peroid": # peroid是抓取周期天数
  79. continue
  80. else:
  81. flag = int(self.rule_dict[key]["min"]) <= int(self.item[key]) <= max_value
  82. if not flag:
  83. self.aliyun_log.logging(
  84. code="2004",
  85. trace_id=self.trace_id,
  86. data=self.item,
  87. message="{}: {} <= {} <= {}, {}".format(
  88. key,
  89. self.rule_dict[key]["min"],
  90. self.item[key],
  91. max_value,
  92. flag,
  93. ),
  94. )
  95. return flag
  96. else:
  97. continue
  98. return True
  99. # 按照某个具体平台来去重
  100. def repeat_video(self):
  101. # sql = f""" select * from crawler_video where platform="公众号" and out_video_id="{video_id}"; """
  102. out_id = self.item["out_video_id"]
  103. sql = f""" select * from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}"; """
  104. repeat_video = self.mysql.select(sql=sql)
  105. if repeat_video:
  106. self.aliyun_log.logging(
  107. code="2002",
  108. trace_id=self.trace_id,
  109. message="重复的视频",
  110. data=self.item,
  111. )
  112. return False
  113. return True
  114. def process_item(self):
  115. if not self.publish_time_flag():
  116. # 记录相关日志
  117. return False
  118. if not self.title_flag():
  119. # 记录相关日志
  120. return False
  121. if not self.repeat_video():
  122. # 记录相关日志
  123. return False
  124. if not self.download_rule_flag():
  125. # 记录相关日志
  126. return False
  127. return True