pipeline.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. import os
  2. import re
  3. import sys
  4. import time
  5. from datetime import datetime
  6. from core.utils.feishu_data_async import FeishuDataAsync
  7. from core.utils.log.logger_manager import LoggerManager
  8. from services.async_mysql_service import AsyncMysqlService
  9. sys.path.append(os.getcwd())
  10. class PiaoQuanPipeline:
  11. """
  12. 异步版爬虫管道——用于执行视频规则校验
  13. """
  14. def __init__(self, platform, mode, rule_dict, env, item, trace_id, account=None):
  15. self.platform = platform
  16. self.mode = mode
  17. self.item = item
  18. self.rule_dict = rule_dict
  19. self.env = env
  20. self.trace_id = trace_id
  21. self.account = account
  22. # 初始化异步MySQL客户端
  23. self.mysql = AsyncMysqlService(platform=platform, mode=mode)
  24. self.aliyun_log = LoggerManager.get_aliyun_logger(platform=platform, mode=mode)
  25. async def feishu_time_list(self):
  26. """从飞书读取天数配置"""
  27. summary = await FeishuDataAsync.get_values("KsoMsyP2ghleM9tzBfmcEEXBnXg", "RuLK77")
  28. for row in summary[1:]:
  29. if row[0] == self.platform:
  30. return row[1]
  31. return None
  32. async def publish_time_flag(self) -> bool:
  33. """
  34. 判断发布时间是否过期
  35. :return: True or False
  36. """
  37. publish_time_stamp = self.item["publish_time_stamp"]
  38. update_time_stamp = self.item["update_time_stamp"]
  39. max_d = self.rule_dict.get("period", {}).get("max", 1000)
  40. min_d = self.rule_dict.get("period", {}).get("min", 1000)
  41. days = max(max_d, min_d)
  42. days_time = await self.feishu_time_list()
  43. if days_time:
  44. days = int(days_time)
  45. now_ts = int(time.time())
  46. if self.platform == "gongzhonghao":
  47. if now_ts - publish_time_stamp > 86400 * days and now_ts - update_time_stamp > 86400 * days:
  48. self.aliyun_log.logging(
  49. code="2004",
  50. trace_id=self.trace_id,
  51. data=self.item,
  52. message=f"发布时间超过{days}天"
  53. )
  54. return False
  55. else:
  56. if days == 0:
  57. is_today = datetime.fromtimestamp(publish_time_stamp).date() == datetime.today().date()
  58. if not is_today:
  59. return False
  60. elif now_ts - publish_time_stamp > 86400 * days:
  61. self.aliyun_log.logging(
  62. code="2004",
  63. trace_id=self.trace_id,
  64. data=self.item,
  65. message=f"发布时间超过{days}天"
  66. )
  67. return False
  68. return True
  69. def title_flag(self) -> bool:
  70. """
  71. 检查标题是否包含敏感词
  72. """
  73. title = self.item["video_title"]
  74. cleaned_title = re.sub(r"[^\w]", " ", title)
  75. sensitive_words = [] # 这里可添加敏感词
  76. if any(word in cleaned_title for word in sensitive_words):
  77. self.aliyun_log.logging(
  78. code="2003",
  79. trace_id=self.trace_id,
  80. message="标题中包含敏感词",
  81. data=self.item,
  82. account=self.account
  83. )
  84. return False
  85. return True
  86. def download_rule_flag(self) -> bool:
  87. """
  88. 检查是否符合各项下载数值规则
  89. """
  90. for key in self.item:
  91. if self.rule_dict.get(key):
  92. max_value = int(self.rule_dict[key].get("max", 999999999))
  93. if key == "peroid":
  94. continue
  95. val = int(self.item.get(key, 0))
  96. if not int(self.rule_dict[key]["min"]) <= val <= max_value:
  97. self.aliyun_log.logging(
  98. code="2004",
  99. trace_id=self.trace_id,
  100. message=f"{key}: 不符合规则",
  101. data=self.item,
  102. account=self.account
  103. )
  104. return False
  105. return True
  106. async def feishu_list(self):
  107. """从飞书拉取天数配置,用于去重判断"""
  108. summary = FeishuUtils.get_values_batch("KsoMsyP2ghleM9tzBfmcEEXBnXg", "letS93")
  109. for row in summary[1:]:
  110. if row[0] == self.platform:
  111. return row[1]
  112. return None
  113. async def repeat_video(self) -> bool:
  114. """
  115. 判断视频是否重复(包括飞书配置去重天数逻辑)
  116. """
  117. out_id = self.item["out_video_id"]
  118. title = self.item["video_title"]
  119. day_count = await self.feishu_list()
  120. async with self.mysql as db:
  121. if day_count:
  122. sql = f"""
  123. SELECT create_time
  124. FROM crawler_video
  125. WHERE classname = %s AND out_video_id = %s
  126. AND create_time >= DATE_SUB(NOW(), INTERVAL %s DAY)
  127. """
  128. rows = await db.client.fetch_all(sql, [self.platform, out_id, int(day_count)])
  129. if rows:
  130. self.aliyun_log.logging(
  131. code="2002",
  132. trace_id=self.trace_id,
  133. message="重复的视频",
  134. data=self.item,
  135. account=self.account
  136. )
  137. return False
  138. # 特定平台绕过去重判断
  139. bypass = {
  140. ("zhufuniannianshunxinjixiang", "recommend"),
  141. ("benshanzhufu", "recommend"),
  142. ("tiantianjufuqi", "recommend"),
  143. }
  144. if (self.platform, self.mode) in bypass:
  145. return True
  146. # 标题去重逻辑(示例)
  147. if self.platform == "zhufuhaoyunbaofu" and self.mode == "recommend":
  148. sql = """
  149. SELECT 1
  150. FROM crawler_video
  151. WHERE classname = %s
  152. AND out_video_id = %s
  153. AND video_title = %s \
  154. """
  155. result = await db.client.fetch_one(sql, [self.platform, out_id, title])
  156. else:
  157. sql = """
  158. SELECT 1
  159. FROM crawler_video
  160. WHERE classname = %s
  161. AND out_video_id = %s \
  162. """
  163. result = await db.client.fetch_one(sql, [self.platform, out_id])
  164. if result:
  165. self.aliyun_log.logging(
  166. code="2002",
  167. trace_id=self.trace_id,
  168. message="重复的视频",
  169. data=self.item,
  170. account=self.account
  171. )
  172. return False
  173. return True
  174. async def process_item(self) -> bool:
  175. """
  176. 异步执行所有规则校验
  177. """
  178. if not await self.publish_time_flag():
  179. return False
  180. if not self.title_flag():
  181. return False
  182. if not await self.repeat_video():
  183. return False
  184. if not self.download_rule_flag():
  185. return False
  186. return True