import os
import re
import sys
import time
from datetime import datetime, timezone

from core.models.rule_models import RuleModel
from core.utils.feishu_data_async import FeishuDataAsync
from core.utils.log.logger_manager import LoggerManager
from services.async_mysql_service import AsyncMysqlService


class PiaoQuanPipeline:
    """
    Fully asynchronous crawler validation pipeline.

    Every failed check is logged in detail both to the local logger and to
    the Aliyun log service (with a trace id so failures can be correlated).
    """

    def __init__(self, platform, mode, rule_dict, env, item, trace_id, account=None):
        """
        :param platform: crawler platform identifier (e.g. "gongzhonghao")
        :param mode: crawl mode (e.g. "recommend")
        :param rule_dict: download rule configuration, keyed by metric name
        :param env: runtime environment tag
        :param item: the crawled video item under validation
        :param trace_id: id used to correlate log entries for this item
        :param account: optional account identifier attached to log records
        """
        self.platform = platform
        self.mode = mode
        self.rule_dict = rule_dict
        self.env = env
        self.item = item
        self.trace_id = trace_id
        self.account = account
        # Validate the rule dict with the Pydantic model; a failure is logged
        # but is non-fatal (validated_rules is then simply None).
        try:
            self.validated_rules = RuleModel(**rule_dict)
        except Exception as e:
            LoggerManager.get_logger(platform=platform, mode=mode).warning(f"规则验证失败: {e}")
            self.validated_rules = None
        self.mysql = AsyncMysqlService(platform=platform, mode=mode)
        self.logger = LoggerManager.get_logger(platform=platform, mode=mode)
        self.aliyun_log = LoggerManager.get_aliyun_logger(platform=platform, mode=mode)
        self.feishu_spreadsheet_token = "KsoMsyP2ghleM9tzBfmcEEXBnXg"
        # Whitelisted test accounts (xiaoniangao) exempt from the
        # "published today" restriction — see publish_time_flag().
        self.test_account = [58528285, 58527674, 58528085, 58527582, 58527601,
                             58527612, 58528281, 58528095, 58527323, 58528071,
                             58527278]

    async def feishu_time_list(self):
        """
        Look up this platform's publish-time window (in days) from the
        Feishu sheet "RuLK77". Returns the configured value, or None when
        the platform has no row.
        """
        async with FeishuDataAsync() as feishu_data:
            summary = await feishu_data.get_values(
                spreadsheet_token=self.feishu_spreadsheet_token,
                sheet_id="RuLK77"
            )
        # First row is the header; column 0 = platform, column 1 = value.
        for row in summary[1:]:
            if row[0] == self.platform:
                return row[1]
        return None

    async def feishu_list(self):
        """
        Look up this platform's de-duplication window (in days) from the
        Feishu sheet "letS93". Returns the configured value, or None when
        the platform has no row.
        """
        async with FeishuDataAsync() as feishu_data:
            summary = await feishu_data.get_values(
                spreadsheet_token=self.feishu_spreadsheet_token,
                sheet_id="letS93"
            )
        for row in summary[1:]:
            if row[0] == self.platform:
                return row[1]
        return None

    async def publish_time_flag(self) -> bool:
        """
        Check the item's publish/update timestamps against the configured
        time window. Returns False (and logs code 2004) when the item is
        too old; True otherwise.

        NOTE(review): assumes item carries integer "publish_time_stamp"
        (and, for gongzhonghao, "update_time_stamp"); a missing timestamp
        would raise TypeError here — confirm upstream always sets them.
        """
        publish_ts = self.item.get("publish_time_stamp")
        update_ts = self.item.get("update_time_stamp")
        max_d = self.rule_dict.get("period", {}).get("max", 1000)
        min_d = self.rule_dict.get("period", {}).get("min", 1000)
        days = max(max_d, min_d)
        # feishu_days = await self.feishu_time_list()
        # if feishu_days:
        #     days = int(feishu_days)
        now_ts = int(time.time())
        if self.platform == "gongzhonghao":
            # gongzhonghao passes if EITHER publish or update time is recent.
            if (now_ts - publish_ts > 86400 * days) and (now_ts - update_ts > 86400 * days):
                msg = f"[发布时间过期] now={now_ts}, publish={publish_ts}, update={update_ts}, limit_days={days}"
                self.logger.warning(msg)
                self.aliyun_log.logging(
                    code="2004",
                    trace_id=self.trace_id,
                    data={
                        "item": self.item,
                        "now_ts": now_ts,
                        "publish_ts": publish_ts,
                        "update_ts": update_ts,
                        "days_limit": days
                    },
                    message=msg,
                    account=self.account
                )
                return False
        else:
            # days == 0 (or a whitelisted xiaoniangao test account) means
            # only items published *today* are accepted.
            if days == 0 or (self.platform == "xiaoniangao" and self.item["out_user_id"] in self.test_account):
                # Compare calendar dates in UTC to avoid timezone drift.
                is_today = datetime.fromtimestamp(publish_ts, tz=timezone.utc).date() == datetime.now(timezone.utc).date()
                if not is_today:
                    msg = "[发布时间] 不在今日"
                    self.logger.warning(msg)
                    self.aliyun_log.logging(
                        code="2004",
                        trace_id=self.trace_id,
                        data={
                            "item": self.item,
                            "publish_ts": publish_ts
                        },
                        message=msg,
                        account=self.account
                    )
                    return False
            elif now_ts - publish_ts > 86400 * days:
                msg = f"[发布时间超限制] now={now_ts}, publish={publish_ts}, limit_days={days}"
                self.logger.warning(msg)
                self.aliyun_log.logging(
                    code="2004",
                    trace_id=self.trace_id,
                    data={
                        "item": self.item,
                        "now_ts": now_ts,
                        "publish_ts": publish_ts,
                        "days_limit": days
                    },
                    message=msg,
                    account=self.account
                )
                return False
        return True

    def title_flag(self) -> bool:
        """
        Filter the video title against a sensitive-word list.

        Returns False (and logs code 2003) when a sensitive word matches;
        True otherwise. The word list is currently empty — it is meant to
        be loaded from Feishu or another config source (see TODO below),
        so today this check always passes for non-empty titles.
        """
        title = self.item.get("video_title", "")
        if not title:
            return True
        # Collapse runs of whitespace before matching.
        cleaned_title = re.sub(r"\s+", " ", title).strip()
        # TODO: load the sensitive-word list from the Feishu sheet (or other
        # config source); until then this list is empty and nothing matches.
        sensitive_words = []
        for word in sensitive_words:
            if word and word in cleaned_title:
                msg = f"[标题包含敏感词] {word} in {title}"
                self.logger.warning(msg)
                self.aliyun_log.logging(
                    code="2003",
                    trace_id=self.trace_id,
                    data={
                        "item": self.item,
                        "title": title,
                        "matched_word": word
                    },
                    message=msg,
                    account=self.account
                )
                return False
        return True

    def download_rule_flag(self) -> bool:
        """
        Apply the basic numeric download rules to the item, e.g.:

        "rule": "[{\"period\":{\"min\":15,\"max\":3}},{\"duration\":{\"min\":50,\"max\":0}},{\"share_cnt\":{\"min\":2,\"max\":0}},{\"videos_cnt\":{\"min\":300,\"max\":0}}]"

        Each item field that has a matching rule must satisfy
        min <= value <= max (max <= 0 means "unbounded"). Returns False
        (and logs code 2004) on the first violated rule; True otherwise.
        """
        for key in self.item:
            if self.rule_dict.get(key):
                # "period" is the crawl-window in days, not an item metric —
                # never range-check it. The misspelled "peroid" is accepted
                # too, since historic rule configs used that key.
                if key in ("period", "peroid"):
                    continue
                # max <= 0 means no upper bound.
                max_value = (
                    int(self.rule_dict[key]["max"])
                    if int(self.rule_dict[key]["max"]) > 0
                    else 999999999999999
                )
                flag = int(self.rule_dict[key]["min"]) <= int(self.item[key]) <= max_value
                if not flag:
                    self.aliyun_log.logging(
                        code="2004",
                        trace_id=self.trace_id,
                        data=self.item,
                        message="{}: {} <= {} <= {}, {}".format(
                            key,
                            self.rule_dict[key]["min"],
                            self.item[key],
                            max_value,
                            flag,
                        ),
                        account=self.account
                    )
                    return flag
        return True

    async def repeat_video(self) -> bool:
        """
        De-duplicate against the crawler_video table.

        Some platforms/modes are configured to bypass de-duplication
        entirely. Otherwise: if the Feishu sheet configures a per-platform
        day window, reject items seen within that window; then reject items
        whose out_video_id (plus title, for zhufuhaoyunbaofu/recommend)
        already exists at all. Returns False (logs code 2002) on duplicate.
        """
        out_id = self.item.get("out_video_id")
        title = self.item.get("video_title", "")
        bypass_platforms = {
            "zhufuniannianshunxinjixiang", "weiquanshipin", "piaoquangushi",
            "lepaoledong", "zhufukuaizhuan", "linglingkuailezhufu",
            "lepaoledongdijie", "jierizhufuhuakaifugui", "yuannifuqimanman",
            "haoyunzhufuduo", "quzhuan", "zhufudewenhou",
            "jierizhufuxingfujixiang", "haoyoushipin", "xinshiquan",
            "laonianshenghuokuaile", "laonianquan"
        }
        if self.platform in bypass_platforms or (self.platform, self.mode) in [
            ("zhuwanwufusunew", "recommend"),
            ("jixiangxingfu", "recommend"),
            ("yuannifuqichangzai", "recommend"),
            ("benshanzhufu", "recommend"),
            ("zuihaodesongni", "recommend"),
            ("tiantianjufuqi", "recommend")
        ]:
            self.logger.info("[去重] 平台配置无需去重,直接通过")
            return True

        # Windowed de-duplication: only when Feishu configures a day count.
        day_count = await self.feishu_list()
        if day_count:
            sql = """
                  SELECT UNIX_TIMESTAMP(create_time) as ts
                  FROM crawler_video
                  WHERE platform = %s
                    AND out_video_id = %s
                    AND create_time >= DATE_SUB(NOW(), INTERVAL %s DAY)
                  """
            rows = await self.mysql.fetch_all(sql, [self.platform, out_id, int(day_count)])
            if rows:
                msg = f"[去重失败] {out_id} 在 {day_count} 天内已存在"
                self.logger.warning(msg)
                self.aliyun_log.logging(
                    code="2002",
                    trace_id=self.trace_id,
                    data={
                        "item": self.item,
                        "existing_timestamps": [r["ts"] for r in rows],
                        "day_count": day_count
                    },
                    message=msg,
                    account=self.account
                )
                return False

        # Unbounded de-duplication; zhufuhaoyunbaofu/recommend also matches
        # on title so different titles of the same out id are kept.
        if self.platform == "zhufuhaoyunbaofu" and self.mode == "recommend":
            sql = """
                  SELECT 1
                  FROM crawler_video
                  WHERE platform = %s
                    AND out_video_id = %s
                    AND video_title = %s
                  """
            result = await self.mysql.fetch_one(sql, [self.platform, out_id, title])
        else:
            sql = """
                  SELECT 1
                  FROM crawler_video
                  WHERE platform = %s
                    AND out_video_id = %s
                  """
            result = await self.mysql.fetch_one(sql, [self.platform, out_id])
        if result:
            msg = f"[去重失败] {out_id} 已存在"
            self.logger.warning(msg)
            self.aliyun_log.logging(
                code="2002",
                trace_id=self.trace_id,
                data={
                    "item": self.item,
                    "out_video_id": out_id
                },
                message=msg,
                account=self.account
            )
            return False
        self.logger.info("[去重] 校验通过")
        return True

    async def process_item(self) -> bool:
        """
        Run the full validation flow for the item, in order:
        publish time → title → de-duplication → download rules.
        Short-circuits on the first failed check; each step has already
        logged its own detailed failure record. Returns True only when
        every check passes.
        """
        self.logger.info(f"开始校验: {self.item.get('out_video_id', '')}")
        if not await self.publish_time_flag():
            self.logger.info("校验结束: 发布时间不符合")
            return False
        if not self.title_flag():
            self.logger.info("校验结束: 标题不符合")
            return False
        if not await self.repeat_video():
            self.logger.info("校验结束: 去重不符合")
            return False
        if not self.download_rule_flag():
            self.logger.info("校验结束: 下载规则不符合")
            return False
        self.logger.info("校验结束: 全部通过")
        return True