pipeline.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. import os
  2. import re
  3. import sys
  4. import time
  5. from datetime import datetime
  6. sys.path.append(os.getcwd())
  7. from core.utils.feishu_data_async import FeishuDataAsync
  8. from core.utils.log.logger_manager import LoggerManager
  9. from services.async_mysql_service import AsyncMysqlService
  10. class PiaoQuanPipeline:
  11. """
  12. 完整异步爬虫管道 - 每个校验不通过都详细记录本地日志+阿里云日志
  13. """
  14. def __init__(self, platform, mode, rule_dict, env, item, trace_id, account=None):
  15. self.platform = platform
  16. self.mode = mode
  17. self.rule_dict = rule_dict
  18. self.env = env
  19. self.item = item
  20. self.trace_id = trace_id
  21. self.account = account
  22. self.mysql = AsyncMysqlService(platform=platform, mode=mode)
  23. self.logger = LoggerManager.get_logger(platform=platform, mode=mode)
  24. self.aliyun_log = LoggerManager.get_aliyun_logger(platform=platform, mode=mode)
  25. async def feishu_time_list(self):
  26. async with FeishuDataAsync() as feishu_data:
  27. summary = await feishu_data.get_values(
  28. spreadsheet_token="KsoMsyP2ghleM9tzBfmcEEXBnXg",
  29. sheet_id="RuLK77"
  30. )
  31. for row in summary[1:]:
  32. if row[0] == self.platform:
  33. return row[1]
  34. return None
  35. async def feishu_list(self):
  36. async with FeishuDataAsync() as feishu_data:
  37. summary = await feishu_data.get_values(
  38. spreadsheet_token="KsoMsyP2ghleM9tzBfmcEEXBnXg",
  39. sheet_id="letS93"
  40. )
  41. for row in summary[1:]:
  42. if row[0] == self.platform:
  43. return row[1]
  44. return None
  45. async def publish_time_flag(self) -> bool:
  46. publish_ts = self.item.get("publish_time_stamp", int(time.time()))
  47. update_ts = self.item.get("update_time_stamp", int(time.time()))
  48. max_d = self.rule_dict.get("period", {}).get("max", 1000)
  49. min_d = self.rule_dict.get("period", {}).get("min", 1000)
  50. days = max(max_d, min_d)
  51. feishu_days = await self.feishu_time_list()
  52. if feishu_days:
  53. days = int(feishu_days)
  54. now_ts = int(time.time())
  55. if self.platform == "gongzhonghao":
  56. if (now_ts - publish_ts > 86400 * days) and (now_ts - update_ts > 86400 * days):
  57. msg = f"[发布时间过期] now={now_ts}, publish={publish_ts}, update={update_ts}, limit_days={days}"
  58. self.logger.warning(msg)
  59. self.aliyun_log.logging(
  60. code="2004",
  61. trace_id=self.trace_id,
  62. data={
  63. "item": self.item,
  64. "now_ts": now_ts,
  65. "publish_ts": publish_ts,
  66. "update_ts": update_ts,
  67. "days_limit": days
  68. },
  69. message=msg,
  70. account=self.account
  71. )
  72. return False
  73. else:
  74. if days == 0:
  75. is_today = datetime.fromtimestamp(publish_ts).date() == datetime.today().date()
  76. if not is_today:
  77. msg = "[发布时间] 不在今日"
  78. self.logger.warning(msg)
  79. self.aliyun_log.logging(
  80. code="2004",
  81. trace_id=self.trace_id,
  82. data={
  83. "item": self.item,
  84. "publish_ts": publish_ts
  85. },
  86. message=msg,
  87. account=self.account
  88. )
  89. return False
  90. elif now_ts - publish_ts > 86400 * days:
  91. msg = f"[发布时间超限制] now={now_ts}, publish={publish_ts}, limit_days={days}"
  92. self.logger.warning(msg)
  93. self.aliyun_log.logging(
  94. code="2004",
  95. trace_id=self.trace_id,
  96. data={
  97. "item": self.item,
  98. "now_ts": now_ts,
  99. "publish_ts": publish_ts,
  100. "days_limit": days
  101. },
  102. message=msg,
  103. account=self.account
  104. )
  105. return False
  106. return True
  107. def title_flag(self) -> bool:
  108. title = self.item.get("video_title", "")
  109. cleaned_title = re.sub(r"[^\w]", " ", title)
  110. sensitive_words = [] # 可配置敏感词列表
  111. for word in sensitive_words:
  112. if word in cleaned_title:
  113. msg = f"[标题包含敏感词] {word} in {title}"
  114. self.logger.warning(msg)
  115. self.aliyun_log.logging(
  116. code="2003",
  117. trace_id=self.trace_id,
  118. data={
  119. "item": self.item,
  120. "title": title,
  121. "matched_word": word
  122. },
  123. message=msg,
  124. account=self.account
  125. )
  126. return False
  127. return True
  128. def download_rule_flag(self) -> bool:
  129. """
  130. 视频基础下载规则
  131. :return:
  132. """
  133. for key in self.item:
  134. if self.rule_dict.get(key):
  135. max_value = (
  136. int(self.rule_dict[key]["max"])
  137. if int(self.rule_dict[key]["max"]) > 0
  138. else 999999999999999
  139. )
  140. if key == "peroid": # peroid是抓取周期天数
  141. continue
  142. else:
  143. flag = int(self.rule_dict[key]["min"]) <= int(self.item[key]) <= max_value
  144. if not flag:
  145. self.aliyun_log.logging(
  146. code="2004",
  147. trace_id=self.trace_id,
  148. data=self.item,
  149. message="{}: {} <= {} <= {}, {}".format(
  150. key,
  151. self.rule_dict[key]["min"],
  152. self.item[key],
  153. max_value,
  154. flag,
  155. ),
  156. account=self.account
  157. )
  158. return flag
  159. else:
  160. continue
  161. return True
  162. async def repeat_video(self) -> bool:
  163. out_id = self.item.get("out_video_id")
  164. title = self.item.get("video_title", "")
  165. bypass_platforms = {
  166. "zhufuniannianshunxinjixiang", "weiquanshipin", "piaoquangushi", "lepaoledong", "zhufukuaizhuan",
  167. "linglingkuailezhufu", "lepaoledongdijie", "jierizhufuhuakaifugui","yuannifuqimanman", "haoyunzhufuduo",
  168. "quzhuan", "zhufudewenhou", "jierizhufuxingfujixiang", "haoyoushipin", "xinshiquan",
  169. "laonianshenghuokuaile", "laonianquan"
  170. }
  171. if self.platform in bypass_platforms or (self.platform, self.mode) in {
  172. ("zhuwanwufusunew", "recommend"),
  173. ("jixiangxingfu", "recommend"),
  174. ("yuannifuqichangzai", "recommend"),
  175. ("benshanzhufu", "recommend"),
  176. ("zuihaodesongni", "recommend"),
  177. ("tiantianjufuqi", "recommend")
  178. }:
  179. self.logger.info("[去重] 平台配置无需去重,直接通过")
  180. return True
  181. day_count = await self.feishu_list()
  182. if day_count:
  183. sql = """
  184. SELECT UNIX_TIMESTAMP(create_time) as ts FROM crawler_video
  185. WHERE platform = %s AND out_video_id = %s AND create_time >= DATE_SUB(NOW(), INTERVAL %s DAY)
  186. """
  187. rows = await self.mysql.fetch_all(sql, [self.platform, out_id, int(day_count)])
  188. if rows:
  189. msg = f"[去重失败] {out_id} 在 {day_count} 天内已存在"
  190. self.logger.warning(msg)
  191. self.aliyun_log.logging(
  192. code="2002",
  193. trace_id=self.trace_id,
  194. data={
  195. "item": self.item,
  196. "existing_timestamps": [r["ts"] for r in rows],
  197. "day_count": day_count
  198. },
  199. message=msg,
  200. account=self.account
  201. )
  202. return False
  203. if self.platform == "zhufuhaoyunbaofu" and self.mode == "recommend":
  204. sql = """
  205. SELECT 1 FROM crawler_video WHERE platform = %s AND out_video_id = %s AND video_title = %s
  206. """
  207. result = await self.mysql.fetch_one(sql, [self.platform, out_id, title])
  208. else:
  209. sql = """
  210. SELECT 1 FROM crawler_video WHERE platform = %s AND out_video_id = %s
  211. """
  212. result = await self.mysql.fetch_one(sql, [self.platform, out_id])
  213. if result:
  214. msg = f"[去重失败] {out_id} 已存在"
  215. self.logger.warning(msg)
  216. self.aliyun_log.logging(
  217. code="2002",
  218. trace_id=self.trace_id,
  219. data={
  220. "item": self.item,
  221. "out_video_id": out_id
  222. },
  223. message=msg,
  224. account=self.account
  225. )
  226. return False
  227. self.logger.info("[去重] 校验通过")
  228. return True
  229. async def process_item(self) -> bool:
  230. """
  231. 异步执行完整规则流程,并输出详细本地日志和云日志
  232. """
  233. self.logger.info(f"开始校验: {self.item.get('out_video_id', '')}")
  234. if not await self.publish_time_flag():
  235. self.logger.info("校验结束: 发布时间不符合")
  236. return False
  237. if not self.title_flag():
  238. self.logger.info("校验结束: 标题不符合")
  239. return False
  240. if not await self.repeat_video():
  241. self.logger.info("校验结束: 去重不符合")
  242. return False
  243. if not self.download_rule_flag():
  244. self.logger.info("校验结束: 下载规则不符合")
  245. return False
  246. self.logger.info("校验结束: 全部通过")
  247. return True