pipeline.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. import os
  2. import re
  3. import sys
  4. import time
  5. from datetime import datetime, timezone
  6. from core.models.rule_models import RuleModel
  7. from core.utils.feishu_data_async import FeishuDataAsync
  8. from core.utils.log.logger_manager import LoggerManager
  9. from services.async_mysql_service import AsyncMysqlService
  10. class PiaoQuanPipeline:
  11. """
  12. 完整异步爬虫管道 - 每个校验不通过都详细记录本地日志+阿里云日志
  13. """
  14. def __init__(self, platform, mode, rule_dict, env, item, trace_id, account=None):
  15. self.platform = platform
  16. self.mode = mode
  17. self.rule_dict = rule_dict
  18. self.env = env
  19. self.item = item
  20. self.trace_id = trace_id
  21. self.account = account
  22. # 使用Pydantic模型验证规则字典
  23. # try:
  24. # self.validated_rules = RuleModel(**rule_dict)
  25. # except Exception as e:
  26. # LoggerManager.get_logger(platform=platform, mode=mode).warning(f"规则验证失败: {e}")
  27. # self.validated_rules = None
  28. self.mysql = AsyncMysqlService(platform=platform, mode=mode)
  29. self.logger = LoggerManager.get_logger(platform=platform, mode=mode)
  30. self.aliyun_log = LoggerManager.get_aliyun_logger(platform=platform, mode=mode)
  31. self.feishu_spreadsheet_token = "KsoMsyP2ghleM9tzBfmcEEXBnXg"
  32. self.test_account = [58528285, 58527674, 58528085, 58527582, 58527601, 58527612, 58528281, 58528095, 58527323,
  33. 58528071, 58527278]
  34. async def feishu_time_list(self):
  35. async with FeishuDataAsync() as feishu_data:
  36. summary = await feishu_data.get_values(
  37. spreadsheet_token=self.feishu_spreadsheet_token,
  38. sheet_id="RuLK77"
  39. )
  40. for row in summary[1:]:
  41. if row[0] == self.platform:
  42. return row[1]
  43. return None
  44. async def feishu_list(self):
  45. async with FeishuDataAsync() as feishu_data:
  46. summary = await feishu_data.get_values(
  47. spreadsheet_token=self.feishu_spreadsheet_token,
  48. sheet_id="letS93"
  49. )
  50. for row in summary[1:]:
  51. if row[0] == self.platform:
  52. return row[1]
  53. return None
  54. async def publish_time_flag(self) -> bool:
  55. publish_ts = self.item.get("publish_time_stamp")
  56. update_ts = self.item.get("update_time_stamp")
  57. max_d = self.rule_dict.get("period", {}).get("max", 1000)
  58. min_d = self.rule_dict.get("period", {}).get("min", 1000)
  59. days = max(max_d, min_d)
  60. # feishu_days = await self.feishu_time_list()
  61. # if feishu_days:
  62. # days = int(feishu_days)
  63. now_ts = int(time.time())
  64. if self.platform == "gongzhonghao":
  65. if (now_ts - publish_ts > 86400 * days) and (now_ts - update_ts > 86400 * days):
  66. msg = f"[发布时间过期] now={now_ts}, publish={publish_ts}, update={update_ts}, limit_days={days}"
  67. self.logger.warning(msg)
  68. self.aliyun_log.logging(
  69. code="2004",
  70. trace_id=self.trace_id,
  71. data={
  72. "item": self.item,
  73. "now_ts": now_ts,
  74. "publish_ts": publish_ts,
  75. "update_ts": update_ts,
  76. "days_limit": days
  77. },
  78. message=msg,
  79. account=self.account
  80. )
  81. return False
  82. else:
  83. if days == 0 or (self.platform == "xiaoniangao" and self.item["out_user_id"] in self.test_account) :
  84. # 使用UTC时间进行比较,避免时区问题
  85. is_today = datetime.fromtimestamp(publish_ts, tz=timezone.utc).date() == datetime.now(timezone.utc).date()
  86. if not is_today:
  87. msg = "[发布时间] 不在今日"
  88. self.logger.warning(msg)
  89. self.aliyun_log.logging(
  90. code="2004",
  91. trace_id=self.trace_id,
  92. data={
  93. "item": self.item,
  94. "publish_ts": publish_ts
  95. },
  96. message=msg,
  97. account=self.account
  98. )
  99. return False
  100. elif now_ts - publish_ts > 86400 * days:
  101. msg = f"[发布时间超限制] now={now_ts}, publish={publish_ts}, limit_days={days}"
  102. self.logger.warning(msg)
  103. self.aliyun_log.logging(
  104. code="2004",
  105. trace_id=self.trace_id,
  106. data={
  107. "item": self.item,
  108. "now_ts": now_ts,
  109. "publish_ts": publish_ts,
  110. "days_limit": days
  111. },
  112. message=msg,
  113. account=self.account
  114. )
  115. return False
  116. return True
  117. def title_flag(self) -> bool:
  118. """
  119. 标题敏感词过滤
  120. :return:
  121. """
  122. title = self.item.get("video_title", "")
  123. if not title:
  124. return True
  125. # 清理标题,移除空白字符
  126. cleaned_title = re.sub(r"\s+", " ", title).strip()
  127. # 异步获取敏感词列表
  128. sensitive_words = [] # 这里应该从飞书表格或其他配置源获取敏感词
  129. # 检查是否包含敏感词
  130. for word in sensitive_words:
  131. if word and word in cleaned_title:
  132. msg = f"[标题包含敏感词] {word} in {title}"
  133. self.logger.warning(msg)
  134. self.aliyun_log.logging(
  135. code="2003",
  136. trace_id=self.trace_id,
  137. data={
  138. "item": self.item,
  139. "title": title,
  140. "matched_word": word
  141. },
  142. message=msg,
  143. account=self.account
  144. )
  145. return False
  146. return True
  147. def download_rule_flag(self) -> bool:
  148. """
  149. 视频基础下载规则
  150. :return:
  151. "rule": "[{\"period\":{\"min\":15,\"max\":3}},{\"duration\":{\"min\":50,\"max\":0}},{\"share_cnt\":{\"min\":2,\"max\":0}},{\"videos_cnt\":{\"min\":300,\"max\":0}}]",
  152. """
  153. for key in self.item:
  154. if self.rule_dict.get(key):
  155. max_value = (
  156. int(self.rule_dict[key]["max"])
  157. if int(self.rule_dict[key]["max"]) > 0
  158. else 999999999999999
  159. )
  160. if key == "peroid": # peroid是抓取周期天数
  161. continue
  162. else:
  163. flag = int(self.rule_dict[key]["min"]) <= int(self.item[key]) <= max_value
  164. if not flag:
  165. self.aliyun_log.logging(
  166. code="2004",
  167. trace_id=self.trace_id,
  168. data=self.item,
  169. message="{}: {} <= {} <= {}, {}".format(
  170. key,
  171. self.rule_dict[key]["min"],
  172. self.item[key],
  173. max_value,
  174. flag,
  175. ),
  176. account=self.account
  177. )
  178. return flag
  179. else:
  180. continue
  181. return True
  182. async def repeat_video(self) -> bool:
  183. out_id = self.item.get("out_video_id")
  184. title = self.item.get("video_title", "")
  185. bypass_platforms = {
  186. "zhufuniannianshunxinjixiang", "weiquanshipin", "piaoquangushi", "lepaoledong", "zhufukuaizhuan",
  187. "linglingkuailezhufu", "lepaoledongdijie", "jierizhufuhuakaifugui","yuannifuqimanman", "haoyunzhufuduo",
  188. "quzhuan", "zhufudewenhou", "jierizhufuxingfujixiang", "haoyoushipin", "xinshiquan",
  189. "laonianshenghuokuaile", "laonianquan"
  190. }
  191. if self.platform in bypass_platforms or (self.platform, self.mode) in [
  192. ("zhuwanwufusunew", "recommend"),
  193. ("jixiangxingfu", "recommend"),
  194. ("yuannifuqichangzai", "recommend"),
  195. ("benshanzhufu", "recommend"),
  196. ("zuihaodesongni", "recommend"),
  197. ("tiantianjufuqi", "recommend")
  198. ]:
  199. self.logger.info("[去重] 平台配置无需去重,直接通过")
  200. return True
  201. day_count = await self.feishu_list()
  202. if day_count:
  203. sql = """
  204. SELECT UNIX_TIMESTAMP(create_time) as ts FROM crawler_video
  205. WHERE platform = %s AND out_video_id = %s AND create_time >= DATE_SUB(NOW(), INTERVAL %s DAY)
  206. """
  207. rows = await self.mysql.fetch_all(sql, [self.platform, out_id, int(day_count)])
  208. if rows:
  209. msg = f"[去重失败] {out_id} 在 {day_count} 天内已存在"
  210. self.logger.warning(msg)
  211. self.aliyun_log.logging(
  212. code="2002",
  213. trace_id=self.trace_id,
  214. data={
  215. "item": self.item,
  216. "existing_timestamps": [r["ts"] for r in rows],
  217. "day_count": day_count
  218. },
  219. message=msg,
  220. account=self.account
  221. )
  222. return False
  223. if self.platform == "zhufuhaoyunbaofu" and self.mode == "recommend":
  224. sql = """
  225. SELECT 1 FROM crawler_video WHERE platform = %s AND out_video_id = %s AND video_title = %s
  226. """
  227. result = await self.mysql.fetch_one(sql, [self.platform, out_id, title])
  228. else:
  229. sql = """
  230. SELECT 1 FROM crawler_video WHERE platform = %s AND out_video_id = %s
  231. """
  232. result = await self.mysql.fetch_one(sql, [self.platform, out_id])
  233. if result:
  234. msg = f"[去重失败] {out_id} 已存在"
  235. self.logger.warning(msg)
  236. self.aliyun_log.logging(
  237. code="2002",
  238. trace_id=self.trace_id,
  239. data={
  240. "item": self.item,
  241. "out_video_id": out_id
  242. },
  243. message=msg,
  244. account=self.account
  245. )
  246. return False
  247. self.logger.info("[去重] 校验通过")
  248. return True
  249. async def process_item(self) -> bool:
  250. """
  251. 异步执行完整规则流程,并输出详细本地日志和云日志
  252. """
  253. self.logger.info(f"开始校验: {self.item.get('out_video_id', '')}")
  254. if not await self.publish_time_flag():
  255. self.logger.info("校验结束: 发布时间不符合")
  256. return False
  257. if not self.title_flag():
  258. self.logger.info("校验结束: 标题不符合")
  259. return False
  260. if not await self.repeat_video():
  261. self.logger.info("校验结束: 去重不符合")
  262. return False
  263. if not self.download_rule_flag():
  264. self.logger.info("校验结束: 下载规则不符合")
  265. return False
  266. self.logger.info("校验结束: 全部通过")
  267. return True