pipeline.py 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. import os
  2. import re
  3. import sys
  4. import time
  5. from datetime import datetime
  6. sys.path.append(os.getcwd())
  7. from core.utils.feishu_data_async import FeishuDataAsync
  8. from core.utils.log.logger_manager import LoggerManager
  9. from services.async_mysql_service import AsyncMysqlService
  10. class PiaoQuanPipeline:
  11. """
  12. 完整异步爬虫管道 - 每个校验不通过都详细记录本地日志+阿里云日志
  13. """
  14. def __init__(self, platform, mode, rule_dict, env, item, trace_id, account=None):
  15. self.platform = platform
  16. self.mode = mode
  17. self.rule_dict = rule_dict
  18. self.env = env
  19. self.item = item
  20. self.trace_id = trace_id
  21. self.account = account
  22. self.mysql = AsyncMysqlService(platform=platform, mode=mode)
  23. self.logger = LoggerManager.get_logger(platform=platform, mode=mode)
  24. self.aliyun_log = LoggerManager.get_aliyun_logger(platform=platform, mode=mode)
  25. async def feishu_time_list(self):
  26. async with FeishuDataAsync() as feishu_data:
  27. summary = await feishu_data.get_values(
  28. spreadsheet_token="KsoMsyP2ghleM9tzBfmcEEXBnXg",
  29. sheet_id="RuLK77"
  30. )
  31. for row in summary[1:]:
  32. if row[0] == self.platform:
  33. return row[1]
  34. return None
  35. async def feishu_list(self):
  36. async with FeishuDataAsync() as feishu_data:
  37. summary = await feishu_data.get_values(
  38. spreadsheet_token="KsoMsyP2ghleM9tzBfmcEEXBnXg",
  39. sheet_id="letS93"
  40. )
  41. for row in summary[1:]:
  42. if row[0] == self.platform:
  43. return row[1]
  44. return None
  45. async def publish_time_flag(self) -> bool:
  46. publish_ts = self.item.get("publish_time_stamp", int(time.time()))
  47. update_ts = self.item.get("update_time_stamp", int(time.time()))
  48. max_d = self.rule_dict.get("period", {}).get("max", 1000)
  49. min_d = self.rule_dict.get("period", {}).get("min", 1000)
  50. days = max(max_d, min_d)
  51. feishu_days = await self.feishu_time_list()
  52. if feishu_days:
  53. days = int(feishu_days)
  54. now_ts = int(time.time())
  55. if self.platform == "gongzhonghao":
  56. if (now_ts - publish_ts > 86400 * days) and (now_ts - update_ts > 86400 * days):
  57. msg = f"[发布时间过期] now={now_ts}, publish={publish_ts}, update={update_ts}, limit_days={days}"
  58. self.logger.warning(msg)
  59. self.aliyun_log.logging(
  60. code="2004",
  61. trace_id=self.trace_id,
  62. data={
  63. "item": self.item,
  64. "now_ts": now_ts,
  65. "publish_ts": publish_ts,
  66. "update_ts": update_ts,
  67. "days_limit": days
  68. },
  69. message=msg,
  70. account=self.account
  71. )
  72. return False
  73. else:
  74. if days == 0:
  75. is_today = datetime.fromtimestamp(publish_ts).date() == datetime.today().date()
  76. if not is_today:
  77. msg = "[发布时间] 不在今日"
  78. self.logger.warning(msg)
  79. self.aliyun_log.logging(
  80. code="2004",
  81. trace_id=self.trace_id,
  82. data={
  83. "item": self.item,
  84. "publish_ts": publish_ts
  85. },
  86. message=msg,
  87. account=self.account
  88. )
  89. return False
  90. elif now_ts - publish_ts > 86400 * days:
  91. msg = f"[发布时间超限制] now={now_ts}, publish={publish_ts}, limit_days={days}"
  92. self.logger.warning(msg)
  93. self.aliyun_log.logging(
  94. code="2004",
  95. trace_id=self.trace_id,
  96. data={
  97. "item": self.item,
  98. "now_ts": now_ts,
  99. "publish_ts": publish_ts,
  100. "days_limit": days
  101. },
  102. message=msg,
  103. account=self.account
  104. )
  105. return False
  106. return True
  107. def title_flag(self) -> bool:
  108. title = self.item.get("video_title", "")
  109. cleaned_title = re.sub(r"[^\w]", " ", title)
  110. sensitive_words = [] # 可配置敏感词列表
  111. for word in sensitive_words:
  112. if word in cleaned_title:
  113. msg = f"[标题包含敏感词] {word} in {title}"
  114. self.logger.warning(msg)
  115. self.aliyun_log.logging(
  116. code="2003",
  117. trace_id=self.trace_id,
  118. data={
  119. "item": self.item,
  120. "title": title,
  121. "matched_word": word
  122. },
  123. message=msg,
  124. account=self.account
  125. )
  126. return False
  127. return True
  128. def download_rule_flag(self) -> bool:
  129. for key, rule in self.rule_dict.items():
  130. if key == "period":
  131. continue
  132. item_value = int(self.item.get(key, 0))
  133. min_v = int(rule.get("min", 0))
  134. max_v = int(rule.get("max", 999999999))
  135. if not (min_v <= item_value <= max_v):
  136. msg = f"[{key} 校验失败] value={item_value}, expected=[{min_v}, {max_v}]"
  137. self.logger.warning(msg)
  138. self.aliyun_log.logging(
  139. code="2004",
  140. trace_id=self.trace_id,
  141. data={
  142. "item": self.item,
  143. "field": key,
  144. "item_value": item_value,
  145. "min": min_v,
  146. "max": max_v
  147. },
  148. message=msg,
  149. account=self.account
  150. )
  151. return False
  152. return True
  153. async def repeat_video(self) -> bool:
  154. out_id = self.item.get("out_video_id")
  155. title = self.item.get("video_title", "")
  156. bypass_platforms = {
  157. "zhufuniannianshunxinjixiang", "weiquanshipin", "piaoquangushi", "lepaoledong", "zhufukuaizhuan",
  158. "linglingkuailezhufu", "lepaoledongdijie", "jierizhufuhuakaifugui", "haoyunzhufuduo",
  159. "quzhuan", "zhufudewenhou", "jierizhufuxingfujixiang", "haoyoushipin", "xinshiquan",
  160. "laonianshenghuokuaile", "laonianquan"
  161. }
  162. if self.platform in bypass_platforms or (self.platform, self.mode) in {
  163. ("zhuwanwufusunew", "recommend"),
  164. ("jixiangxingfu", "recommend"),
  165. ("yuannifuqichangzai", "recommend"),
  166. ("benshanzhufu", "recommend"),
  167. ("zuihaodesongni", "recommend"),
  168. ("tiantianjufuqi", "recommend")
  169. }:
  170. self.logger.info("[去重] 平台配置无需去重,直接通过")
  171. return True
  172. day_count = await self.feishu_list()
  173. if day_count:
  174. sql = """
  175. SELECT UNIX_TIMESTAMP(create_time) as ts FROM crawler_video
  176. WHERE platform = %s AND out_video_id = %s AND create_time >= DATE_SUB(NOW(), INTERVAL %s DAY)
  177. """
  178. rows = await self.mysql.fetch_all(sql, [self.platform, out_id, int(day_count)])
  179. if rows:
  180. msg = f"[去重失败] {out_id} 在 {day_count} 天内已存在"
  181. self.logger.warning(msg)
  182. self.aliyun_log.logging(
  183. code="2002",
  184. trace_id=self.trace_id,
  185. data={
  186. "item": self.item,
  187. "existing_timestamps": [r["ts"] for r in rows],
  188. "day_count": day_count
  189. },
  190. message=msg,
  191. account=self.account
  192. )
  193. return False
  194. if self.platform == "zhufuhaoyunbaofu" and self.mode == "recommend":
  195. sql = """
  196. SELECT 1 FROM crawler_video WHERE platform = %s AND out_video_id = %s AND video_title = %s
  197. """
  198. result = await self.mysql.fetch_one(sql, [self.platform, out_id, title])
  199. else:
  200. sql = """
  201. SELECT 1 FROM crawler_video WHERE platform = %s AND out_video_id = %s
  202. """
  203. result = await self.mysql.fetch_one(sql, [self.platform, out_id])
  204. if result:
  205. msg = f"[去重失败] {out_id} 已存在"
  206. self.logger.warning(msg)
  207. self.aliyun_log.logging(
  208. code="2002",
  209. trace_id=self.trace_id,
  210. data={
  211. "item": self.item,
  212. "out_video_id": out_id
  213. },
  214. message=msg,
  215. account=self.account
  216. )
  217. return False
  218. self.logger.info("[去重] 校验通过")
  219. return True
  220. async def process_item(self) -> bool:
  221. """
  222. 异步执行完整规则流程,并输出详细本地日志和云日志
  223. """
  224. self.logger.info(f"开始校验: {self.item.get('out_video_id', '')}")
  225. if not await self.publish_time_flag():
  226. self.logger.info("校验结束: 发布时间不符合")
  227. return False
  228. if not self.title_flag():
  229. self.logger.info("校验结束: 标题不符合")
  230. return False
  231. if not await self.repeat_video():
  232. self.logger.info("校验结束: 去重不符合")
  233. return False
  234. if not self.download_rule_flag():
  235. self.logger.info("校验结束: 下载规则不符合")
  236. return False
  237. self.logger.info("校验结束: 全部通过")
  238. return True