import re import time from .aliyun_log import AliyunLogger from common.scheduling_db import MysqlHelper class PiaoQuanPipeline: def __init__(self, platform, mode, rule_dict, env, item): self.platform = platform self.mode = mode self.item = item self.rule_dict = rule_dict self.env = env # 视频的发布时间限制 def publish_time_flag(self): # 判断发布时间 publish_time_stamp = self.item["publish_time_stamp"] update_time_stamp = self.item["update_time_stamp"] if ( int(time.time()) - publish_time_stamp > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000)) ) and ( int(time.time()) - update_time_stamp > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000)) ): AliyunLogger.logging( code="2004", platform=self.platform, mode=self.mode, env=self.env, message="发布时间超过{}天".format( int(self.rule_dict.get("period", {}).get("max", 1000)) ), ) return False return True # 视频标题是否满足需求 def title_flag(self): title = self.item["video_title"] cleaned_title = re.sub(r"[^\w]", " ", title) # 敏感词 # 获取敏感词列表 sensitive_words = [] if any(word in cleaned_title for word in sensitive_words): AliyunLogger.logging( code="2004", platform=self.platform, mode=self.mode, env=self.env, message="标题中包含敏感词", data=self.item, ) return False return True # 视频基础下载规则 def download_rule_flag(self): # 格式化 video_dict:publish_time_stamp if self.item.get("publish_time_stamp"): self.item["publish_time"] = self.item["publish_time_stamp"] * 1000 # 格式化 video_dict:period if ( self.item.get("publish_time") and self.item.get("period", "noperiod") == "noperiod" ): self.item["period"] = int( (int(time.time() * 1000) - self.item["publish_time"]) / (3600 * 24 * 1000) ) # 格式化 rule_dict 最大值取值为 0 的问题 for key in self.item: if self.rule_dict.get(key): max_value = ( int(self.rule_dict[key]["max"]) if int(self.rule_dict[key]["max"]) > 0 else 999999999999999 ) if key == "peroid": flag = 0 <= int(self.item[key]) <= max_value AliyunLogger.logging( code="2004", platform=self.platform, mode=self.mode, env=self.env, data=self.item, message="{}: 0 <= {} <= {}, {}".format( key, self.item[key], max_value, flag ), ) if not flag: return flag else: flag = int(self.rule_dict[key]["min"]) <= int( self.item[key] <= max_value ) AliyunLogger.logging( code="2004", platform=self.platform, mode=self.mode, env=self.env, data=self.item, message="{}: {} <= {} <= {}, {}".format( key, self.rule_dict[key]["min"], self.item[key], max_value, flag, ), ) if not flag: return flag else: continue return True # 按照某个具体平台来去重 def repeat_video(self): # sql = f""" select * from crawler_video where platform="公众号" and out_video_id="{video_id}"; """ out_id = self.item["out_video_id"] sql = f""" select * from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}"; """ repeat_video = MysqlHelper.get_values( log_type=self.mode, crawler=self.platform, env=self.env, sql=sql, action="" ) if repeat_video: AliyunLogger.logging( code="2002", platform=self.platform, mode=self.mode, env=self.env, message="重复的视频", data=self.item, ) return False return True def process_item(self): if not self.publish_time_flag(): # 记录相关日志 return False if not self.title_flag(): # 记录相关日志 return False if not self.repeat_video(): # 记录相关日志 return False if not self.download_rule_flag(): # 记录相关日志 return False AliyunLogger.logging( code="1002", platform=self.platform, mode=self.mode, env=self.env, data=self.item, message="该视频符合抓取条件,准备发往 ETL", ) return True