import os
import re
import sys
import time
from datetime import datetime

# Make the project-root imports below resolvable when the file is run directly.
sys.path.append(os.getcwd())

from core.utils.feishu_data_async import FeishuDataAsync
from core.utils.log.logger_manager import LoggerManager
from services.async_mysql_service import AsyncMysqlService


class PiaoQuanPipeline:
    """
    Fully asynchronous crawler pipeline. Every check that fails is logged in
    detail, both to the local logger and to Aliyun Log.
    """

    def __init__(self, platform, mode, rule_dict, env, item, trace_id, account=None):
        self.platform = platform
        self.mode = mode
        self.rule_dict = rule_dict
        self.env = env
        self.item = item
        self.trace_id = trace_id
        self.account = account
        self.mysql = AsyncMysqlService(platform=platform, mode=mode)
        self.logger = LoggerManager.get_logger(platform=platform, mode=mode)
        self.aliyun_log = LoggerManager.get_aliyun_logger(platform=platform, mode=mode)
        self.feishu_spreadsheet_token = "KsoMsyP2ghleM9tzBfmcEEXBnXg"

    async def feishu_time_list(self):
        """Fetch the per-platform publish-time window (in days) from Feishu."""
        async with FeishuDataAsync() as feishu_data:
            summary = await feishu_data.get_values(
                spreadsheet_token=self.feishu_spreadsheet_token,
                sheet_id="RuLK77"
            )
        for row in summary[1:]:
            if row[0] == self.platform:
                return row[1]
        return None

    async def feishu_list(self):
        """Fetch the per-platform deduplication window (in days) from Feishu."""
        async with FeishuDataAsync() as feishu_data:
            summary = await feishu_data.get_values(
                spreadsheet_token=self.feishu_spreadsheet_token,
                sheet_id="letS93"
            )
        for row in summary[1:]:
            if row[0] == self.platform:
                return row[1]
        return None

    async def title_restricted_words(self):
        """Fetch the per-platform restricted-word list from Feishu."""
        async with FeishuDataAsync() as feishu_data:
            summary = await feishu_data.get_values(
                spreadsheet_token=self.feishu_spreadsheet_token,
                sheet_id="BS9uyu"
            )
        for row in summary[1:]:
            if row[0] == self.platform:
                return row[1]
        return None

    async def publish_time_flag(self) -> bool:
        """Check that the item's publish/update time falls within the allowed window."""
        publish_ts = self.item.get("publish_time_stamp", int(time.time()))
        update_ts = self.item.get("update_time_stamp", int(time.time()))
        max_d = self.rule_dict.get("period", {}).get("max", 1000)
        min_d = self.rule_dict.get("period", {}).get("min", 1000)
        days = max(max_d, min_d)
        # A per-platform window configured in Feishu overrides the rule_dict value.
        feishu_days = await self.feishu_time_list()
        if feishu_days:
            days = int(feishu_days)
        now_ts = int(time.time())
        if self.platform == "gongzhonghao":
            # For gongzhonghao, both publish and update time must be inside the window.
            if (now_ts - publish_ts > 86400 * days) and (now_ts - update_ts > 86400 * days):
                msg = f"[publish time expired] now={now_ts}, publish={publish_ts}, update={update_ts}, limit_days={days}"
                self.logger.warning(msg)
                self.aliyun_log.logging(
                    code="2004",
                    trace_id=self.trace_id,
                    data={
                        "item": self.item,
                        "now_ts": now_ts,
                        "publish_ts": publish_ts,
                        "update_ts": update_ts,
                        "days_limit": days
                    },
                    message=msg,
                    account=self.account
                )
                return False
        else:
            if days == 0:
                # A zero-day window means the item must have been published today.
                is_today = datetime.fromtimestamp(publish_ts).date() == datetime.today().date()
                if not is_today:
                    msg = "[publish time] not published today"
                    self.logger.warning(msg)
                    self.aliyun_log.logging(
                        code="2004",
                        trace_id=self.trace_id,
                        data={
                            "item": self.item,
                            "publish_ts": publish_ts
                        },
                        message=msg,
                        account=self.account
                    )
                    return False
            elif now_ts - publish_ts > 86400 * days:
                msg = f"[publish time out of range] now={now_ts}, publish={publish_ts}, limit_days={days}"
                self.logger.warning(msg)
                self.aliyun_log.logging(
                    code="2004",
                    trace_id=self.trace_id,
                    data={
                        "item": self.item,
                        "now_ts": now_ts,
                        "publish_ts": publish_ts,
                        "days_limit": days
                    },
                    message=msg,
                    account=self.account
                )
                return False
        return True

    def title_flag(self) -> bool:
        """Reject items whose title contains a restricted word."""
        title = self.item.get("video_title", "")
        cleaned_title = re.sub(r"[^\w]", " ", title)
        sensitive_words = []  # configurable restricted-word list; see title_restricted_words()
        for word in sensitive_words:
            if word in cleaned_title:
                msg = f"[title contains restricted word] {word} in {title}"
                self.logger.warning(msg)
                self.aliyun_log.logging(
                    code="2003",
                    trace_id=self.trace_id,
                    data={
                        "item": self.item,
                        "title": title,
                        "matched_word": word
                    },
                    message=msg,
                    account=self.account
                )
                return False
        return True

    def download_rule_flag(self) -> bool:
        """
        Basic download rules for a video, configured as e.g.:
        "rule": "[{\"period\":{\"min\":15,\"max\":3}},{\"duration\":{\"min\":50,\"max\":0}},{\"share_cnt\":{\"min\":2,\"max\":0}},{\"videos_cnt\":{\"min\":300,\"max\":0}}]"
        """
        for key in self.item:
            if self.rule_dict.get(key):
                if key == "period":  # "period" is the crawl window in days, handled by publish_time_flag()
                    continue
                # max == 0 means "no upper bound".
                max_value = (
                    int(self.rule_dict[key]["max"])
                    if int(self.rule_dict[key]["max"]) > 0
                    else 999999999999999
                )
                flag = int(self.rule_dict[key]["min"]) <= int(self.item[key]) <= max_value
                if not flag:
                    self.aliyun_log.logging(
                        code="2004",
                        trace_id=self.trace_id,
                        data=self.item,
                        message="{}: {} <= {} <= {}, {}".format(
                            key,
                            self.rule_dict[key]["min"],
                            self.item[key],
                            max_value,
                            flag,
                        ),
                        account=self.account
                    )
                    return flag
        return True

    async def repeat_video(self) -> bool:
        """Deduplicate the item against the crawler_video table."""
        out_id = self.item.get("out_video_id")
        title = self.item.get("video_title", "")
        # Platforms exempt from deduplication altogether.
        bypass_platforms = {
            "zhufuniannianshunxinjixiang", "weiquanshipin", "piaoquangushi", "lepaoledong", "zhufukuaizhuan",
            "linglingkuailezhufu", "lepaoledongdijie", "jierizhufuhuakaifugui", "yuannifuqimanman", "haoyunzhufuduo",
            "quzhuan", "zhufudewenhou", "jierizhufuxingfujixiang", "haoyoushipin", "xinshiquan",
            "laonianshenghuokuaile", "laonianquan"
        }
        if self.platform in bypass_platforms or (self.platform, self.mode) in {
            ("zhuwanwufusunew", "recommend"),
            ("jixiangxingfu", "recommend"),
            ("yuannifuqichangzai", "recommend"),
            ("benshanzhufu", "recommend"),
            ("zuihaodesongni", "recommend"),
            ("tiantianjufuqi", "recommend")
        }:
            self.logger.info("[dedup] platform is configured to skip deduplication, passing")
            return True
        # Window-based dedup: reject if the same video was saved within day_count days.
        day_count = await self.feishu_list()
        if day_count:
            sql = """
                SELECT UNIX_TIMESTAMP(create_time) AS ts FROM crawler_video
                WHERE platform = %s AND out_video_id = %s AND create_time >= DATE_SUB(NOW(), INTERVAL %s DAY)
            """
            rows = await self.mysql.fetch_all(sql, [self.platform, out_id, int(day_count)])
            if rows:
                msg = f"[dedup failed] {out_id} already exists within the last {day_count} days"
                self.logger.warning(msg)
                self.aliyun_log.logging(
                    code="2002",
                    trace_id=self.trace_id,
                    data={
                        "item": self.item,
                        "existing_timestamps": [r["ts"] for r in rows],
                        "day_count": day_count
                    },
                    message=msg,
                    account=self.account
                )
                return False
        # Absolute dedup (runs even when a day window is configured above); for
        # zhufuhaoyunbaofu/recommend the title must match as well.
        if self.platform == "zhufuhaoyunbaofu" and self.mode == "recommend":
            sql = """
                SELECT 1 FROM crawler_video WHERE platform = %s AND out_video_id = %s AND video_title = %s
            """
            result = await self.mysql.fetch_one(sql, [self.platform, out_id, title])
        else:
            sql = """
                SELECT 1 FROM crawler_video WHERE platform = %s AND out_video_id = %s
            """
            result = await self.mysql.fetch_one(sql, [self.platform, out_id])
        if result:
            msg = f"[dedup failed] {out_id} already exists"
            self.logger.warning(msg)
            self.aliyun_log.logging(
                code="2002",
                trace_id=self.trace_id,
                data={
                    "item": self.item,
                    "out_video_id": out_id
                },
                message=msg,
                account=self.account
            )
            return False
        self.logger.info("[dedup] check passed")
        return True

    async def process_item(self) -> bool:
        """
        Run the full rule chain asynchronously, with detailed local and cloud logging.
        """
        self.logger.info(f"Validation started: {self.item.get('out_video_id', '')}")
        if not await self.publish_time_flag():
            self.logger.info("Validation ended: publish time check failed")
            return False
        if not self.title_flag():
            self.logger.info("Validation ended: title check failed")
            return False
        if not await self.repeat_video():
            self.logger.info("Validation ended: dedup check failed")
            return False
        if not self.download_rule_flag():
            self.logger.info("Validation ended: download rule check failed")
            return False
        self.logger.info("Validation ended: all checks passed")
        return True
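

# A minimal usage sketch, not part of the pipeline itself: it shows how a crawler
# might drive process_item. The platform name, trace id, and item values below are
# hypothetical; the rule shape follows download_rule_flag's docstring. The backing
# services (MySQL, Feishu, Aliyun Log) must be reachable for the checks to run.
if __name__ == "__main__":
    import asyncio

    sample_rule = {
        "period": {"min": 15, "max": 3},
        "duration": {"min": 50, "max": 0},
        "share_cnt": {"min": 2, "max": 0},
    }
    sample_item = {
        "out_video_id": "demo-video-001",  # hypothetical id
        "video_title": "sample title",
        "publish_time_stamp": int(time.time()),
        "duration": 65,
        "share_cnt": 10,
    }
    pipeline = PiaoQuanPipeline(
        platform="demo_platform",  # hypothetical platform name
        mode="recommend",
        rule_dict=sample_rule,
        env="dev",
        item=sample_item,
        trace_id="trace-demo-001",
    )
    # process_item returns True only when every check passes.
    print(asyncio.run(pipeline.process_item()))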