pipeline.py 7.3 KB


  import os
  import re
  import sys
  import time
  from datetime import datetime

  # Make the current working directory importable BEFORE the project-local
  # imports below; appending it after them could not help resolve them.
  sys.path.append(os.getcwd())

  from core.utils.feishu_data_async import FeishuDataAsync
  from core.utils.log.logger_manager import LoggerManager
  from services.async_mysql_service import AsyncMysqlService
  class PiaoQuanPipeline:
      """
      Async crawler pipeline that runs rule checks on a single crawled video item.

      ``process_item`` runs, in order: the publish-time window check, the title
      sensitive-word check, duplicate detection against ``crawler_video``, and
      the numeric download rules from ``rule_dict``.
      """

      # Feishu spreadsheet holding per-platform day-count configuration.
      _FEISHU_SPREADSHEET_TOKEN = "KsoMsyP2ghleM9tzBfmcEEXBnXg"
      # Sheet mapping platform -> publish window in days (read by publish_time_flag).
      _PUBLISH_DAYS_SHEET_ID = "RuLK77"
      # Sheet mapping platform -> dedup window in days (read by repeat_video).
      _REPEAT_DAYS_SHEET_ID = "letS93"
      # (platform, mode) pairs that skip the full-history duplicate query.
      _DEDUP_BYPASS = frozenset({
          ("zhufuniannianshunxinjixiang", "recommend"),
          ("benshanzhufu", "recommend"),
          ("tiantianjufuqi", "recommend"),
      })
      # Words rejected by title_flag. Empty by default; override on a subclass
      # or an instance to enable filtering.
      SENSITIVE_WORDS = ()
      # Matches every non-word character; title_flag replaces them with spaces.
      _NON_WORD_RE = re.compile(r"[^\w]")
      # Upper bound used when a download rule has no "max".
      _DEFAULT_RULE_MAX = 999999999

      def __init__(self, platform, mode, rule_dict, env, item, trace_id, account=None):
          """
          :param platform: crawler platform name; matched against ``classname`` in ``crawler_video``.
          :param mode: crawl mode (e.g. "recommend").
          :param rule_dict: rules; ``"period"`` holds publish-window bounds, other keys
              hold ``{"min": ..., "max": ...}`` bounds for same-named item fields.
          :param env: environment name (stored; not used by the checks in this class).
          :param item: crawled video dict to validate.
          :param trace_id: id attached to every log record.
          :param account: optional account attached to log records.
          """
          self.platform = platform
          self.mode = mode
          self.item = item
          self.rule_dict = rule_dict
          self.env = env
          self.trace_id = trace_id
          self.account = account
          # Async MySQL client, used by repeat_video.
          self.mysql = AsyncMysqlService(platform=platform, mode=mode)
          self.aliyun_log = LoggerManager.get_aliyun_logger(platform=platform, mode=mode)

      def _log_rejection(self, code, message):
          """Send one rejection record for the current item to the Aliyun logger."""
          self.aliyun_log.logging(
              code=code,
              trace_id=self.trace_id,
              message=message,
              data=self.item,
              account=self.account,
          )

      async def _feishu_lookup(self, sheet_id):
          """
          Return column 1 of the first non-header row whose column 0 equals this platform.

          :return: the matched cell value, or None when the sheet is empty or no row matches.
          """
          async with FeishuDataAsync() as feishu_data:
              summary = await feishu_data.get_values(
                  spreadsheet_token=self._FEISHU_SPREADSHEET_TOKEN, sheet_id=sheet_id
              )
              # Skip the header row; tolerate an empty sheet and short or blank rows.
              for row in (summary or [])[1:]:
                  if row and len(row) > 1 and row[0] == self.platform:
                      return row[1]
          return None

      async def feishu_time_list(self):
          """Read this platform's publish-window day count from Feishu (None if unset)."""
          return await self._feishu_lookup(self._PUBLISH_DAYS_SHEET_ID)

      async def publish_time_flag(self) -> bool:
          """
          Check whether the item's publish time lies within the allowed window.

          The window in days is ``max(period.max, period.min)`` from ``rule_dict``
          (each defaulting to 1000), replaced by the platform's Feishu value when set.

          - ``gongzhonghao``: rejected only if BOTH publish and update timestamps
            are older than the window.
          - other platforms with a 0-day window: must be published today (local date).
          - other platforms: rejected if the publish timestamp is older than the window.

          :return: True if the item passes; False otherwise (logged with code 2004).
          """
          publish_time_stamp = self.item["publish_time_stamp"]
          period = self.rule_dict.get("period") or {}
          # Rule values may arrive as strings; coerce so max() compares numerically
          # and the window arithmetic below stays integer.
          days = max(int(period.get("max", 1000)), int(period.get("min", 1000)))
          days_time = await self.feishu_time_list()
          if days_time:
              days = int(days_time)

          now_ts = int(time.time())
          window = 86400 * days

          if self.platform == "gongzhonghao":
              # Only this branch needs update_time_stamp, so only read it here.
              update_time_stamp = self.item["update_time_stamp"]
              if now_ts - publish_time_stamp > window and now_ts - update_time_stamp > window:
                  self._log_rejection("2004", f"发布时间超过{days}天")
                  return False
          elif days == 0:
              is_today = datetime.fromtimestamp(publish_time_stamp).date() == datetime.today().date()
              if not is_today:
                  self._log_rejection("2004", "发布时间不是当天")
                  return False
          elif now_ts - publish_time_stamp > window:
              self._log_rejection("2004", f"发布时间超过{days}天")
              return False
          return True

      def title_flag(self) -> bool:
          """
          Check the title against ``SENSITIVE_WORDS``.

          Non-word characters are replaced by spaces before the substring test;
          a None title is treated as empty.

          :return: False (logged with code 2003) if any sensitive word occurs, else True.
          """
          title = self.item["video_title"] or ""
          cleaned_title = self._NON_WORD_RE.sub(" ", title)
          if any(word in cleaned_title for word in self.SENSITIVE_WORDS):
              self._log_rejection("2003", "标题中包含敏感词")
              return False
          return True

      def download_rule_flag(self) -> bool:
          """
          Check every item field that has a rule in ``rule_dict`` against its bounds.

          ``"period"`` is skipped (it is the publish window, handled by
          publish_time_flag). A missing ``min`` defaults to 0 and a missing ``max``
          to ``_DEFAULT_RULE_MAX``. Values are compared as ints; a None/empty field counts as 0.

          :return: False (logged with code 2004) on the first out-of-range field, else True.
          """
          for key, value in self.item.items():
              rule = self.rule_dict.get(key)
              if not rule or key == "period":
                  continue
              min_value = int(rule.get("min", 0))
              # NOTE(review): a configured "max" of 0 means "only 0 allowed" here;
              # confirm that is intended rather than "no upper bound".
              max_value = int(rule.get("max", self._DEFAULT_RULE_MAX))
              if not min_value <= int(value or 0) <= max_value:
                  self._log_rejection("2004", f"{key}: 不符合规则")
                  return False
          return True

      async def feishu_list(self):
          """Read this platform's dedup-window day count from Feishu (None if unset)."""
          return await self._feishu_lookup(self._REPEAT_DAYS_SHEET_ID)

      async def repeat_video(self) -> bool:
          """
          Check whether the item already exists in ``crawler_video`` for this platform.

          1. If Feishu configures a dedup day count, reject when the same
             out_video_id was created within that many days.
          2. (platform, mode) pairs in ``_DEDUP_BYPASS`` then pass.
          3. Otherwise reject when any row with the same out_video_id exists;
             for zhufuhaoyunbaofu/recommend the title must match as well.

          :return: False (logged with code 2002) for a duplicate, else True.
          """
          out_id = self.item["out_video_id"]
          title = self.item["video_title"]
          day_count = await self.feishu_list()
          async with self.mysql as db:
              if day_count:
                  sql = """
                      SELECT create_time
                      FROM crawler_video
                      WHERE classname = %s AND out_video_id = %s
                        AND create_time >= DATE_SUB(NOW(), INTERVAL %s DAY)
                  """
                  rows = await db.client.fetch_all(sql, [self.platform, out_id, int(day_count)])
                  if rows:
                      self._log_rejection("2002", "重复的视频")
                      return False
              # NOTE(review): for non-bypassed platforms without the title match,
              # the query below matches every row the windowed query above matched
              # (it just drops the time filter), so the Feishu day window does not
              # let older duplicates through. Confirm whether that is intended.
              if (self.platform, self.mode) in self._DEDUP_BYPASS:
                  return True
              sql = """
                  SELECT 1
                  FROM crawler_video
                  WHERE classname = %s
                    AND out_video_id = %s
              """
              params = [self.platform, out_id]
              if self.platform == "zhufuhaoyunbaofu" and self.mode == "recommend":
                  # This platform/mode also requires the title to match.
                  sql += "  AND video_title = %s\n"
                  params.append(title)
              result = await db.client.fetch_one(sql, params)
              if result:
                  self._log_rejection("2002", "重复的视频")
                  return False
          return True

      async def process_item(self) -> bool:
          """
          Run all rule checks in order, stopping at the first failure.

          :return: True only if every check passes.
          """
          if not await self.publish_time_flag():
              return False
          if not self.title_flag():
              return False
          if not await self.repeat_video():
              return False
          if not self.download_rule_flag():
              return False
          return True