
Add aliyun_log, pipeline.py

罗俊辉 1 year ago
Parent
Commit
15a830b9ac
3 changed files with 200 additions and 3 deletions
  1. common/__init__.py (+1, -3)
  2. common/aliyun_log.py (+80, -0)
  3. common/pipeline.py (+119, -0)

+ 1 - 3
common/__init__.py

@@ -1,3 +1 @@
-# -*- coding: utf-8 -*-
-# @Author: wangkun
-# @Time: 2023/1/31
+

+ 80 - 0
common/aliyun_log.py

@@ -0,0 +1,80 @@
+# -*- coding: utf-8 -*-
+# @Author: 罗俊辉
+# @Time: 2023/10/23
+"""
+Shared helpers, including: generate logs / delete logs
+"""
+import json
+from aliyun.log import LogClient, PutLogsRequest, LogItem
+from datetime import date, timedelta
+from datetime import datetime
+import time
+
+# Disable HTTP/HTTPS proxies (module-level; not referenced elsewhere in this file)
+proxies = {"http": None, "https": None}
+
+
+class AliyunLogger:
+    # Current time, evaluated once at class definition: <class 'datetime.datetime'>  2022-04-14 20:13:51.244472
+    now = datetime.now()
+    # Yesterday: <class 'str'>  2022-04-13
+    yesterday = (date.today() + timedelta(days=-1)).strftime("%Y-%m-%d")
+    # Today: <class 'datetime.date'>  2022-04-14
+    today = date.today()
+    # Tomorrow: <class 'str'>  2022-04-15
+    tomorrow = (date.today() + timedelta(days=1)).strftime("%Y-%m-%d")
+
+    # Write a log entry to Aliyun Log Service
+    @staticmethod
+    def logging(code, platform, mode, env, message, data, account=None):
+        """
+        Write a log entry to Aliyun Log Service.
+        Test project: https://sls.console.aliyun.com/lognext/project/crawler-log-dev/logsearch/crawler-log-dev
+        Production project: https://sls.console.aliyun.com/lognext/project/crawler-log-prod/logsearch/crawler-log-prod
+        """
+        # Aliyun Log Service access credentials
+        accessKeyId = "LTAIWYUujJAm7CbH"
+        accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
+        if env == "dev":
+            project = "crawler-log-dev"
+            logstore = "crawler-log-dev"
+            endpoint = "cn-hangzhou.log.aliyuncs.com"
+        else:
+            project = "crawler-log-prod"
+            logstore = "crawler-log-prod"
+            endpoint = "cn-hangzhou-intranet.log.aliyuncs.com"
+
+        # Create the LogClient instance
+        client = LogClient(endpoint, accessKeyId, accessKey)
+        log_group = []
+        log_item = LogItem()
+
+        """
+        生成日志消息体格式,例如
+        crawler:xigua
+        message:不满足抓取规则 
+        mode:search
+        timestamp:1686656143
+        """
+        # Collapse CR/LF so the message stays on a single line
+        message = message.replace("\r", " ").replace("\n", " ")
+        contents = [
+            (f"code", str(code)),
+            (f"platform", str(platform)),
+            (f"mode", str(mode)),
+            (f"message", str(message)),
+            (f"data", json.dumps(data, ensure_ascii=False)),
+            (f"account", str(account)),
+            ("timestamp", str(int(time.time()))),
+        ]
+
+        log_item.set_contents(contents)
+        log_group.append(log_item)
+        # Send the log entry to the logstore
+        request = PutLogsRequest(
+            project=project,
+            logstore=logstore,
+            topic="",
+            source="",
+            logitems=log_group,
+            compress=False,
+        )
+        client.put_logs(request)

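For reference, a minimal sketch of how this logger might be invoked from a crawler. All values below are illustrative assumptions, not part of the commit; env="dev" targets the public crawler-log-dev logstore, anything else targets crawler-log-prod.

# Illustrative call -- values are assumptions, not from this commit
from common.aliyun_log import AliyunLogger

AliyunLogger.logging(
    code="1000",                    # caller-defined status code
    platform="xigua",               # crawler platform name
    mode="search",                  # crawl mode
    env="dev",                      # selects project/logstore and endpoint
    message="one video scraped",
    data={"out_video_id": "v123"},  # serialized into the "data" field via json.dumps
)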
+ 119 - 0
common/pipeline.py

@@ -0,0 +1,119 @@
+import os
+import re
+import time
+from .aliyun_log import AliyunLogger
+from common.scheduling_db import MysqlHelper
+
+
+class PiaoQuanPipeline:
+    def __init__(self, platform, mode, rule_dict, env, item):
+        self.platform = platform
+        self.mode = mode
+        self.item = item
+        self.rule_dict = rule_dict
+        self.env = env
+
+    # Check that the video's publish/update time falls within the allowed window
+    def publish_time_flag(self):
+        # Compare both timestamps against the configured max period (days)
+        publish_time_stamp = self.item["publish_time_stamp"]
+        update_time_stamp = self.item["update_time_stamp"]
+        if (
+                int(time.time()) - publish_time_stamp
+                > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
+        ) and (
+                int(time.time()) - update_time_stamp
+                > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
+        ):
+            AliyunLogger.logging(
+                code="2001",
+                platform=self.platform,
+                mode=self.mode,
+                data="",
+                env=self.env,
+                message="发布时间超过{}天".format(int(self.rule_dict.get('period', {}).get('max', 1000)))
+            )
+            return False
+        return True
+
+    # Check whether the video title is acceptable
+    def title_flag(self):
+        title = self.item['video_title']
+        cleaned_title = re.sub(r'[^\w]', ' ', title)
+        # Sensitive-word filter
+        # The word list is an empty placeholder in this commit
+        sensitive_words = []
+        if any(word in cleaned_title for word in sensitive_words):
+            AliyunLogger.logging(
+                code="2004",
+                platform=self.platform,
+                mode=self.mode,
+                env=self.env,
+                message="标题中包含敏感词",
+                data=self.item
+            )
+            return False
+        return True
+
+    # Basic download rules for the video
+    def download_rule_flag(self):
+        # Normalize video_dict: derive publish_time (ms) from publish_time_stamp (s)
+        if self.item.get("publish_time_stamp"):
+            self.item["publish_time"] = self.item["publish_time_stamp"] * 1000
+        # Normalize video_dict: derive period (days since publish) when not supplied
+        if self.item.get("publish_time") and self.item.get("period", "noperiod") == "noperiod":
+            self.item["period"] = int((int(time.time() * 1000) - self.item["publish_time"]) / (3600 * 24 * 1000))
+        # Treat a configured max of 0 in rule_dict as "no upper bound"
+        for key in self.item:
+            if self.rule_dict.get(key):
+                max_value = int(self.rule_dict[key]["max"]) if int(self.rule_dict[key]["max"]) > 0 else 999999999999999
+                if key == "peroid":
+                    flag = 0 <= int(self.item[key]) <= max_value
+                    AliyunLogger.logging(
+                        code="2003",
+                        platform=self.platform,
+                        mode=self.mode,
+                        env=self.env,
+                        data=self.item,
+                        message='{}: 0 <= {} <= {}, {}'.format(key, self.item[key], max_value, flag)
+                    )
+
+                    if not flag:
+                        return flag
+                else:
+                    flag = int(self.rule_dict[key]["min"]) <= int(self.item[key]) <= max_value
+                    AliyunLogger.logging(
+                        code="2003",
+                        platform=self.platform,
+                        mode=self.mode,
+                        env=self.env,
+                        data=self.item,
+                        message='{}: {} <= {} <= {}, {}'.format(key, self.rule_dict[key]["min"], self.item[key], max_value, flag)
+                    )
+                    if not flag:
+                        return flag
+        return True
+
+    # De-duplicate: has this video already been crawled for this platform?
+    def repeat_video(self):
+        # sql = f""" select * from crawler_video where platform="公众号" and out_video_id="{video_id}"; """
+        out_id = self.item['out_video_id']
+        sql = f""" select * from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}"; """
+        repeat_video = MysqlHelper.get_values(
+            log_type=self.mode,
+            crawler=self.platform,
+            env=self.env,
+            sql=sql,
+            action=''
+        )
+        if repeat_video:
+            AliyunLogger.logging(
+                code="2002",
+                platform=self.platform,
+                mode=self.mode,
+                env=self.env,
+                message="duplicate video",
+                data=self.item
+            )
+            return False
+        return True
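
Taken together, a minimal sketch of how this pipeline might be driven. The item fields, rule_dict thresholds, and env value are illustrative assumptions; repeat_video additionally assumes a MySQL instance reachable through MysqlHelper.

# Illustrative driver -- field names mirror the checks above, values are assumptions
import time

from common.pipeline import PiaoQuanPipeline

now = int(time.time())
item = {
    "video_title": "example title",
    "publish_time_stamp": now - 3600,  # published an hour ago (seconds)
    "update_time_stamp": now - 3600,
    "out_video_id": "v123",
    "duration": 60,                    # any item key present in rule_dict is range-checked
}
rule_dict = {
    "period": {"min": 0, "max": 30},      # days since publish
    "duration": {"min": 10, "max": 600},  # seconds
}

pipeline = PiaoQuanPipeline("xigua", "search", rule_dict, "dev", item)
if (pipeline.publish_time_flag()
        and pipeline.title_flag()
        and pipeline.download_rule_flag()
        and pipeline.repeat_video()):
    print("video passed all checks; proceed to download")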