pipeline.py 10 KB


  1. import os
  2. import re
  3. import sys
  4. import time
  5. from datetime import datetime
  6. sys.path.append(os.getcwd())
  7. from core.utils.feishu_data_async import FeishuDataAsync
  8. from core.utils.log.logger_manager import LoggerManager
  9. from services.async_mysql_service import AsyncMysqlService
  10. class PiaoQuanPipeline:
  11. """
  12. 完整异步爬虫管道 - 每个校验不通过都详细记录本地日志+阿里云日志
  13. """
  14. def __init__(self, platform, mode, rule_dict, env, item, trace_id, account=None):
  15. self.platform = platform
  16. self.mode = mode
  17. self.rule_dict = rule_dict
  18. self.env = env
  19. self.item = item
  20. self.trace_id = trace_id
  21. self.account = account
  22. self.mysql = AsyncMysqlService(platform=platform, mode=mode)
  23. self.logger = LoggerManager.get_logger(platform=platform, mode=mode)
  24. self.aliyun_log = LoggerManager.get_aliyun_logger(platform=platform, mode=mode)
  25. async def feishu_time_list(self):
  26. async with FeishuDataAsync() as feishu_data:
  27. summary = await feishu_data.get_values(
  28. spreadsheet_token="KsoMsyP2ghleM9tzBfmcEEXBnXg",
  29. sheet_id="RuLK77"
  30. )
  31. for row in summary[1:]:
  32. if row[0] == self.platform:
  33. return row[1]
  34. return None
  35. async def feishu_list(self):
  36. async with FeishuDataAsync() as feishu_data:
  37. summary = await feishu_data.get_values(
  38. spreadsheet_token="KsoMsyP2ghleM9tzBfmcEEXBnXg",
  39. sheet_id="letS93"
  40. )
  41. for row in summary[1:]:
  42. if row[0] == self.platform:
  43. return row[1]
  44. return None
  45. async def publish_time_flag(self) -> bool:
  46. publish_ts = self.item.get("publish_time_stamp", int(time.time()))
  47. update_ts = self.item.get("update_time_stamp", int(time.time()))
  48. max_d = self.rule_dict.get("period", {}).get("max", 1000)
  49. min_d = self.rule_dict.get("period", {}).get("min", 1000)
  50. days = max(max_d, min_d)
  51. feishu_days = await self.feishu_time_list()
  52. if feishu_days:
  53. days = int(feishu_days)
  54. now_ts = int(time.time())
  55. if self.platform == "gongzhonghao":
  56. if (now_ts - publish_ts > 86400 * days) and (now_ts - update_ts > 86400 * days):
  57. msg = f"[发布时间过期] now={now_ts}, publish={publish_ts}, update={update_ts}, limit_days={days}"
  58. self.logger.warning(msg)
  59. self.aliyun_log.logging(
  60. code="2004",
  61. trace_id=self.trace_id,
  62. data={
  63. "item": self.item,
  64. "now_ts": now_ts,
  65. "publish_ts": publish_ts,
  66. "update_ts": update_ts,
  67. "days_limit": days
  68. },
  69. message=msg,
  70. account=self.account
  71. )
  72. return False
  73. else:
  74. if days == 0:
  75. is_today = datetime.fromtimestamp(publish_ts).date() == datetime.today().date()
  76. if not is_today:
  77. msg = "[发布时间] 不在今日"
  78. self.logger.warning(msg)
  79. self.aliyun_log.logging(
  80. code="2004",
  81. trace_id=self.trace_id,
  82. data={
  83. "item": self.item,
  84. "publish_ts": publish_ts
  85. },
  86. message=msg,
  87. account=self.account
  88. )
  89. return False
  90. elif now_ts - publish_ts > 86400 * days:
  91. msg = f"[发布时间超限制] now={now_ts}, publish={publish_ts}, limit_days={days}"
  92. self.logger.warning(msg)
  93. self.aliyun_log.logging(
  94. code="2004",
  95. trace_id=self.trace_id,
  96. data={
  97. "item": self.item,
  98. "now_ts": now_ts,
  99. "publish_ts": publish_ts,
  100. "days_limit": days
  101. },
  102. message=msg,
  103. account=self.account
  104. )
  105. return False
  106. return True
  107. def title_flag(self) -> bool:
  108. title = self.item.get("video_title", "")
  109. cleaned_title = re.sub(r"[^\w]", " ", title)
  110. sensitive_words = [] # 可配置敏感词列表
  111. for word in sensitive_words:
  112. if word in cleaned_title:
  113. msg = f"[标题包含敏感词] {word} in {title}"
  114. self.logger.warning(msg)
  115. self.aliyun_log.logging(
  116. code="2003",
  117. trace_id=self.trace_id,
  118. data={
  119. "item": self.item,
  120. "title": title,
  121. "matched_word": word
  122. },
  123. message=msg,
  124. account=self.account
  125. )
  126. return False
  127. return True
  128. def download_rule_flag(self) -> bool:
  129. """
  130. 视频基础下载规则
  131. :return:
  132. """
  133. for key in self.item:
  134. if self.rule_dict.get(key):
  135. max_value = (
  136. int(self.rule_dict[key]["max"])
  137. if int(self.rule_dict[key]["max"]) > 0
  138. else 999999999999999
  139. )
  140. if key == "peroid": # peroid是抓取周期天数
  141. continue
  142. else:
  143. flag = int(self.rule_dict[key]["min"]) <= int(self.item[key]) <= max_value
  144. if not flag:
  145. self.aliyun_log.logging(
  146. code="2004",
  147. trace_id=self.trace_id,
  148. data=self.item,
  149. message="{}: {} <= {} <= {}, {}".format(
  150. key,
  151. self.rule_dict[key]["min"],
  152. self.item[key],
  153. max_value,
  154. flag,
  155. ),
  156. account=self.account
  157. )
  158. return flag
  159. async def repeat_video(self) -> bool:
  160. out_id = self.item.get("out_video_id")
  161. title = self.item.get("video_title", "")
  162. bypass_platforms = {
  163. "zhufuniannianshunxinjixiang", "weiquanshipin", "piaoquangushi", "lepaoledong", "zhufukuaizhuan",
  164. "linglingkuailezhufu", "lepaoledongdijie", "jierizhufuhuakaifugui","yuannifuqimanman", "haoyunzhufuduo",
  165. "quzhuan", "zhufudewenhou", "jierizhufuxingfujixiang", "haoyoushipin", "xinshiquan",
  166. "laonianshenghuokuaile", "laonianquan"
  167. }
  168. if self.platform in bypass_platforms or (self.platform, self.mode) in {
  169. ("zhuwanwufusunew", "recommend"),
  170. ("jixiangxingfu", "recommend"),
  171. ("yuannifuqichangzai", "recommend"),
  172. ("benshanzhufu", "recommend"),
  173. ("zuihaodesongni", "recommend"),
  174. ("tiantianjufuqi", "recommend")
  175. }:
  176. self.logger.info("[去重] 平台配置无需去重,直接通过")
  177. return True
  178. day_count = await self.feishu_list()
  179. if day_count:
  180. sql = """
  181. SELECT UNIX_TIMESTAMP(create_time) as ts FROM crawler_video
  182. WHERE platform = %s AND out_video_id = %s AND create_time >= DATE_SUB(NOW(), INTERVAL %s DAY)
  183. """
  184. rows = await self.mysql.fetch_all(sql, [self.platform, out_id, int(day_count)])
  185. if rows:
  186. msg = f"[去重失败] {out_id} 在 {day_count} 天内已存在"
  187. self.logger.warning(msg)
  188. self.aliyun_log.logging(
  189. code="2002",
  190. trace_id=self.trace_id,
  191. data={
  192. "item": self.item,
  193. "existing_timestamps": [r["ts"] for r in rows],
  194. "day_count": day_count
  195. },
  196. message=msg,
  197. account=self.account
  198. )
  199. return False
  200. if self.platform == "zhufuhaoyunbaofu" and self.mode == "recommend":
  201. sql = """
  202. SELECT 1 FROM crawler_video WHERE platform = %s AND out_video_id = %s AND video_title = %s
  203. """
  204. result = await self.mysql.fetch_one(sql, [self.platform, out_id, title])
  205. else:
  206. sql = """
  207. SELECT 1 FROM crawler_video WHERE platform = %s AND out_video_id = %s
  208. """
  209. result = await self.mysql.fetch_one(sql, [self.platform, out_id])
  210. if result:
  211. msg = f"[去重失败] {out_id} 已存在"
  212. self.logger.warning(msg)
  213. self.aliyun_log.logging(
  214. code="2002",
  215. trace_id=self.trace_id,
  216. data={
  217. "item": self.item,
  218. "out_video_id": out_id
  219. },
  220. message=msg,
  221. account=self.account
  222. )
  223. return False
  224. self.logger.info("[去重] 校验通过")
  225. return True
  226. async def process_item(self) -> bool:
  227. """
  228. 异步执行完整规则流程,并输出详细本地日志和云日志
  229. """
  230. self.logger.info(f"开始校验: {self.item.get('out_video_id', '')}")
  231. if not await self.publish_time_flag():
  232. self.logger.info("校验结束: 发布时间不符合")
  233. return False
  234. if not self.title_flag():
  235. self.logger.info("校验结束: 标题不符合")
  236. return False
  237. if not await self.repeat_video():
  238. self.logger.info("校验结束: 去重不符合")
  239. return False
  240. if not self.download_rule_flag():
  241. self.logger.info("校验结束: 下载规则不符合")
  242. return False
  243. self.logger.info("校验结束: 全部通过")
  244. return True