import re
import sys
import os
import time
from datetime import datetime

from core.utils.feishu.feishu_utils import FeishuUtils
from services.async_mysql_service import AsyncMysqlService
from core.utils.log.logger_manager import LoggerManager

sys.path.append(os.getcwd())


class PiaoQuanPipeline:
    """Asynchronous crawler pipeline that validates one video item against rules.

    The checks are run by :meth:`process_item` in this order: publish-time
    window, title sensitive words, duplicate detection, numeric download rules.
    Every check returns ``True`` when the item passes and ``False`` when it
    must be dropped; rejections are reported through the Aliyun logger.
    """

    # Arguments passed to ``FeishuUtils.get_values_batch``: the first one is
    # shared by both lookups, the second selects the publish-window or the
    # dedup-window configuration.
    _FEISHU_DOC_TOKEN = "KsoMsyP2ghleM9tzBfmcEEXBnXg"
    _PUBLISH_WINDOW_SHEET = "RuLK77"
    _DEDUP_WINDOW_SHEET = "letS93"

    _SECONDS_PER_DAY = 86400

    # (platform, mode) pairs that skip the unbounded duplicate query.
    _DEDUP_BYPASS = frozenset(
        {
            ("zhufuniannianshunxinjixiang", "recommend"),
            ("benshanzhufu", "recommend"),
            ("tiantianjufuqi", "recommend"),
        }
    )

    # Sensitive words rejected by ``title_flag``; add entries here.
    SENSITIVE_WORDS = ()

    def __init__(self, platform, mode, rule_dict, env, item, trace_id, account=None):
        """Store the validation context and create the MySQL/log clients.

        :param platform: platform identifier, matched against Feishu rows and
            the ``platform`` column of ``crawler_video``.
        :param mode: crawl mode (e.g. ``"recommend"``).
        :param rule_dict: mapping of rule name to a ``{"min": ..., "max": ...}``
            mapping.
        :param env: environment name; stored but not used by this class.
        :param item: mapping describing the video being validated.
        :param trace_id: identifier attached to every log record.
        :param account: optional account attached to log records.
        """
        self.platform = platform
        self.mode = mode
        self.item = item
        self.rule_dict = rule_dict
        self.env = env
        self.trace_id = trace_id
        self.account = account
        # Asynchronous MySQL client, used as an async context manager.
        self.mysql = AsyncMysqlService(platform=platform, mode=mode)
        self.aliyun_log = LoggerManager.get_aliyun_logger(platform=platform, mode=mode)

    def _lookup_platform_value(self, sheet_id):
        """Return the second column of the Feishu row whose first column is this platform.

        The first row of the sheet is skipped. ``None`` is returned when the
        sheet is missing/empty, when no row matches, or when the matching row
        has no second column.
        """
        # NOTE(review): this call is made synchronously from async callers; if
        # it performs network I/O it blocks the event loop - confirm.
        rows = FeishuUtils.get_values_batch(self._FEISHU_DOC_TOKEN, sheet_id)
        if not rows:
            return None
        for row in rows[1:]:
            if row and row[0] == self.platform:
                return row[1] if len(row) > 1 else None
        return None

    def _log_rejection(self, code, message):
        """Emit one rejection record carrying the item, trace id and account."""
        self.aliyun_log.logging(
            code=code,
            trace_id=self.trace_id,
            message=message,
            data=self.item,
            account=self.account,
        )

    async def feishu_time_list(self):
        """Read the publish-time window (in days) configured in Feishu for this platform.

        :return: the configured value, or ``None`` when nothing is configured.
        """
        return self._lookup_platform_value(self._PUBLISH_WINDOW_SHEET)

    async def publish_time_flag(self) -> bool:
        """Check whether the item's publish time is still inside the allowed window.

        The window defaults to the larger of ``rule_dict["period"]["max"]`` and
        ``["min"]`` (1000 when absent) and is overridden by a truthy Feishu
        value. For ``gongzhonghao`` the item is expired only when both the
        publish and the update timestamps are older than the window. For other
        platforms a window of 0 days means "published today".

        :return: ``True`` if the item is recent enough, ``False`` otherwise.
        """
        publish_ts = self.item["publish_time_stamp"]

        period_rule = self.rule_dict.get("period", {})
        days = max(period_rule.get("max", 1000), period_rule.get("min", 1000))
        configured_days = await self.feishu_time_list()
        if configured_days:
            days = int(configured_days)

        now_ts = int(time.time())
        window = self._SECONDS_PER_DAY * days

        if self.platform == "gongzhonghao":
            # Only this platform has an update timestamp that matters.
            update_ts = self.item["update_time_stamp"]
            expired = now_ts - publish_ts > window and now_ts - update_ts > window
        elif days == 0:
            published_on = datetime.fromtimestamp(publish_ts).date()
            expired = published_on != datetime.today().date()
        else:
            expired = now_ts - publish_ts > window

        if not expired:
            return True
        self._log_rejection("2004", f"发布时间超过{days}天")
        return False

    def title_flag(self) -> bool:
        """Check that the title contains none of ``SENSITIVE_WORDS``.

        Non-word characters are replaced by spaces before matching. A ``None``
        title is treated as empty.

        :return: ``True`` if the title is clean, ``False`` otherwise.
        """
        title = self.item["video_title"] or ""
        cleaned_title = re.sub(r"[^\w]", " ", title)
        if any(word in cleaned_title for word in self.SENSITIVE_WORDS):
            self._log_rejection("2003", "标题中包含敏感词")
            return False
        return True

    def download_rule_flag(self) -> bool:
        """Check every numeric item field against its ``min``/``max`` rule.

        Only keys present in both the item and ``rule_dict`` are checked. The
        period rule is handled by :meth:`publish_time_flag` and is skipped here
        (the legacy misspelling ``"peroid"`` is skipped as well). A missing
        ``min`` defaults to 0 and a missing ``max`` to 999999999.

        :return: ``True`` if all rules hold, ``False`` at the first violation.
        """
        for key in self.item:
            rule = self.rule_dict.get(key)
            if not rule or key in ("period", "peroid"):
                continue
            min_value = int(rule.get("min", 0))
            max_value = int(rule.get("max", 999999999))
            val = int(self.item.get(key, 0))
            if not min_value <= val <= max_value:
                self._log_rejection("2004", f"{key}: 不符合规则")
                return False
        return True

    async def feishu_list(self):
        """Read the dedup window (in days) configured in Feishu for this platform.

        :return: the configured value, or ``None`` when nothing is configured.
        """
        return self._lookup_platform_value(self._DEDUP_WINDOW_SHEET)

    async def repeat_video(self) -> bool:
        """Check whether the item already exists in ``crawler_video``.

        Steps, in order:

        1. If a dedup window is configured in Feishu, reject when the same
           ``out_video_id`` was stored within that many days.
        2. Platforms listed in ``_DEDUP_BYPASS`` are accepted at this point.
        3. Otherwise reject when a row with the same ``out_video_id`` exists
           (for ``zhufuhaoyunbaofu``/``recommend`` the title must match too).

        :return: ``True`` if the item is new, ``False`` if it is a duplicate.
        """
        out_id = self.item["out_video_id"]
        title = self.item["video_title"]
        day_count = await self.feishu_list()

        async with self.mysql as db:
            if day_count:
                sql = """
                    SELECT create_time FROM crawler_video
                    WHERE platform = %s AND out_video_id = %s
                      AND create_time >= DATE_SUB(NOW(), INTERVAL %s DAY)
                """
                rows = await db.client.fetch_all(
                    sql, [self.platform, out_id, int(day_count)]
                )
                if rows:
                    self._log_rejection("2002", "重复的视频")
                    return False
                # NOTE(review): a miss inside the window falls through to the
                # unbounded check below, so for non-bypass platforms the window
                # does not relax deduplication - confirm this is intended.

            if (self.platform, self.mode) in self._DEDUP_BYPASS:
                return True

            if self.platform == "zhufuhaoyunbaofu" and self.mode == "recommend":
                # This platform/mode also requires the title to match.
                sql = """
                    SELECT 1 FROM crawler_video
                    WHERE platform = %s AND out_video_id = %s AND video_title = %s
                """
                params = [self.platform, out_id, title]
            else:
                sql = """
                    SELECT 1 FROM crawler_video
                    WHERE platform = %s AND out_video_id = %s
                """
                params = [self.platform, out_id]

            result = await db.client.fetch_one(sql, params)
            if result:
                self._log_rejection("2002", "重复的视频")
                return False
        return True

    async def process_item(self) -> bool:
        """Run all checks, stopping at the first failure.

        :return: ``True`` only if every check passes.
        """
        if not await self.publish_time_flag():
            return False
        if not self.title_flag():
            return False
        if not await self.repeat_video():
            return False
        if not self.download_rule_flag():
            return False
        return True