123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273 |
- import hashlib
- import re
- import sys
- import os
- import time
- from application.common.feishu.feishu_utils import FeishuUtils
- sys.path.append(os.getcwd())
- from datetime import datetime
- from application.common import MysqlHelper, AliyunLogger
- # from application.common.redis.pyredis import RedisClient
- class PiaoQuanPipeline(object):
- """
- 爬虫管道——爬虫规则判断
- """
- def __init__(self, platform, mode, rule_dict, env, item, trace_id, account=None):
- self.platform = platform
- self.mode = mode
- self.item = item
- self.rule_dict = rule_dict
- self.env = env
- self.trace_id = trace_id
- self.mysql = MysqlHelper(env=env, mode=mode, platform=platform)
- self.aliyun_log = AliyunLogger(platform=platform, mode=mode, env=env)
- self.account = account
- # self.red = RedisClient()
- def feishu_time_list(self):
- summary = FeishuUtils.get_values_batch("KsoMsyP2ghleM9tzBfmcEEXBnXg", "RuLK77")
- for row in summary[1:]:
- channel = row[0]
- day_count = row[1]
- if channel:
- if channel == self.platform:
- return day_count
- else:
- return None
- return None
- def publish_time_flag(self):
- """
- 判断发布时间是否过期
- :return: True or False
- """
- # 判断发布时间
- publish_time_stamp = self.item["publish_time_stamp"]
- update_time_stamp = self.item["update_time_stamp"]
- max_d = self.rule_dict.get("period", {}).get("max", 1000)
- min_d = self.rule_dict.get("period", {}).get("min", 1000)
- days = max_d if max_d > min_d else min_d
- days_time = self.feishu_time_list()
- if days_time:
- days = int(days_time)
- if self.platform == "gongzhonghao":
- if (
- int(time.time()) - publish_time_stamp
- > 3600 * 24 * days
- ) and (
- int(time.time()) - update_time_stamp
- > 3600 * 24 * days
- ):
- self.aliyun_log.logging(
- code="2004",
- trace_id=self.trace_id,
- data=self.item,
- message="发布时间超过{}天".format(days),
- )
- return False
- else:
- if days == 0:
- publish_time_stamp = int(time.time()) # 示例时间戳
- is_today = datetime.fromtimestamp(publish_time_stamp).date() == datetime.today().date()
- if not is_today:
- return False
- elif (
- int(time.time()) - publish_time_stamp
- > 3600 * 24 * days
- ):
- self.aliyun_log.logging(
- code="2004",
- trace_id=self.trace_id,
- data=self.item,
- message="发布时间超过{}天".format(days),
- )
- return False
- return True
- def title_flag(self):
- """
- 视频标题是否满足需求
- :return:
- """
- title = self.item["video_title"]
- cleaned_title = re.sub(r"[^\w]", " ", title)
- # 敏感词
- # 获取敏感词列表
- sensitive_words = []
- if any(word in cleaned_title for word in sensitive_words):
- self.aliyun_log.logging(
- code="2003",
- trace_id=self.trace_id,
- message="标题中包含敏感词",
- data=self.item,
- account=self.account
- )
- return False
- return True
- def download_rule_flag(self):
- """
- 视频基础下载规则
- :return:
- """
- for key in self.item:
- if self.rule_dict.get(key):
- max_value = (
- int(self.rule_dict[key]["max"])
- if int(self.rule_dict[key]["max"]) > 0
- else 999999999999999
- )
- if key == "peroid": # peroid是抓取周期天数
- continue
- else:
- flag = int(self.rule_dict[key]["min"]) <= int(self.item[key]) <= max_value
- if not flag:
- self.aliyun_log.logging(
- code="2004",
- trace_id=self.trace_id,
- data=self.item,
- message="{}: {} <= {} <= {}, {}".format(
- key,
- self.rule_dict[key]["min"],
- self.item[key],
- max_value,
- flag,
- ),
- account=self.account
- )
- return flag
- else:
- continue
- return True
- def feishu_list(self):
- summary = FeishuUtils.get_values_batch("KsoMsyP2ghleM9tzBfmcEEXBnXg", "letS93")
- for row in summary[1:]:
- channel = row[0]
- day_count = row[1]
- if channel:
- if channel == self.platform:
- return day_count
- else:
- return None
- return None
- # 按照某个具体平台来去重
- def repeat_video(self):
- """
- 视频是否重复
- :return:
- """
- out_id = self.item["out_video_id"]
- day_count = self.feishu_list()
- if day_count:
- sql_2 = f"""select create_time from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}" AND create_time >= DATE_SUB(NOW(), INTERVAL {int(day_count)} DAY);"""
- repeat_video = self.mysql.select(sql=sql_2)
- if repeat_video:
- self.aliyun_log.logging(
- code="2002",
- trace_id=self.trace_id,
- message="重复的视频",
- data=self.item,
- account=self.account
- )
- return False
- else:
- return True
- if self.platform == "zhufuniannianshunxinjixiang" or self.platform == "weiquanshipin" or self.platform == "piaoquangushi" or self.platform == "lepaoledong" or self.platform == "zhufukuaizhuan" or self.platform == "linglingkuailezhufu" or self.platform == "lepaoledongdijie":
- return True
- if self.platform == "jierizhufuhuakaifugui" or self.platform == "yuannifuqimanman" or self.platform == "haoyunzhufuduo" or self.platform == "quzhuan" or self.platform == "zhufudewenhou" or self.platform == "jierizhufuxingfujixiang" or self.platform == "haoyoushipin" or self.platform == "xinshiquan" or self.platform == "laonianshenghuokuaile" or self.platform == "laonianquan":
- return True
- if self.platform == "zhuwanwufusunew" and self.mode == "recommend":
- return True
- if self.platform == "jixiangxingfu" and self.mode == "recommend":
- return True
- if self.platform == "yuannifuqichangzai" and self.mode == "recommend":
- return True
- if self.platform == "benshanzhufu" and self.mode == "recommend":
- return True
- if self.platform == "zuihaodesongni" and self.mode == "recommend":
- return True
- if self.platform == "tiantianjufuqi" and self.mode == "recommend":
- return True
- # 判断加上标题去重
- if self.mode == "recommend" and self.platform == "zhufuhaoyunbaofu":
- title = self.item["video_title"]
- sql = f""" select 1 from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}" and video_title="{title}"; """
- else:
- sql = f""" select 1 from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}"; """
- repeat_video = self.mysql.select(sql=sql)
- if repeat_video:
- # 喜事多多平台 4 天去重一次
- if self.platform == "xishiduoduo":
- sql_2 = f"""select create_time from crawler_video where out_video_id="{out_id}";"""
- video_time = self.mysql.select(sql=sql_2)[0][0].timestamp()
- if int(time.time()) - video_time >= 86400 * 4:
- return True
- # 小年糕推荐流和祝福圈子推荐流 3 天去重一次
- elif self.platform == "xiaoniangaotuijianliu" or self.platform == "zhufuquanzituijianliu":
- sql_2 = f"""select create_time from crawler_video where out_video_id="{out_id}";"""
- video_time = self.mysql.select(sql=sql_2)[0][0].timestamp()
- if int(time.time()) - video_time >= 86400 * 3:
- return True
- self.aliyun_log.logging(
- code="2002",
- trace_id=self.trace_id,
- message="重复的视频",
- data=self.item,
- account=self.account
- )
- return False
- return True
- # def mq_exists(self):
- # """
- # 检测 mq 是否已经发送过了
- # :return:
- # """
- # if self.red.connect():
- # index_txt = "{}-{}".format(self.platform, self.item['video_id'])
- # index_md5 = hashlib.md5(index_txt.encode()).hexdigest()
- # if self.red.select(index_md5):
- # self.aliyun_log.logging(
- # code="2007",
- # trace_id=self.trace_id,
- # message="该视频 mq 已经发送"
- # )
- # return False
- # else:
- # self.red.insert(index_md5, int(time.time()), 43200)
- # return True
- # else:
- # return True
- def process_item(self):
- """
- 全规则判断,符合规则的数据则return True
- :return:
- """
- # 判断该 mq 是否已经发了
- # if not self.mq_exists():
- # return False
- if not self.publish_time_flag():
- # 记录相关日志
- return False
- if not self.title_flag():
- # 记录相关日志
- return False
- if not self.repeat_video():
- # 记录相关日志
- return False
- if not self.download_rule_flag():
- # 记录相关日志
- return False
- return True
|