import re
import time

from .aliyun_log import AliyunLogger
from common.scheduling_db import MysqlHelper

# Number of seconds in one day, used to convert the "period" rule (days).
_SECONDS_PER_DAY = 3600 * 24
# Fallback for the "period" rule when rule_dict does not define one.
_DEFAULT_MAX_PERIOD_DAYS = 1000
# Upper bound substituted when a rule's "max" is zero or negative.
_UNBOUNDED_MAX = 999999999999999
# The crawl-period rule is a number of days, not a per-item metric, so it is
# excluded from the min/max checks. "peroid" is the historical misspelling
# that the original check used; "period" is the key the rules actually use.
_PERIOD_KEYS = frozenset({"period", "peroid"})


def _escape_sql_literal(value):
    """Return *value* as text that is safe inside a double-quoted SQL literal.

    Backslashes and double quotes are backslash-escaped so the value cannot
    terminate the surrounding literal. Values without those characters are
    returned unchanged (after ``str()`` conversion).
    """
    return str(value).replace("\\", "\\\\").replace('"', '\\"')


class _PipelineBase:
    """Shared filter logic that decides whether a crawled video is kept.

    Subclasses only decide how a rejection is reported, by implementing
    ``_report``.
    """

    # Words that cause a title to be rejected. No word list is wired in here,
    # so as written the title check always passes.
    SENSITIVE_WORDS = ()

    def __init__(self, platform, mode, rule_dict, env, item, trace_id):
        self.platform = platform
        self.mode = mode
        self.item = item
        self.rule_dict = rule_dict
        self.env = env
        self.trace_id = trace_id

    def _report(self, code, message):
        """Report why the current item was rejected."""
        raise NotImplementedError

    def publish_time_flag(self) -> bool:
        """Return False when the video is older than the "period" rule allows.

        The limit is ``rule_dict["period"]["max"]`` days (default 1000). For
        the "gongzhonghao" platform the item is rejected only when both its
        publish time and its update time exceed the limit; for every other
        platform only the publish time is considered.
        """
        max_days = int(
            self.rule_dict.get("period", {}).get("max", _DEFAULT_MAX_PERIOD_DAYS)
        )
        limit = _SECONDS_PER_DAY * max_days
        now = int(time.time())

        expired = now - self.item["publish_time_stamp"] > limit
        if self.platform == "gongzhonghao":
            # Only this platform uses the update time, so only it requires
            # the field to be present on the item.
            update_age = now - self.item["update_time_stamp"]
            expired = expired and update_age > limit

        if expired:
            self._report("2004", "发布时间超过{}天".format(max_days))
            return False
        return True

    def title_flag(self) -> bool:
        """Return False when the title contains one of ``SENSITIVE_WORDS``."""
        title = self.item["video_title"]
        # Replace every non-word character with a space before matching.
        cleaned_title = re.sub(r"[^\w]", " ", title)
        if any(word in cleaned_title for word in self.SENSITIVE_WORDS):
            self._report("2003", "标题中包含敏感词")
            return False
        return True

    def download_rule_flag(self) -> bool:
        """Return False when any item field violates its min/max rule.

        Every item key that has a truthy entry in ``rule_dict`` must satisfy
        ``min <= value <= max``; a ``max`` of zero or less means no upper
        bound. The crawl-period rule is skipped.
        """
        for key in self.item:
            rule = self.rule_dict.get(key)
            if not rule or key in _PERIOD_KEYS:
                continue

            upper = int(rule["max"])
            max_value = upper if upper > 0 else _UNBOUNDED_MAX
            flag = int(rule["min"]) <= int(self.item[key]) <= max_value
            if not flag:
                self._report(
                    "2004",
                    "{}: {} <= {} <= {}, {}".format(
                        key, rule["min"], self.item[key], max_value, flag
                    ),
                )
                return flag
        return True

    def repeat_video(self) -> bool:
        """Return False when this platform already has the item's video id.

        Looks up ``item["out_video_id"]`` for the current platform in the
        ``crawler_video`` table.
        """
        # The id comes from crawled data, so it is escaped before being
        # embedded in the statement.
        # NOTE(review): switch to a parameterized query if MysqlHelper
        # supports one.
        platform = _escape_sql_literal(self.platform)
        out_id = _escape_sql_literal(self.item["out_video_id"])
        sql = f""" select * from crawler_video where platform = "{platform}" and out_video_id="{out_id}"; """
        repeat_video = MysqlHelper.get_values(
            log_type=self.mode,
            crawler=self.platform,
            env=self.env,
            sql=sql,
            action="",
        )
        if repeat_video:
            self._report("2002", "重复的视频")
            return False
        return True

    def process_item(self) -> bool:
        """Run all filters in order and return True only if every one passes.

        Evaluation stops at the first failing filter; each filter reports its
        own rejection reason.
        """
        checks = (
            self.publish_time_flag,
            self.title_flag,
            self.repeat_video,
            self.download_rule_flag,
        )
        return all(check() for check in checks)


class PiaoQuanPipeline(_PipelineBase):
    """Item filter that reports rejections through ``AliyunLogger``."""

    def _report(self, code, message):
        """Send the rejection reason, with the item attached, to AliyunLogger."""
        AliyunLogger.logging(
            code=code,
            trace_id=self.trace_id,
            platform=self.platform,
            mode=self.mode,
            env=self.env,
            data=self.item,
            message=message,
        )


class PiaoQuanPipelineTest(_PipelineBase):
    """Item filter that prints rejection reasons instead of logging them."""

    def _report(self, code, message):
        """Print the rejection reason."""
        print(message)