123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268 |
- import os
- import re
- import sys
- import time
- from datetime import datetime
- sys.path.append(os.getcwd())
- from core.utils.feishu_data_async import FeishuDataAsync
- from core.utils.log.logger_manager import LoggerManager
- from services.async_mysql_service import AsyncMysqlService
class PiaoQuanPipeline:
    """Asynchronous validation pipeline for a single crawled video item.

    Every check that rejects the item writes a warning to the local logger
    and a structured record to the Aliyun logger. ``process_item`` runs the
    checks in order: publish time, title, de-duplication, download rules.
    """

    # Feishu spreadsheet that holds the per-platform override sheets.
    _FEISHU_SPREADSHEET_TOKEN = "KsoMsyP2ghleM9tzBfmcEEXBnXg"
    # Sheet read by ``feishu_time_list`` (publish-time limit in days).
    _FEISHU_PUBLISH_DAYS_SHEET = "RuLK77"
    # Sheet read by ``feishu_list`` (de-duplication window in days).
    _FEISHU_DEDUP_DAYS_SHEET = "letS93"

    _SECONDS_PER_DAY = 86400
    # Stand-in upper bound used when a rule's "max" is not positive.
    _UNLIMITED_MAX = 999999999999999
    # "period" is the crawl period in days, not a per-item numeric bound.
    # "peroid" is the historical misspelling, kept for backward compatibility.
    _PERIOD_KEYS = frozenset({"period", "peroid"})

    # Platforms that skip de-duplication regardless of mode.
    _DEDUP_BYPASS_PLATFORMS = frozenset({
        "zhufuniannianshunxinjixiang", "weiquanshipin", "piaoquangushi",
        "lepaoledong", "zhufukuaizhuan", "linglingkuailezhufu",
        "lepaoledongdijie", "jierizhufuhuakaifugui", "yuannifuqimanman",
        "haoyunzhufuduo", "quzhuan", "zhufudewenhou",
        "jierizhufuxingfujixiang", "haoyoushipin", "xinshiquan",
        "laonianshenghuokuaile", "laonianquan",
    })
    # (platform, mode) pairs that skip de-duplication.
    _DEDUP_BYPASS_PLATFORM_MODES = frozenset({
        ("zhuwanwufusunew", "recommend"),
        ("jixiangxingfu", "recommend"),
        ("yuannifuqichangzai", "recommend"),
        ("benshanzhufu", "recommend"),
        ("zuihaodesongni", "recommend"),
        ("tiantianjufuqi", "recommend"),
    })

    def __init__(self, platform, mode, rule_dict, env, item, trace_id, account=None):
        """Store the item under validation and create the service handles.

        :param platform: platform identifier; used for Feishu lookups,
            SQL filters and logger selection.
        :param mode: crawl mode; used for logger selection and for the
            platform/mode specific de-duplication rules.
        :param rule_dict: mapping of rule name to a ``{"min": ..., "max": ...}``
            mapping; ``"period"`` holds the publish-time limit in days.
        :param env: stored on the instance; not read by the methods of this
            class.
        :param item: the crawled video as a dict.
        :param trace_id: identifier attached to every Aliyun log record.
        :param account: optional account attached to every Aliyun log record.
        """
        self.platform = platform
        self.mode = mode
        self.rule_dict = rule_dict
        self.env = env
        self.item = item
        self.trace_id = trace_id
        self.account = account
        self.mysql = AsyncMysqlService(platform=platform, mode=mode)
        self.logger = LoggerManager.get_logger(platform=platform, mode=mode)
        self.aliyun_log = LoggerManager.get_aliyun_logger(platform=platform, mode=mode)

    def _reject(self, code, message, data) -> bool:
        """Record a failed check locally and in Aliyun, then return ``False``."""
        self.logger.warning(message)
        self.aliyun_log.logging(
            code=code,
            trace_id=self.trace_id,
            data=data,
            message=message,
            account=self.account,
        )
        return False

    @staticmethod
    def _parse_days(raw):
        """Return ``raw`` as an int, or ``None`` if it is empty or not numeric."""
        if not raw:
            return None
        try:
            return int(raw)
        except (TypeError, ValueError):
            return None

    async def _feishu_platform_value(self, sheet_id):
        """Return the second column of this platform's row in a Feishu sheet.

        The first row is treated as a header and skipped. Rows that are
        empty or have fewer than two cells are ignored instead of raising
        ``IndexError``. Returns ``None`` when no row matches the platform.
        """
        async with FeishuDataAsync() as feishu_data:
            summary = await feishu_data.get_values(
                spreadsheet_token=self._FEISHU_SPREADSHEET_TOKEN,
                sheet_id=sheet_id,
            )
        for row in (summary or [])[1:]:
            if row and len(row) > 1 and row[0] == self.platform:
                return row[1]
        return None

    async def feishu_time_list(self):
        """Return the Feishu publish-time limit for this platform, or ``None``."""
        return await self._feishu_platform_value(self._FEISHU_PUBLISH_DAYS_SHEET)

    async def feishu_list(self):
        """Return the Feishu de-duplication window for this platform, or ``None``."""
        return await self._feishu_platform_value(self._FEISHU_DEDUP_DAYS_SHEET)

    async def publish_time_flag(self) -> bool:
        """Check that the item was published recently enough.

        The limit in days is the larger of ``rule_dict["period"]["max"]`` and
        ``["min"]`` (each defaulting to 1000), overridden by the Feishu value
        when one is configured and numeric.

        * ``gongzhonghao``: rejected only when both the publish and the
          update timestamps are older than the limit.
        * other platforms: a limit of 0 means "published today"; otherwise
          the publish timestamp must be within the limit.

        :return: ``True`` when the item passes, ``False`` when it is rejected.
        """
        period = self.rule_dict.get("period", {})
        days = max(period.get("max", 1000), period.get("min", 1000))

        feishu_days = await self.feishu_time_list()
        if feishu_days:
            override = self._parse_days(feishu_days)
            if override is None:
                # A malformed sheet cell must not crash the pipeline.
                self.logger.warning(
                    f"[发布时间] 飞书配置天数无效: {feishu_days!r}, 使用规则天数 {days}"
                )
            else:
                days = override

        # One "now" is used both as the reference and as the default for
        # missing timestamps, so a missing timestamp never looks expired.
        now_ts = int(time.time())
        publish_ts = self.item.get("publish_time_stamp", now_ts)
        update_ts = self.item.get("update_time_stamp", now_ts)
        limit_seconds = self._SECONDS_PER_DAY * days

        if self.platform == "gongzhonghao":
            if now_ts - publish_ts > limit_seconds and now_ts - update_ts > limit_seconds:
                return self._reject(
                    "2004",
                    f"[发布时间过期] now={now_ts}, publish={publish_ts}, update={update_ts}, limit_days={days}",
                    {
                        "item": self.item,
                        "now_ts": now_ts,
                        "publish_ts": publish_ts,
                        "update_ts": update_ts,
                        "days_limit": days,
                    },
                )
            return True

        if days == 0:
            # Compared as local calendar dates, not as a 24-hour window.
            is_today = datetime.fromtimestamp(publish_ts).date() == datetime.today().date()
            if not is_today:
                return self._reject(
                    "2004",
                    "[发布时间] 不在今日",
                    {"item": self.item, "publish_ts": publish_ts},
                )
            return True

        if now_ts - publish_ts > limit_seconds:
            return self._reject(
                "2004",
                f"[发布时间超限制] now={now_ts}, publish={publish_ts}, limit_days={days}",
                {
                    "item": self.item,
                    "now_ts": now_ts,
                    "publish_ts": publish_ts,
                    "days_limit": days,
                },
            )
        return True

    def title_flag(self) -> bool:
        """Reject the item when its title contains a sensitive word.

        Non-word characters in the title are replaced by spaces before
        matching. The word list is currently empty, so every title passes.

        :return: ``True`` when the title passes, ``False`` when it is rejected.
        """
        # ``or ""`` also covers a key that is present with a ``None`` value,
        # which would otherwise make ``re.sub`` raise ``TypeError``.
        title = self.item.get("video_title") or ""
        cleaned_title = re.sub(r"[^\w]", " ", title)
        sensitive_words = []  # configurable list of sensitive words
        for word in sensitive_words:
            if word in cleaned_title:
                return self._reject(
                    "2003",
                    f"[标题包含敏感词] {word} in {title}",
                    {"item": self.item, "title": title, "matched_word": word},
                )
        return True

    def download_rule_flag(self) -> bool:
        """Check the item's numeric fields against the download rules.

        For every item key that has a non-empty rule in ``rule_dict``, the
        item value must satisfy ``min <= value <= max``. A ``max`` that is
        not positive means "no upper bound". The crawl-period rule is
        skipped because it is not a per-item bound.

        :return: ``False`` on the first violated rule (after logging it),
            ``True`` when every applicable rule is satisfied or when no rule
            applies.
        """
        for key in self.item:
            rule = self.rule_dict.get(key)
            if not rule or key in self._PERIOD_KEYS:
                continue
            # A missing bound is treated as "unbounded" on that side.
            min_value = int(rule.get("min", 0))
            max_value = int(rule.get("max", 0))
            if max_value <= 0:
                max_value = self._UNLIMITED_MAX
            flag = min_value <= int(self.item[key]) <= max_value
            if not flag:
                return self._reject(
                    "2004",
                    "{}: {} <= {} <= {}, {}".format(
                        key, rule.get("min", 0), self.item[key], max_value, flag
                    ),
                    self.item,
                )
        return True

    async def repeat_video(self) -> bool:
        """Check that the item has not been crawled before.

        Platforms and (platform, mode) pairs on the bypass lists always pass.
        Otherwise the item is rejected when ``crawler_video`` already holds a
        row for this platform and ``out_video_id``: first within the Feishu
        configured window (when one is set), then without any time limit.
        For ``zhufuhaoyunbaofu`` in ``recommend`` mode the unconditional
        check additionally matches on the title.

        :return: ``True`` when the item is new, ``False`` when it is a
            duplicate.
        """
        if (
            self.platform in self._DEDUP_BYPASS_PLATFORMS
            or (self.platform, self.mode) in self._DEDUP_BYPASS_PLATFORM_MODES
        ):
            self.logger.info("[去重] 平台配置无需去重,直接通过")
            return True

        out_id = self.item.get("out_video_id")
        title = self.item.get("video_title", "")

        day_count = await self.feishu_list()
        if day_count:
            window_days = self._parse_days(day_count)
            if window_days is None:
                # A malformed sheet cell must not crash the pipeline; the
                # unconditional check below still applies.
                self.logger.warning(f"[去重] 飞书配置天数无效: {day_count!r}, 跳过时间窗口校验")
            else:
                sql = """
                SELECT UNIX_TIMESTAMP(create_time) as ts FROM crawler_video
                WHERE platform = %s AND out_video_id = %s AND create_time >= DATE_SUB(NOW(), INTERVAL %s DAY)
                """
                rows = await self.mysql.fetch_all(sql, [self.platform, out_id, window_days])
                if rows:
                    return self._reject(
                        "2002",
                        f"[去重失败] {out_id} 在 {day_count} 天内已存在",
                        {
                            "item": self.item,
                            "existing_timestamps": [r["ts"] for r in rows],
                            "day_count": day_count,
                        },
                    )

        # NOTE(review): the unconditional check below also runs after the
        # windowed check passes, so a video older than the window is still
        # rejected. Confirm whether the window was meant to allow re-crawls.
        if self.platform == "zhufuhaoyunbaofu" and self.mode == "recommend":
            sql = """
            SELECT 1 FROM crawler_video WHERE platform = %s AND out_video_id = %s AND video_title = %s
            """
            result = await self.mysql.fetch_one(sql, [self.platform, out_id, title])
        else:
            sql = """
            SELECT 1 FROM crawler_video WHERE platform = %s AND out_video_id = %s
            """
            result = await self.mysql.fetch_one(sql, [self.platform, out_id])

        if result:
            return self._reject(
                "2002",
                f"[去重失败] {out_id} 已存在",
                {"item": self.item, "out_video_id": out_id},
            )

        self.logger.info("[去重] 校验通过")
        return True

    async def process_item(self) -> bool:
        """Run every check in order and report whether the item is accepted.

        Stops at the first failing check and logs which one ended the run.

        :return: ``True`` when all checks pass, ``False`` otherwise.
        """
        self.logger.info(f"开始校验: {self.item.get('out_video_id', '')}")
        if not await self.publish_time_flag():
            self.logger.info("校验结束: 发布时间不符合")
            return False
        if not self.title_flag():
            self.logger.info("校验结束: 标题不符合")
            return False
        if not await self.repeat_video():
            self.logger.info("校验结束: 去重不符合")
            return False
        if not self.download_rule_flag():
            self.logger.info("校验结束: 下载规则不符合")
            return False
        self.logger.info("校验结束: 全部通过")
        return True
|