pipeline.py

import re
import time

from application.common.log import AliyunLogger
from application.common.mysql import MysqlHelper


class PiaoQuanPipeline:
    def __init__(self, platform, mode, rule_dict, env, item, trace_id):
        self.platform = platform
        self.mode = mode
        self.item = item
        self.rule_dict = rule_dict
        self.env = env
        self.trace_id = trace_id
        self.mysql = MysqlHelper(env=env, mode=mode, platform=platform)
        self.aliyun_log = AliyunLogger(platform=platform, mode=mode, env=env)
    # Publish-time limit for the video; this is a rule-based filter
    def publish_time_flag(self):
        # Check the publish time against the configured period (in days)
        publish_time_stamp = self.item["publish_time_stamp"]
        update_time_stamp = self.item["update_time_stamp"]
        max_d = self.rule_dict.get("period", {}).get("max", 1000)
        min_d = self.rule_dict.get("period", {}).get("min", 1000)
        days = max_d if max_d > min_d else min_d
        if self.platform == "gongzhonghao":
            if (
                int(time.time()) - publish_time_stamp > 3600 * 24 * days
            ) and (
                int(time.time()) - update_time_stamp > 3600 * 24 * days
            ):
                self.aliyun_log.logging(
                    code="2004",
                    trace_id=self.trace_id,
                    data=self.item,
                    message="发布时间超过{}天".format(days),
                )
                return False
        else:
            if int(time.time()) - publish_time_stamp > 3600 * 24 * days:
                self.aliyun_log.logging(
                    code="2004",
                    trace_id=self.trace_id,
                    data=self.item,
                    message="发布时间超过{}天".format(days),
                )
                return False
        return True
    # Whether the video title meets requirements
    def title_flag(self):
        title = self.item["video_title"]
        cleaned_title = re.sub(r"[^\w]", " ", title)
        # Sensitive words
        # The sensitive-word list is fetched here (currently empty)
        sensitive_words = []
        if any(word in cleaned_title for word in sensitive_words):
            self.aliyun_log.logging(
                code="2003",
                trace_id=self.trace_id,
                message="标题中包含敏感词",
                data=self.item,
            )
            return False
        return True
    # Basic download rules for the video
    def download_rule_flag(self):
        for key in self.item:
            if self.rule_dict.get(key):
                max_value = (
                    int(self.rule_dict[key]["max"])
                    if int(self.rule_dict[key]["max"]) > 0
                    else 999999999999999
                )
                if key == "period":  # "period" is the crawl window in days, not a download rule
                    continue
                else:
                    flag = int(self.rule_dict[key]["min"]) <= int(self.item[key]) <= max_value
                    if not flag:
                        self.aliyun_log.logging(
                            code="2004",
                            trace_id=self.trace_id,
                            data=self.item,
                            message="{}: {} <= {} <= {}, {}".format(
                                key,
                                self.rule_dict[key]["min"],
                                self.item[key],
                                max_value,
                                flag,
                            ),
                        )
                        return flag
            else:
                continue
        return True
    # Deduplicate against videos already stored for this specific platform
    def repeat_video(self):
        # sql = f""" select * from crawler_video where platform="公众号" and out_video_id="{video_id}"; """
        out_id = self.item["out_video_id"]
        sql = f""" select * from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}"; """
        repeat_video = self.mysql.select(sql=sql)
        if repeat_video:
            self.aliyun_log.logging(
                code="2002",
                trace_id=self.trace_id,
                message="重复的视频",
                data=self.item,
            )
            return False
        return True
    def process_item(self):
        if not self.publish_time_flag():
            # The relevant log entry is written inside the check
            return False
        if not self.title_flag():
            # The relevant log entry is written inside the check
            return False
        if not self.repeat_video():
            # The relevant log entry is written inside the check
            return False
        if not self.download_rule_flag():
            # The relevant log entry is written inside the check
            return False
        return True
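
# A minimal usage sketch, not part of the original file. The shapes of `rule_dict`
# and `item` below are inferred from how the methods above read them; the platform,
# mode, env, and trace_id values are placeholders, and running this requires the
# `application` package plus reachable MySQL and Aliyun log backends.
if __name__ == "__main__":
    example_rule_dict = {
        # "period" bounds the publish window in days (used by publish_time_flag)
        "period": {"min": 3, "max": 7},
        # other keys are min/max bounds checked against the item by download_rule_flag;
        # a non-positive max is treated as "no upper bound"
        "duration": {"min": 60, "max": 0},
    }
    example_item = {
        "video_title": "example title",
        "out_video_id": "example-id-001",
        "publish_time_stamp": int(time.time()) - 3600,
        "update_time_stamp": int(time.time()) - 3600,
        "duration": 120,
    }
    pipeline = PiaoQuanPipeline(
        platform="gongzhonghao",
        mode="recommend",
        rule_dict=example_rule_dict,
        env="dev",
        item=example_item,
        trace_id="trace-0001",
    )
    # True means the item passed every filter; False means it was dropped and the
    # reason was written to the Aliyun log with the corresponding code.
    print(pipeline.process_item())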