import re
import time

from .aliyun_log import AliyunLogger
from common.scheduling_db import MysqlHelper


class PiaoQuanPipeline:
    def __init__(self, platform, mode, rule_dict, env, item):
        self.platform = platform
        self.mode = mode
        self.item = item
        self.rule_dict = rule_dict
        self.env = env

    # Publish-time limit for videos
    def publish_time_flag(self):
        # Check the publish / update time against the allowed period
        publish_time_stamp = self.item["publish_time_stamp"]
        update_time_stamp = self.item["update_time_stamp"]
        if (
            int(time.time()) - publish_time_stamp
            > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
        ) and (
            int(time.time()) - update_time_stamp
            > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
        ):
            AliyunLogger.logging(
                code="2001",
                platform=self.platform,
                mode=self.mode,
                data="",
                env=self.env,
                message="发布时间超过{}天".format(
                    int(self.rule_dict.get("period", {}).get("max", 1000))
                ),  # "publish time exceeds {} days"
            )
            return False
        return True

    # Whether the video title meets the requirements
    def title_flag(self):
        title = self.item["video_title"]
        cleaned_title = re.sub(r"[^\w]", " ", title)
        # Sensitive words
        # Fetch the sensitive-word list
        sensitive_words = []
        if any(word in cleaned_title for word in sensitive_words):
            AliyunLogger.logging(
                code="2004",
                platform=self.platform,
                mode=self.mode,
                env=self.env,
                message="标题中包含敏感词",  # "title contains a sensitive word"
                data=self.item,
            )
            return False
        return True

    # Basic download rules for the video
    def download_rule_flag(self):
        # Normalize video_dict: publish_time_stamp -> publish_time (milliseconds)
        if self.item.get("publish_time_stamp"):
            self.item["publish_time"] = self.item["publish_time_stamp"] * 1000
        # Normalize video_dict: period (days since publish)
        if self.item.get("publish_time") and self.item.get("period", "noperiod") == "noperiod":
            self.item["period"] = int(
                (int(time.time() * 1000) - self.item["publish_time"]) / (3600 * 24 * 1000)
            )
        # Normalize rule_dict: a max value of 0 means "no upper bound"
        for key in self.item:
            if self.rule_dict.get(key):
                max_value = (
                    int(self.rule_dict[key]["max"])
                    if int(self.rule_dict[key]["max"]) > 0
                    else 999999999999999
                )
                if key == "period":
                    flag = 0 <= int(self.item[key]) <= max_value
                    AliyunLogger.logging(
                        code="2003",
                        platform=self.platform,
                        mode=self.mode,
                        env=self.env,
                        data=self.item,
                        message="{}: 0 <= {} <= {}, {}".format(
                            key, self.item[key], max_value, flag
                        ),
                    )
                    if not flag:
                        return flag
                else:
                    flag = int(self.rule_dict[key]["min"]) <= int(self.item[key]) <= max_value
                    AliyunLogger.logging(
                        code="2003",
                        platform=self.platform,
                        mode=self.mode,
                        env=self.env,
                        data=self.item,
                        message="{}: {} <= {} <= {}, {}".format(
                            key, self.rule_dict[key]["min"], self.item[key], max_value, flag
                        ),
                    )
                    if not flag:
                        return flag
            else:
                continue
        return True

    # De-duplicate against a specific platform
    def repeat_video(self):
        # sql = f""" select * from crawler_video where platform="公众号" and out_video_id="{video_id}"; """
        out_id = self.item["out_video_id"]
        sql = f""" select * from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}"; """
        repeat_video = MysqlHelper.get_values(
            log_type=self.mode,
            crawler=self.platform,
            env=self.env,
            sql=sql,
            action="",
        )
        if repeat_video:
            AliyunLogger.logging(
                code="2002",
                platform=self.platform,
                mode=self.mode,
                env=self.env,
                message="重复的视频",  # "duplicate video"
                data=self.item,
            )
            return False
        return True

    def process_item(self):
        if not self.publish_time_flag():
            # Relevant logging is done inside the check
            return False
        if not self.title_flag():
            # Relevant logging is done inside the check
            return False
        if not self.repeat_video():
            # Relevant logging is done inside the check
            return False
        if not self.download_rule_flag():
            # Relevant logging is done inside the check
            return False
        AliyunLogger.logging(
            code="2000",
            platform=self.platform,
            mode=self.mode,
            env=self.env,
            data=self.item,
            message="该视频符合抓取条件,准备发往 ETL",  # "video meets the crawl rules; sending to ETL"
        )
        return True
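

# Usage sketch (illustrative, not part of the original pipeline): how a crawler
# might run one item through these checks before forwarding it to ETL. The
# rule_dict shape follows how publish_time_flag() and download_rule_flag() read
# it; the platform name, mode value, and the "play_cnt" rule key are assumptions
# for demonstration only. Note that process_item() also requires this repo's
# MySQL (crawler_video table) and Aliyun log configuration, because
# repeat_video() queries the database and every check logs through AliyunLogger.
if __name__ == "__main__":
    example_rule_dict = {
        "period": {"min": 0, "max": 30},      # only videos published/updated within 30 days
        "play_cnt": {"min": 5000, "max": 0},  # a max of 0 is treated as "no upper bound"
    }
    example_item = {
        "out_video_id": "demo_0001",          # hypothetical external video id
        "video_title": "测试视频",
        "publish_time_stamp": int(time.time()) - 3600 * 24 * 2,
        "update_time_stamp": int(time.time()) - 3600 * 24,
        "play_cnt": 12000,
    }
    pipeline = PiaoQuanPipeline(
        platform="demo_platform",             # hypothetical platform name
        mode="recommend",                     # assumed crawl mode
        rule_dict=example_rule_dict,
        env="dev",
        item=example_item,
    )
    # True means every check passed and the item can be sent to ETL.
    print(pipeline.process_item())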