| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273 | import hashlibimport reimport sysimport osimport timefrom application.common.feishu.feishu_utils import FeishuUtilssys.path.append(os.getcwd())from datetime import datetimefrom application.common import MysqlHelper, AliyunLogger# from application.common.redis.pyredis import RedisClientclass PiaoQuanPipeline(object):    """    爬虫管道——爬虫规则判断    """    def __init__(self, platform, mode, rule_dict, env, item, trace_id, account=None):        self.platform = platform        self.mode = mode        self.item = item        self.rule_dict = rule_dict        self.env = env        self.trace_id = trace_id        self.mysql = MysqlHelper(env=env, mode=mode, platform=platform)        self.aliyun_log = AliyunLogger(platform=platform, mode=mode, env=env)        self.account = account        # self.red = RedisClient()    def feishu_time_list(self):        summary = FeishuUtils.get_values_batch("KsoMsyP2ghleM9tzBfmcEEXBnXg", "RuLK77")        for row in summary[1:]:            channel = row[0]            day_count = row[1]            if channel:                if channel == self.platform:                    return day_count            else:                return None        return None    def publish_time_flag(self):        """        判断发布时间是否过期        :return: True or False        """        # 判断发布时间        publish_time_stamp = self.item["publish_time_stamp"]        update_time_stamp = self.item["update_time_stamp"]        max_d = self.rule_dict.get("period", {}).get("max", 1000)        min_d = self.rule_dict.get("period", {}).get("min", 1000)        days = max_d if max_d > min_d else min_d        days_time = self.feishu_time_list()        if days_time:            days = int(days_time)        if self.platform == "gongzhonghao":            if (                    int(time.time()) - publish_time_stamp                    > 3600 * 24 * days            ) and (                    int(time.time()) - update_time_stamp                    > 3600 * 24 * days            ):                self.aliyun_log.logging(                    code="2004",                    trace_id=self.trace_id,                    data=self.item,                    message="发布时间超过{}天".format(days),                )                return False        else:            if days == 0:                publish_time_stamp = int(time.time())  # 示例时间戳                is_today = datetime.fromtimestamp(publish_time_stamp).date() == datetime.today().date()                if not is_today:                    return False            elif (                    int(time.time()) - publish_time_stamp                    > 3600 * 24 * days            ):                self.aliyun_log.logging(                    code="2004",                    trace_id=self.trace_id,                    data=self.item,                    message="发布时间超过{}天".format(days),                )                return False        return True    def title_flag(self):        """        视频标题是否满足需求        :return:        """        title = self.item["video_title"]        cleaned_title = re.sub(r"[^\w]", " ", title)        # 敏感词        # 获取敏感词列表        sensitive_words = []        if any(word in cleaned_title for word in sensitive_words):            self.aliyun_log.logging(                code="2003",                trace_id=self.trace_id,                message="标题中包含敏感词",                data=self.item,                account=self.account            )            return False        return True    def download_rule_flag(self):        """        视频基础下载规则        :return:        """        for key in self.item:            if self.rule_dict.get(key):                max_value = (                    int(self.rule_dict[key]["max"])                    if int(self.rule_dict[key]["max"]) > 0                    else 999999999999999                )                if key == "peroid":  # peroid是抓取周期天数                    continue                else:                    flag = int(self.rule_dict[key]["min"]) <= int(self.item[key]) <= max_value                    if not flag:                        self.aliyun_log.logging(                            code="2004",                            trace_id=self.trace_id,                            data=self.item,                            message="{}: {} <= {} <= {}, {}".format(                                key,                                self.rule_dict[key]["min"],                                self.item[key],                                max_value,                                flag,                            ),                            account=self.account                        )                        return flag            else:                continue        return True    def feishu_list(self):        summary = FeishuUtils.get_values_batch("KsoMsyP2ghleM9tzBfmcEEXBnXg", "letS93")        for row in summary[1:]:            channel = row[0]            day_count = row[1]            if channel:                if channel == self.platform:                    return day_count            else:                return None        return None    # 按照某个具体平台来去重    def repeat_video(self):        """        视频是否重复        :return:        """        out_id = self.item["out_video_id"]        day_count = self.feishu_list()        if day_count:            sql_2 = f"""select create_time from crawler_video where platform = "{self.platform}" and  out_video_id="{out_id}" AND create_time >= DATE_SUB(NOW(), INTERVAL {int(day_count)} DAY);"""            repeat_video = self.mysql.select(sql=sql_2)            if repeat_video:                self.aliyun_log.logging(                    code="2002",                    trace_id=self.trace_id,                    message="重复的视频",                    data=self.item,                    account=self.account                )                return False            else:                return True        if self.platform == "zhufuniannianshunxinjixiang" or  self.platform == "weiquanshipin" or  self.platform == "piaoquangushi" or  self.platform == "lepaoledong" or  self.platform == "zhufukuaizhuan" or self.platform == "linglingkuailezhufu" or self.platform == "lepaoledongdijie":            return True        if self.platform == "jierizhufuhuakaifugui" or self.platform == "yuannifuqimanman" or self.platform == "haoyunzhufuduo" or self.platform == "quzhuan" or self.platform == "zhufudewenhou" or self.platform == "jierizhufuxingfujixiang" or self.platform == "haoyoushipin" or self.platform == "xinshiquan" or self.platform == "laonianshenghuokuaile" or self.platform == "laonianquan":            return True        if self.platform == "zhuwanwufusunew" and self.mode == "recommend":            return True        if self.platform == "jixiangxingfu" and self.mode == "recommend":            return True        if self.platform == "yuannifuqichangzai" and self.mode == "recommend":            return True        if self.platform == "benshanzhufu" and self.mode == "recommend":            return True        if self.platform == "zuihaodesongni" and self.mode == "recommend":            return True        if self.platform == "tiantianjufuqi" and self.mode == "recommend":            return True        # 判断加上标题去重        if self.mode == "recommend" and self.platform == "zhufuhaoyunbaofu":            title = self.item["video_title"]            sql = f""" select 1 from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}" and video_title="{title}"; """        else:            sql = f""" select 1 from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}"; """        repeat_video = self.mysql.select(sql=sql)        if repeat_video:            # 喜事多多平台 4 天去重一次            if self.platform == "xishiduoduo":                sql_2 = f"""select create_time from crawler_video where out_video_id="{out_id}";"""                video_time = self.mysql.select(sql=sql_2)[0][0].timestamp()                if int(time.time()) - video_time >= 86400 * 4:                    return True            # 小年糕推荐流和祝福圈子推荐流 3 天去重一次            elif self.platform == "xiaoniangaotuijianliu" or self.platform == "zhufuquanzituijianliu":                sql_2 = f"""select create_time from crawler_video where out_video_id="{out_id}";"""                video_time = self.mysql.select(sql=sql_2)[0][0].timestamp()                if int(time.time()) - video_time >= 86400 * 3:                    return True            self.aliyun_log.logging(                code="2002",                trace_id=self.trace_id,                message="重复的视频",                data=self.item,                account=self.account            )            return False        return True    # def mq_exists(self):    #     """    #     检测 mq 是否已经发送过了    #     :return:    #     """    #     if self.red.connect():    #         index_txt = "{}-{}".format(self.platform, self.item['video_id'])    #         index_md5 = hashlib.md5(index_txt.encode()).hexdigest()    #         if self.red.select(index_md5):    #             self.aliyun_log.logging(    #                 code="2007",    #                 trace_id=self.trace_id,    #                 message="该视频 mq 已经发送"    #             )    #             return False    #         else:    #             self.red.insert(index_md5, int(time.time()), 43200)    #             return True    #     else:    #         return True    def process_item(self):        """        全规则判断,符合规则的数据则return True        :return:        """        # 判断该 mq 是否已经发了        # if not self.mq_exists():        #     return False        if not self.publish_time_flag():            # 记录相关日志            return False        if not self.title_flag():            # 记录相关日志            return False        if not self.repeat_video():            # 记录相关日志            return False        if not self.download_rule_flag():            # 记录相关日志            return False        return True
 |