123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201 |
- import re
- import sys
- import os
- import time
- from datetime import datetime
- from core.utils.feishu.feishu_utils import FeishuUtils
- from services.async_mysql_service import AsyncMysqlService
- from core.utils.log.logger_manager import LoggerManager
- sys.path.append(os.getcwd())
class PiaoQuanPipeline:
    """
    Async crawler pipeline that runs the rule checks for one crawled video item.

    The checks are: publish-time freshness, title sensitive words, duplicate
    detection against the ``crawler_video`` table, and numeric min/max rules.
    Every rejection is reported through the Aliyun logger and the check
    returns ``False``.

    Args:
        platform: Platform identifier; compared against the first column of the
            Feishu sheets and against the ``platform`` column in MySQL.
        mode: Crawl mode (e.g. ``"recommend"``).
        rule_dict: Mapping of rule name -> ``{"min": ..., "max": ...}``.
        env: Stored on the instance; not read by any method of this class.
        item: The crawled video, read as a mapping (``publish_time_stamp``,
            ``update_time_stamp``, ``video_title``, ``out_video_id`` and any
            numeric fields named in ``rule_dict``).
        trace_id: Passed through to every log call.
        account: Optional account value attached to some log calls.
    """

    # Feishu spreadsheet and the two sheets holding per-platform day settings.
    FEISHU_SPREADSHEET_TOKEN = "KsoMsyP2ghleM9tzBfmcEEXBnXg"
    FEISHU_PUBLISH_DAYS_SHEET = "RuLK77"
    FEISHU_DEDUP_DAYS_SHEET = "letS93"

    # Sensitive words checked by ``title_flag``. Empty by default (as in the
    # original code); override on a subclass or instance to enable the check.
    SENSITIVE_WORDS = ()

    # Rule keys that are not numeric item fields. "period" is consumed by
    # ``publish_time_flag``; the historical misspelling "peroid" is still
    # skipped so existing rule sets keep working.
    _IGNORED_RULE_KEYS = frozenset({"period", "peroid"})

    # (platform, mode) pairs that skip the unbounded duplicate check.
    _DEDUP_BYPASS = frozenset({
        ("zhufuniannianshunxinjixiang", "recommend"),
        ("benshanzhufu", "recommend"),
        ("tiantianjufuqi", "recommend"),
    })

    # Every non-word character in a title is replaced by a space before matching.
    _NON_WORD_RE = re.compile(r"[^\w]")

    def __init__(self, platform, mode, rule_dict, env, item, trace_id, account=None):
        self.platform = platform
        self.mode = mode
        self.item = item
        self.rule_dict = rule_dict
        self.env = env
        self.trace_id = trace_id
        self.account = account
        # Async MySQL client used by ``repeat_video``.
        self.mysql = AsyncMysqlService(platform=platform, mode=mode)
        self.aliyun_log = LoggerManager.get_aliyun_logger(platform=platform, mode=mode)

    def _feishu_platform_value(self, sheet_id):
        """Return the second cell of the sheet row whose first cell equals this platform, or None."""
        # NOTE(review): get_values_batch is called synchronously from async
        # callers; if it performs network I/O it blocks the event loop — confirm.
        summary = FeishuUtils.get_values_batch(self.FEISHU_SPREADSHEET_TOKEN, sheet_id)
        # The first row is skipped, as in the original code (presumably a header).
        for row in (summary or [])[1:]:
            # Guard against blank or single-cell rows instead of raising IndexError.
            if row and len(row) > 1 and row[0] == self.platform:
                return row[1]
        return None

    async def feishu_time_list(self):
        """Read this platform's publish-time day setting from Feishu (None if absent)."""
        return self._feishu_platform_value(self.FEISHU_PUBLISH_DAYS_SHEET)

    async def publish_time_flag(self) -> bool:
        """
        Check whether the item's publish time is still within the allowed window.

        The window in days is the larger of ``rule_dict["period"]["max"]`` and
        ``["min"]`` (1000 each when missing), replaced by the Feishu setting
        when one is configured for this platform.

        :return: True if the item is fresh enough, False if it is expired.
        """
        publish_ts = self.item["publish_time_stamp"]
        update_ts = self.item["update_time_stamp"]

        period = self.rule_dict.get("period", {})
        days = max(period.get("max", 1000), period.get("min", 1000))

        override = await self.feishu_time_list()
        # NOTE(review): a falsy override (e.g. numeric 0) is ignored here, so
        # "today only" cannot be configured from Feishu with a numeric 0 —
        # confirm whether that is intended.
        if override:
            days = int(override)

        now_ts = int(time.time())
        window = 86400 * days

        if self.platform == "gongzhonghao":
            # Expired only when both the publish and the update time are too old.
            expired = now_ts - publish_ts > window and now_ts - update_ts > window
        elif days == 0:
            # A zero-day window means "published today" by local calendar date.
            expired = datetime.fromtimestamp(publish_ts).date() != datetime.today().date()
        else:
            expired = now_ts - publish_ts > window

        if expired:
            # Every expiry path is logged, including the zero-day case.
            self.aliyun_log.logging(
                code="2004",
                trace_id=self.trace_id,
                data=self.item,
                message=f"发布时间超过{days}天"
            )
            return False
        return True

    def title_flag(self) -> bool:
        """
        Check that the title contains none of ``SENSITIVE_WORDS``.

        :return: True if the title is clean, False (after logging 2003) otherwise.
        """
        title = self.item["video_title"]
        cleaned_title = self._NON_WORD_RE.sub(" ", title)
        if any(word in cleaned_title for word in self.SENSITIVE_WORDS):
            self.aliyun_log.logging(
                code="2003",
                trace_id=self.trace_id,
                message="标题中包含敏感词",
                data=self.item,
                account=self.account
            )
            return False
        return True

    def download_rule_flag(self) -> bool:
        """
        Check every item field that has a rule against that rule's min/max.

        ``min`` defaults to 0 and ``max`` to 999999999 when a rule omits them.
        Keys in ``_IGNORED_RULE_KEYS`` are not treated as numeric fields.

        :return: True if all ruled fields are in range, False (after logging
            2004) on the first field that is out of range.
        """
        for key in self.item:
            rule = self.rule_dict.get(key)
            if not rule or key in self._IGNORED_RULE_KEYS:
                continue
            lower = int(rule.get("min", 0))
            upper = int(rule.get("max", 999999999))
            value = int(self.item.get(key, 0))
            if not lower <= value <= upper:
                self.aliyun_log.logging(
                    code="2004",
                    trace_id=self.trace_id,
                    message=f"{key}: 不符合规则",
                    data=self.item,
                    account=self.account
                )
                return False
        return True

    async def feishu_list(self):
        """Read this platform's dedup day setting from Feishu (None if absent)."""
        return self._feishu_platform_value(self.FEISHU_DEDUP_DAYS_SHEET)

    def _log_duplicate(self):
        """Report the current item as a duplicate video (code 2002)."""
        self.aliyun_log.logging(
            code="2002",
            trace_id=self.trace_id,
            message="重复的视频",
            data=self.item,
            account=self.account
        )

    async def repeat_video(self) -> bool:
        """
        Check whether the item already exists in ``crawler_video``.

        :return: True if the item is not a duplicate, False (after logging
            2002) if it is.
        """
        out_id = self.item["out_video_id"]
        title = self.item["video_title"]
        day_count = await self.feishu_list()

        async with self.mysql as db:
            if day_count:
                # Duplicate within the configured number of days.
                sql = """
                    SELECT create_time
                    FROM crawler_video
                    WHERE platform = %s AND out_video_id = %s
                    AND create_time >= DATE_SUB(NOW(), INTERVAL %s DAY)
                """
                rows = await db.client.fetch_all(sql, [self.platform, out_id, int(day_count)])
                if rows:
                    self._log_duplicate()
                    return False
                # NOTE(review): when the windowed query finds nothing, the
                # unbounded check below still runs, so for non-bypass platforms
                # the window never relaxes deduplication — confirm whether an
                # early ``return True`` was intended here.

            # Listed platform/mode pairs skip the unbounded duplicate check.
            if (self.platform, self.mode) in self._DEDUP_BYPASS:
                return True

            if self.platform == "zhufuhaoyunbaofu" and self.mode == "recommend":
                # This platform only counts a row as duplicate when the title matches too.
                sql = """
                    SELECT 1 FROM crawler_video
                    WHERE platform = %s AND out_video_id = %s AND video_title = %s
                """
                result = await db.client.fetch_one(sql, [self.platform, out_id, title])
            else:
                sql = """
                    SELECT 1 FROM crawler_video
                    WHERE platform = %s AND out_video_id = %s
                """
                result = await db.client.fetch_one(sql, [self.platform, out_id])

            if result:
                self._log_duplicate()
                return False
        return True

    async def process_item(self) -> bool:
        """
        Run all rule checks in order, stopping at the first failure.

        Order: publish time, title, duplicate check, numeric rules.

        :return: True only if every check passes.
        """
        if not await self.publish_time_flag():
            return False
        if not self.title_flag():
            return False
        if not await self.repeat_video():
            return False
        if not self.download_rule_flag():
            return False
        return True
|