import hashlib
import re
import sys
import os
import time

from application.common.feishu.feishu_utils import FeishuUtils

sys.path.append(os.getcwd())
from datetime import datetime

from application.common import MysqlHelper, AliyunLogger
# from application.common.redis.pyredis import RedisClient

# Characters that must be backslash-escaped inside a quoted MySQL string
# literal (the same set the MySQL manual lists for string literals).
_SQL_ESCAPES = str.maketrans(
    {
        "\0": "\\0",
        "\n": "\\n",
        "\r": "\\r",
        "\x1a": "\\Z",
        '"': '\\"',
        "'": "\\'",
        "\\": "\\\\",
    }
)


class PiaoQuanPipeline(object):
    """Crawler pipeline: decides whether a crawled item passes the crawl rules.

    ``process_item`` is the entry point; it runs the publish-time, title,
    duplicate and numeric-rule checks in that order and returns True only
    when all of them pass.
    """

    # Identifiers passed to ``FeishuUtils.get_values_batch``.
    # NOTE(review): presumably a spreadsheet token plus two sheet ids
    # (publish-time window / dedup window) - confirm against FeishuUtils.
    _FEISHU_DOC = "KsoMsyP2ghleM9tzBfmcEEXBnXg"
    _FEISHU_PUBLISH_SHEET = "RuLK77"
    _FEISHU_DEDUP_SHEET = "letS93"

    # Platforms for which the generic duplicate check is skipped entirely.
    _NO_DEDUP_PLATFORMS = frozenset(
        {
            "zhufuniannianshunxinjixiang",
            "weiquanshipin",
            "piaoquangushi",
            "lepaoledong",
            "zhufukuaizhuan",
            "linglingkuailezhufu",
            "lepaoledongdijie",
            "jierizhufuhuakaifugui",
            "yuannifuqimanman",
            "haoyunzhufuduo",
            "quzhuan",
            "zhufudewenhou",
            "jierizhufuxingfujixiang",
            "haoyoushipin",
            "xinshiquan",
            "laonianshenghuokuaile",
            "laonianquan",
        }
    )

    # Platforms for which the duplicate check is skipped in "recommend" mode.
    _NO_DEDUP_RECOMMEND_PLATFORMS = frozenset(
        {
            "zhuwanwufusunew",
            "jixiangxingfu",
            "yuannifuqichangzai",
            "benshanzhufu",
            "zuihaodesongni",
            "tiantianjufuqi",
        }
    )

    # Platforms whose duplicates may be crawled again once the stored row is
    # at least this many days old.
    _RECRAWL_AFTER_DAYS = {
        "xishiduoduo": 4,
        "xiaoniangaotuijianliu": 3,
        "zhufuquanzituijianliu": 3,
    }

    def __init__(self, platform, mode, rule_dict, env, item, trace_id, account=None):
        """Store the crawl context and create the MySQL / Aliyun log helpers.

        :param platform: platform identifier, compared against sheet rows and
            used in SQL filters
        :param mode: crawl mode (the code checks for ``"recommend"``)
        :param rule_dict: mapping of rule name -> ``{"min": ..., "max": ...}``
        :param env: environment name forwarded to the helpers
        :param item: the crawled item (dict-like) being judged
        :param trace_id: identifier attached to every log record
        :param account: optional account attached to most log records
        """
        self.platform = platform
        self.mode = mode
        self.item = item
        self.rule_dict = rule_dict
        self.env = env
        self.trace_id = trace_id
        self.mysql = MysqlHelper(env=env, mode=mode, platform=platform)
        self.aliyun_log = AliyunLogger(platform=platform, mode=mode, env=env)
        self.account = account
        # self.red = RedisClient()

    @staticmethod
    def _sql_escape(value):
        """Return ``value`` as text that is safe inside a quoted SQL literal.

        ``MysqlHelper.select`` is only called with a finished SQL string
        here, so scraped values are escaped rather than bound as parameters.
        TODO(review): switch to parameter binding if the helper supports it.
        """
        return str(value).translate(_SQL_ESCAPES)

    def _feishu_day_count(self, sheet_id):
        """Look up this platform's day count in one Feishu sheet.

        The first row is skipped (treated as a header). Rows are scanned in
        order until the platform is found or a row with an empty first cell
        is reached.

        NOTE(review): the source indentation was lost; the ``else`` branch is
        reconstructed as "stop at the first blank row" - confirm.

        :return: the second cell of the matching row, or None when there is
            no match, no data, or the matching row has no second cell
        """
        rows = FeishuUtils.get_values_batch(self._FEISHU_DOC, sheet_id)
        if not rows:
            return None
        for row in rows[1:]:
            channel = row[0] if row else None
            if not channel:
                return None
            if channel == self.platform:
                return row[1] if len(row) > 1 else None
        return None

    def feishu_time_list(self):
        """Return the publish-time window (days) configured for this platform, or None."""
        return self._feishu_day_count(self._FEISHU_PUBLISH_SHEET)

    def _log_expired(self, days):
        """Log that the item is older than the allowed publish window."""
        self.aliyun_log.logging(
            code="2004",
            trace_id=self.trace_id,
            data=self.item,
            message="发布时间超过{}天".format(days),
        )

    def publish_time_flag(self):
        """Check whether the item's publish time is inside the allowed window.

        The window is the larger of ``rule_dict["period"]["max"]`` and
        ``["min"]`` (both default to 1000), overridden by the Feishu sheet
        value when one is configured for the platform.

        * ``gongzhonghao``: rejected only when both the publish time and the
          update time are older than the window.
        * other platforms: a window of 0 means "published today only";
          otherwise the publish time must not be older than the window.

        :return: True when the item is fresh enough, else False
        """
        publish_ts = self.item["publish_time_stamp"]
        period = self.rule_dict.get("period", {})
        days = max(period.get("max", 1000), period.get("min", 1000))
        sheet_days = self.feishu_time_list()
        if sheet_days:
            days = int(sheet_days)

        now = int(time.time())
        window = 3600 * 24 * days

        if self.platform == "gongzhonghao":
            # Only this platform carries a meaningful update time.
            update_ts = self.item["update_time_stamp"]
            if now - publish_ts > window and now - update_ts > window:
                self._log_expired(days)
                return False
            return True

        if days == 0:
            # Compare the item's own publish date with today. The previous
            # code replaced the timestamp with "now" first, which made this
            # check always pass.
            published_today = (
                datetime.fromtimestamp(publish_ts).date() == datetime.today().date()
            )
            if not published_today:
                self._log_expired(days)
                return False
        elif now - publish_ts > window:
            self._log_expired(days)
            return False
        return True

    def title_flag(self):
        """Check the video title against the sensitive-word list.

        The list is currently empty, so this check always passes.

        :return: False when the cleaned title contains a sensitive word,
            else True
        """
        title = self.item["video_title"]
        # Replace every non-word character with a space before matching.
        cleaned_title = re.sub(r"[^\w]", " ", title)
        sensitive_words = []
        if any(word in cleaned_title for word in sensitive_words):
            self.aliyun_log.logging(
                code="2003",
                trace_id=self.trace_id,
                message="标题中包含敏感词",
                data=self.item,
                account=self.account,
            )
            return False
        return True

    def download_rule_flag(self):
        """Check the item's numeric fields against the min/max download rules.

        For every item key that has a truthy rule, the item value must lie in
        ``[min, max]``; a ``max`` of 0 or less means "no upper bound". The
        crawl-period rule is not a per-item range and is skipped under both
        spellings (``"period"`` as read by ``publish_time_flag`` and the
        legacy ``"peroid"``).

        :return: False on the first violated rule (after logging it), else True
        """
        for key in self.item:
            rule = self.rule_dict.get(key)
            if not rule or key in ("period", "peroid"):
                continue
            upper = int(rule["max"])
            max_value = upper if upper > 0 else 999999999999999
            flag = int(rule["min"]) <= int(self.item[key]) <= max_value
            if not flag:
                self.aliyun_log.logging(
                    code="2004",
                    trace_id=self.trace_id,
                    data=self.item,
                    message="{}: {} <= {} <= {}, {}".format(
                        key,
                        rule["min"],
                        self.item[key],
                        max_value,
                        flag,
                    ),
                    account=self.account,
                )
                return flag
        return True

    def feishu_list(self):
        """Return the dedup window (days) configured for this platform, or None."""
        return self._feishu_day_count(self._FEISHU_DEDUP_SHEET)

    def _log_duplicate(self):
        """Log that the item was rejected as a duplicate video."""
        self.aliyun_log.logging(
            code="2002",
            trace_id=self.trace_id,
            message="重复的视频",
            data=self.item,
            account=self.account,
        )

    # Deduplicate per platform.
    def repeat_video(self):
        """Check whether the video has already been crawled.

        Order of decisions:

        1. If the Feishu sheet configures a dedup window for the platform,
           the item is a duplicate only when a row for the same platform and
           ``out_video_id`` was created inside that window.
        2. Platforms on the skip lists are never treated as duplicates.
        3. Otherwise look for an existing row (platform + ``out_video_id``,
           plus title for ``zhufuhaoyunbaofu`` in recommend mode). Some
           platforms may re-crawl once the stored row is old enough.

        :return: True when the item may be processed, False when it is a
            duplicate (a log record is written in that case)
        """
        platform = self._sql_escape(self.platform)
        out_id = self._sql_escape(self.item["out_video_id"])

        day_count = self.feishu_list()
        if day_count:
            windowed_sql = (
                f'select create_time from crawler_video '
                f'where platform = "{platform}" and out_video_id="{out_id}" '
                f'AND create_time >= DATE_SUB(NOW(), INTERVAL {int(day_count)} DAY);'
            )
            if self.mysql.select(sql=windowed_sql):
                self._log_duplicate()
                return False
            return True

        if self.platform in self._NO_DEDUP_PLATFORMS:
            return True
        if (
            self.mode == "recommend"
            and self.platform in self._NO_DEDUP_RECOMMEND_PLATFORMS
        ):
            return True

        sql = (
            f'select 1 from crawler_video '
            f'where platform = "{platform}" and out_video_id="{out_id}"'
        )
        # This platform additionally matches on the title.
        if self.mode == "recommend" and self.platform == "zhufuhaoyunbaofu":
            title = self._sql_escape(self.item["video_title"])
            sql += f' and video_title="{title}"'
        sql += ";"

        if not self.mysql.select(sql=sql):
            return True

        recrawl_days = self._RECRAWL_AFTER_DAYS.get(self.platform)
        if recrawl_days is not None:
            # Age of the first stored row for this video id (no platform
            # filter, as in the original query).
            age_sql = (
                f'select create_time from crawler_video '
                f'where out_video_id="{out_id}";'
            )
            created_at = self.mysql.select(sql=age_sql)[0][0].timestamp()
            if int(time.time()) - created_at >= 86400 * recrawl_days:
                return True

        self._log_duplicate()
        return False

    # Disabled: check whether the MQ message for this video was already sent.
    # def mq_exists(self):
    #     if self.red.connect():
    #         index_txt = "{}-{}".format(self.platform, self.item['video_id'])
    #         index_md5 = hashlib.md5(index_txt.encode()).hexdigest()
    #         if self.red.select(index_md5):
    #             self.aliyun_log.logging(
    #                 code="2007",
    #                 trace_id=self.trace_id,
    #                 message="该视频 mq 已经发送"
    #             )
    #             return False
    #         else:
    #             self.red.insert(index_md5, int(time.time()), 43200)
    #             return True
    #     else:
    #         return True

    def process_item(self):
        """Run every rule check in order; stop at the first failure.

        Each check writes its own log record when it rejects the item.

        :return: True when the item satisfies all rules, else False
        """
        # if not self.mq_exists():
        #     return False
        checks = (
            self.publish_time_flag,
            self.title_flag,
            self.repeat_video,
            self.download_rule_flag,
        )
        for check in checks:
            if not check():
                return False
        return True