pipeline.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. import os
  2. import re
  3. import time
  4. from .aliyun_log import AliyunLogger
  5. from common.scheduling_db import MysqlHelper
  6. class PiaoQuanPipeline:
  7. def __init__(self, platform, mode, rule_dict, env, item):
  8. self.platform = platform
  9. self.mode = mode
  10. self.item = item
  11. self.rule_dict = rule_dict
  12. self.env = env
  13. # 视频的发布时间限制
  14. def publish_time_flag(self):
  15. # 判断发布时间
  16. publish_time_stamp = self.item["publish_time_stamp"]
  17. update_time_stamp = self.item["update_time_stamp"]
  18. if (
  19. int(time.time()) - publish_time_stamp
  20. > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
  21. ) and (
  22. int(time.time()) - update_time_stamp
  23. > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
  24. ):
  25. AliyunLogger.logging(
  26. code="2001",
  27. platform=self.platform,
  28. mode=self.mode,
  29. data="",
  30. env=self.env,
  31. message="发布时间超过{}天".format(int(self.rule_dict.get('period', {}).get('max', 1000)))
  32. )
  33. return False
  34. return True
  35. # 视频标题是否满足需求
  36. def title_flag(self):
  37. title = self.item['video_title']
  38. cleaned_title = re.sub(r'[^\w]', ' ', title)
  39. # 敏感词
  40. # 获取敏感词列表
  41. sensitive_words = []
  42. if any(word in cleaned_title for word in sensitive_words):
  43. AliyunLogger.logging(
  44. code="2004",
  45. platform=self.platform,
  46. mode=self.mode,
  47. env=self.env,
  48. message="标题中包含敏感词",
  49. data=self.item
  50. )
  51. return False
  52. return True
  53. # 视频基础下载规则
  54. def download_rule_flag(self):
  55. # 格式化 video_dict:publish_time_stamp
  56. if self.item.get("publish_time_stamp"):
  57. self.item["publish_time"] = self.item["publish_time_stamp"] * 1000
  58. # 格式化 video_dict:period
  59. if self.item.get("publish_time") and self.item.get("period", "noperiod") == "noperiod":
  60. self.item["period"] = int((int(time.time() * 1000) - self.item["publish_time"]) / (3600 * 24 * 1000))
  61. # 格式化 rule_dict 最大值取值为 0 的问题
  62. for key in self.item:
  63. if self.rule_dict.get(key):
  64. max_value = int(self.rule_dict[key]["max"]) if int(self.rule_dict[key]["max"]) > 0 else 999999999999999
  65. if key == "peroid":
  66. flag = 0 <= int(self.item[key]) <= max_value
  67. AliyunLogger.logging(
  68. code="2003",
  69. platform=self.platform,
  70. mode=self.mode,
  71. env=self.env,
  72. data=self.item,
  73. message='{}: 0 <= {} <= {}, {}'.format(key, self.item[key], max_value, flag)
  74. )
  75. if not flag:
  76. return flag
  77. else:
  78. flag = int(self.rule_dict[key]["min"]) <= int(self.item[key] <= max_value)
  79. AliyunLogger.logging(
  80. code="2003",
  81. platform=self.platform,
  82. mode=self.mode,
  83. env=self.env,
  84. data=self.item,
  85. message='{}: {} <= {} <= {}, {}'.format(key, self.rule_dict[key]["min"], self.item[key], max_value, flag)
  86. )
  87. if not flag:
  88. return flag
  89. else:
  90. continue
  91. return True
  92. # 按照某个具体平台来去重
  93. def repeat_video(self):
  94. # sql = f""" select * from crawler_video where platform="公众号" and out_video_id="{video_id}"; """
  95. out_id = self.item['out_video_id']
  96. sql = f""" select * from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}"; """
  97. repeat_video = MysqlHelper.get_values(
  98. log_type=self.mode,
  99. crawler=self.platform,
  100. env=self.env,
  101. sql=sql,
  102. action=''
  103. )
  104. if repeat_video:
  105. AliyunLogger.logging(
  106. code="2002",
  107. platform=self.platform,
  108. mode=self.mode,
  109. env=self.env,
  110. message="重复的视频",
  111. data=self.item
  112. )