pipeline.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. import re
  2. import time
  3. from .aliyun_log import AliyunLogger
  4. from common.scheduling_db import MysqlHelper
  5. class PiaoQuanPipeline:
  6. def __init__(self, platform, mode, rule_dict, env, item):
  7. self.platform = platform
  8. self.mode = mode
  9. self.item = item
  10. self.rule_dict = rule_dict
  11. self.env = env
  12. # 视频的发布时间限制
  13. def publish_time_flag(self):
  14. # 判断发布时间
  15. publish_time_stamp = self.item["publish_time_stamp"]
  16. update_time_stamp = self.item["update_time_stamp"]
  17. if (
  18. int(time.time()) - publish_time_stamp
  19. > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
  20. ) and (
  21. int(time.time()) - update_time_stamp
  22. > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
  23. ):
  24. AliyunLogger.logging(
  25. code="2001",
  26. platform=self.platform,
  27. mode=self.mode,
  28. data="",
  29. env=self.env,
  30. message="发布时间超过{}天".format(int(self.rule_dict.get('period', {}).get('max', 1000)))
  31. )
  32. return False
  33. return True
  34. # 视频标题是否满足需求
  35. def title_flag(self):
  36. title = self.item['video_title']
  37. cleaned_title = re.sub(r'[^\w]', ' ', title)
  38. # 敏感词
  39. # 获取敏感词列表
  40. sensitive_words = []
  41. if any(word in cleaned_title for word in sensitive_words):
  42. AliyunLogger.logging(
  43. code="2004",
  44. platform=self.platform,
  45. mode=self.mode,
  46. env=self.env,
  47. message="标题中包含敏感词",
  48. data=self.item
  49. )
  50. return False
  51. return True
  52. # 视频基础下载规则
  53. def download_rule_flag(self):
  54. # 格式化 video_dict:publish_time_stamp
  55. if self.item.get("publish_time_stamp"):
  56. self.item["publish_time"] = self.item["publish_time_stamp"] * 1000
  57. # 格式化 video_dict:period
  58. if self.item.get("publish_time") and self.item.get("period", "noperiod") == "noperiod":
  59. self.item["period"] = int((int(time.time() * 1000) - self.item["publish_time"]) / (3600 * 24 * 1000))
  60. # 格式化 rule_dict 最大值取值为 0 的问题
  61. for key in self.item:
  62. if self.rule_dict.get(key):
  63. max_value = int(self.rule_dict[key]["max"]) if int(self.rule_dict[key]["max"]) > 0 else 999999999999999
  64. if key == "peroid":
  65. flag = 0 <= int(self.item[key]) <= max_value
  66. AliyunLogger.logging(
  67. code="2003",
  68. platform=self.platform,
  69. mode=self.mode,
  70. env=self.env,
  71. data=self.item,
  72. message='{}: 0 <= {} <= {}, {}'.format(key, self.item[key], max_value, flag)
  73. )
  74. if not flag:
  75. return flag
  76. else:
  77. flag = int(self.rule_dict[key]["min"]) <= int(self.item[key] <= max_value)
  78. AliyunLogger.logging(
  79. code="2003",
  80. platform=self.platform,
  81. mode=self.mode,
  82. env=self.env,
  83. data=self.item,
  84. message='{}: {} <= {} <= {}, {}'.format(key, self.rule_dict[key]["min"], self.item[key], max_value, flag)
  85. )
  86. if not flag:
  87. return flag
  88. else:
  89. continue
  90. return True
  91. # 按照某个具体平台来去重
  92. def repeat_video(self):
  93. # sql = f""" select * from crawler_video where platform="公众号" and out_video_id="{video_id}"; """
  94. out_id = self.item['out_video_id']
  95. sql = f""" select * from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}"; """
  96. repeat_video = MysqlHelper.get_values(
  97. log_type=self.mode,
  98. crawler=self.platform,
  99. env=self.env,
  100. sql=sql,
  101. action=''
  102. )
  103. if repeat_video:
  104. AliyunLogger.logging(
  105. code="2002",
  106. platform=self.platform,
  107. mode=self.mode,
  108. env=self.env,
  109. message="重复的视频",
  110. data=self.item
  111. )
  112. return False
  113. return True
  114. def process_item(self):
  115. if not self.publish_time_flag():
  116. # 记录相关日志
  117. return False
  118. if not self.title_flag():
  119. # 记录相关日志
  120. return False
  121. if not self.repeat_video():
  122. # 记录相关日志
  123. return False
  124. if not self.download_rule_flag():
  125. # 记录相关日志
  126. return False
  127. AliyunLogger.logging(
  128. code="2000",
  129. platform=self.platform,
  130. mode=self.mode,
  131. env=self.env,
  132. data=self.item,
  133. message="该视频符合抓取条件,准备发往 ETL"
  134. )
  135. return True