Browse Source

feat: update monitoring alerts

zhaohaipeng 5 days ago
parent
commit
981386d9ca

+ 0 - 37
client/OSSClient.py

@@ -1,37 +0,0 @@
-import logging
-
-import oss2
-from oss2.credentials import StaticCredentialsProvider
-
-logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
-
-
-class OSSClient(object):
-    def __init__(self, access_key, access_secret, endpoint, region):
-        print(f"oss.version: {oss2.__version__}")
-        self.access_key = access_key
-        self.access_secret = access_secret
-        self.endpoint = endpoint
-        self.region = region
-        self.auth = oss2.ProviderAuthV4(StaticCredentialsProvider(self.access_key, self.access_secret))
-
-    def download_file(self, bucket_name, oss_file_path, local_file_path):
-        bucket = self._get_bucket(bucket_name)
-        bucket.get_object_to_file(oss_file_path, local_file_path)
-
-    def file_is_exist(self, bucket_name, oss_file_path) -> bool:
-        """
-        Check whether a file exists on Aliyun OSS.
-        :param bucket_name: OSS bucket name
-        :param oss_file_path: file path, e.g. 'folder/file.txt'
-        :return: a boolean indicating whether the file exists
-        """
-        try:
-            bucket = self._get_bucket(bucket_name)
-            exists = bucket.object_exists(oss_file_path)
-            return exists
-        except Exception as e:
-            return False
-
-    def _get_bucket(self, bucket_name: str):
-        return oss2.Bucket(auth=self.auth, endpoint=self.endpoint, bucket_name=bucket_name, region=self.region)

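For orientation, here is a minimal sketch of how the removed OSSClient was used before this commit; the bucket, paths and credentials below are placeholders:

```python
# Hypothetical usage of the removed OSSClient; all values are placeholders.
from client.OSSClient import OSSClient

oss = OSSClient("<access_key>", "<access_secret>",
                "https://oss-cn-hangzhou.aliyuncs.com", "cn-hangzhou")
if oss.file_is_exist("my-bucket", "folder/file.txt"):
    oss.download_file("my-bucket", "folder/file.txt", "/tmp/file.txt")
```
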
+ 0 - 4
client/SLSClient.py

@@ -1,4 +0,0 @@
-
-class SLSClient(object):
-    def __init__(self):
-        pass

+ 0 - 68
client/XxlJobClient.py

@@ -1,68 +0,0 @@
-import json
-
-import requests
-
-header = {
-    "Cookie": "Hm_lvt_7f5c31e86e4cc8a706f7b3957d81f353=1714270679; "
-              "XXL_JOB_LOGIN_IDENTITY=7b226964223a312c22757365726e616d65223a2261646d696e222c2270617373776f7264"
-              "223a226531306164633339343962613539616262653536653035376632306638383365222c22726f6c65223a312c2270"
-              "65726d697373696f6e223a6e756c6c7d",
-    "content-type": "application/x-www-form-urlencoded; charset=UTF-8"
-}
-
-
-class XxJobClient(object):
-
-    def __init__(self, base_url):
-        self.base_url = base_url
-
-    def get_all_job_log(self, job_group=0, job_id=0, log_status=-1) -> list[dict]:
-        """
-        Fetch job execution logs.
-        :param job_group: executor ID, 0 = all
-        :param job_id: job ID, 0 = all
-        :param log_status: job status: -1 = all, 1 = success, 2 = failure, 3 = running
-        :return:
-        """
-        url = f"{self.base_url}/joblog/pageList"
-        param = {
-            "start": 0,
-            "length": 1000,
-            "jobGroup": job_group,
-            "jobId": job_id,
-            "logStatus": log_status,
-            "filterTime": "2024-09-27 00:00:00 - 2024-09-27 23:59:59"
-        }
-        response = requests.post(url, data=param, headers=header)
-        return json.loads(response.content)['data']
-
-    def get_all_job_group(self) -> list[dict]:
-        """
-        Fetch all executors.
-        :return:
-        """
-
-        url = f"{self.base_url}/jobgroup/pageList"
-        param = {
-            "start": 0,
-            "length": 1000,
-        }
-        response = requests.post(url, data=param, headers=header)
-        return json.loads(response.content)['data']
-
-    def get_all_job_info(self, job_group=0, trigger_status=-1) -> list[dict]:
-        """
-        Fetch all job info.
-        :param job_group: executor ID
-        :param trigger_status: status: 0 = off, 1 = on, -1 = all
-        :return:
-        """
-        url = f"{self.base_url}/jobinfo/pageList"
-        param = {
-            "jobGroup": job_group,
-            "start": 0,
-            "length": 1000,
-            "triggerStatus": trigger_status
-        }
-        response = requests.post(url, data=param, headers=header)
-        return json.loads(response.content)['data']

+ 69 - 26
monitor/automation_provide_job_monitor.py

@@ -1,4 +1,5 @@
-import datetime
+from datetime import datetime, time
+from typing import List, Tuple
 
 from aliyun.log import LogClient
 from aliyun.log.auth import AUTH_VERSION_4
@@ -10,11 +11,16 @@ access_key = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
 access_key_id = "LTAIWYUujJAm7CbH"
 project = "crawler-scheduler"
 log_store = "aigc-provider"
-query_sql = "* | select crawlerMode, result, if(reason='null', '成功', reason) as reason, count(distinct videoId) as videoIdCnt, count(distinct crawlerPlanId) as crawlerPlanIdCnt  from log where reason not in ('该账号已经存在爬取计划,跳过执行', '该视频近期已经处理过', '该Topic已经创建过爬取计划', '该关键词已经创建过爬取计划') group by crawlerMode, result, reason order by crawlerMode, result desc, reason"
+state_query_sql = "* | select crawlerMode, result, if(reason='null', '成功', reason) as reason, count(distinct videoId) as videoIdCnt, count(distinct crawlerPlanId) as crawlerPlanIdCnt  from log where reason not in ('该账号已经存在爬取计划,跳过执行', '该视频近期已经处理过', '该Topic已经创建过爬取计划', '该关键词已经创建过爬取计划') group by crawlerMode, result, reason order by crawlerMode, result desc, reason"
 
 client = LogClient(endpoint=endpoint, accessKey=access_key, accessKeyId=access_key_id, auth_version=AUTH_VERSION_4, region='cn-hangzhou')
 webhook = 'https://open.feishu.cn/open-apis/bot/v2/hook/9f5c5cce-5eb2-4731-b368-33926f5549f9'
 
+all_crawler_mode_list = [
+    "account", "account_extend", "channel_topic", "channel_topic_extend",
+    "channel_image_search_video", "channel_image_search_topic", "channel_image_search_topic_extend"
+]
+
 card_json = {
     "schema": "2.0",
     "header": {
@@ -40,6 +46,7 @@ def gen_collapsible_panel_json(title, content, is_parent: bool = True) -> dict:
                 "content": title
             },
             "vertical_align": "center",
+
         },
         "border": {
             "color": "grey",
@@ -54,35 +61,22 @@ def gen_collapsible_panel_json(title, content, is_parent: bool = True) -> dict:
     }
 
 
-def main():
-    # Get the current date
-    today = datetime.datetime.now()
-
-    # Start of the day (00:00:00)
-    start_of_day = datetime.datetime.combine(today.date(), datetime.time.min)
-    # End of the day (23:59:59.999999)
-    end_of_day = datetime.datetime.combine(today.date(), datetime.time.max)
-
-    # Convert to second-level timestamps
-    start_timestamp = int(start_of_day.timestamp())
-    end_timestamp = int(end_of_day.timestamp())
+def job_run_state(start_ts: int, end_ts: int):
+    """
+    Summarize job execution status.
+    """
 
-    resp = client.get_log(project=project, logstore=log_store, from_time=start_timestamp, to_time=end_timestamp, query=query_sql)
+    resp = client.get_log(project=project, logstore=log_store, from_time=start_ts, to_time=end_ts, query=state_query_sql)
     log_data = resp.get_body().get('data')
 
-    all_crawler_mode = []
-    for datum in log_data:
-        if datum.get('crawlerMode') not in all_crawler_mode:
-            all_crawler_mode.append(datum.get('crawlerMode'))
-
     collapsible_limit = 5
-    crawler_mode_group = [all_crawler_mode[i:i + collapsible_limit] for i in range(0, len(all_crawler_mode), collapsible_limit)]
-    for crawler_mode_list in crawler_mode_group:
+    crawler_mode_group = [all_crawler_mode_list[i:i + collapsible_limit] for i in range(0, len(all_crawler_mode_list), collapsible_limit)]
+    for crawler_mode_partition in crawler_mode_group:
         elements = []
-        for crawler_mode in crawler_mode_list:
+        for crawler_mode in crawler_mode_partition:
             content = "| reason | videoIdCnt | crawlerPlanIdCnt |\n"
             content += "| --- | --- | --- |\n"
-            for datum in resp.get_body().get('data'):
+            for datum in log_data:
                 if crawler_mode != datum.get('crawlerMode'):
                     continue
                 reason = datum.get('reason')
@@ -90,9 +84,58 @@ def main():
                 crawler_plan_id_cnt = datum.get('crawlerPlanIdCnt')
                 content += f"| {reason} | {video_id_cnt} | {crawler_plan_id_cnt} |\n"
             elements.append(gen_collapsible_panel_json(crawler_mode, content))
+        new_card_json = {**card_json, "body": {**card_json["body"]}}  # copy the nested body so the shared template is not mutated
+        new_card_json["body"]["elements"] = elements
+        feishu_inform_util.send_card_msg_to_feishu(webhook, new_card_json)
+
+
+def crawler_mode_not_success_warning(start_ts: int, end_ts: int, crawler_mode_and_video_source_list: List[Tuple[str, str]]):
+    for crawler_mode, video_source in crawler_mode_and_video_source_list:
+        query_sql = f"crawlerMode : {crawler_mode} and videoSource : {video_source} and result : true | select count(1) as cnt from log"
+        resp = client.get_log(project=project, logstore=log_store, from_time=start_ts, to_time=end_ts, query=query_sql)
+        success_cnt = int(resp.get_body().get('data')[0]['cnt'])
+        if success_cnt <= 0:
+            msg = f"- 供给方式: {crawler_mode} \n- 视频来源: {video_source} \n- 当天还没有成功执行的任务,请关注"
+            new_card_json = {**card_json, "header": {**card_json["header"]}, "body": {**card_json["body"]}}  # copy nested dicts before mutating
+            new_card_json['header']['template'] = 'red'
+            new_card_json['body']['elements'] = [{
+                "tag": "markdown",
+                "content": msg
+            }]
+            feishu_inform_util.send_card_msg_to_feishu(webhook, new_card_json)
+
+
+def main():
+    # Get the current date
+    today = datetime.now()
+
+    # Convert to second-level timestamps
+    # Start of the day (00:00:00)
+    start_ts = int(datetime.combine(today.date(), time.min).timestamp())
+    # End of the day (23:59:59.999999)
+    end_ts = int(datetime.combine(today.date(), time.max).timestamp())
+
+    job_run_state(start_ts, end_ts)
+
+    # Historical top videos
+    video_source_list = ["history"]
+    history_crawler_mode_list = ["account_extend", "channel_topic", "channel_topic_extend", "channel_image_search_video", "channel_image_search_topic",
+                                 "channel_image_search_topic_extend"]
+    # Count the daily top videos after 09:30
+    if (today.hour, today.minute) >= (9, 30):
+        video_source_list.append("top")
+
+    crawler_mode_and_video_source_list = []
+    for crawler_mode in all_crawler_mode_list:
+        for video_source in video_source_list:
+            if video_source == "history":
+                if crawler_mode not in history_crawler_mode_list:
+                    continue
+                crawler_mode_and_video_source_list.append((crawler_mode, video_source))
+            else:
+                crawler_mode_and_video_source_list.append((crawler_mode, video_source))
 
-        card_json["body"]["elements"] = elements
-        feishu_inform_util.send_card_msg_to_feishu(webhook, card_json)
+    crawler_mode_not_success_warning(start_ts, end_ts, crawler_mode_and_video_source_list)
 
 
 if __name__ == "__main__":

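The per-send card copies above matter because `{**card_json}` only duplicates the top level of the dict; the nested `header`/`body` dicts would still be shared and mutated across messages. A standalone sketch of the difference (names are illustrative):

```python
import copy

card = {"header": {"template": "blue"}, "body": {"elements": []}}

shallow = {**card}                     # copies only the top level
shallow["header"]["template"] = "red"  # also mutates card
print(card["header"]["template"])      # -> red

deep = copy.deepcopy(card)             # fully independent copy
deep["header"]["template"] = "green"
print(card["header"]["template"])      # -> still red
```
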
+ 0 - 1
monitor/feature_spark_monitor.py

@@ -174,7 +174,6 @@ def _main():
     config = configparser.ConfigParser()
     config.read(args.config)
     webhook_url = config.get("feishu", "model.webhook")
-    # webhook_url = 'https://open.feishu.cn/open-apis/bot/v2/hook/540d4098-367a-4068-9a44-b8109652f07c'
 
     apollo_meta_url = config.get("apollo", "meta")
     apollo = ApolloClient.ApolloClient(apollo_meta_url)

+ 0 - 0
aliyun/__init__.py → resource/__init__.py


+ 0 - 0
resource/enums/__init__.py


+ 14 - 0
resource/enums/region.py

@@ -0,0 +1,14 @@
+from enum import Enum
+
+
+class Region(Enum):
+    HANG_ZHOU = ("cn-hangzhou", "r-kvstore.aliyuncs.com", "r-kvstore-vpc.cn-hangzhou.aliyuncs.com", "Hangzhou")
+    HONG_KONG = ("cn-hongkong", "r-kvstore.cn-hongkong.aliyuncs.com", "r-kvstore-vpc.cn-hongkong.aliyuncs.com", "Hong Kong")
+    SOUTHEAST = ("ap-southeast-1", "r-kvstore.ap-southeast-1.aliyuncs.com", "r-kvstore-vpc.ap-southeast-1.aliyuncs.com", "Singapore")
+    US_WEST = ("us-west-1", "r-kvstore.us-west-1.aliyuncs.com", "r-kvstore-vpc.us-west-1.aliyuncs.com", "US West (Silicon Valley)")
+
+    def __init__(self, region, endpoint, vpc_endpoint, desc):
+        self.region = region
+        self.endpoint = endpoint
+        self.vpc_endpoint = vpc_endpoint
+        self.desc = desc

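A small sketch of how the enum above can be consumed (output values assume the member definitions shown, including the `vpc_endpoint` assignment):

```python
# Illustrative lookup on the Region enum defined above.
from resource.enums.region import Region

region = Region.HANG_ZHOU
print(region.region)        # cn-hangzhou
print(region.endpoint)      # r-kvstore.aliyuncs.com
print(region.vpc_endpoint)  # r-kvstore-vpc.cn-hangzhou.aliyuncs.com
print(region.desc)          # Hangzhou
```
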
+ 39 - 0
resource/kv_store_monitor.py

@@ -0,0 +1,39 @@
+from datetime import datetime
+
+from resource.enums.region import Region
+from resource.service.FeiShuService import FeiShuService
+from resource.service.KVStoreService import KVStoreService
+
+kv_store_service = KVStoreService()
+fei_shu_service = FeiShuService()
+
+# fei_shu_spread_sheet_token = "XzQGsheQzhk74rtknKacClASnTc"
+# fei_shu_sheet_id = "25dce4"
+#
+# tenant_access_token = fei_shu_service.get_tenant_access_token("cli_a89702999f3c900b", "47ewnaxRqJAvHYdUR8idHgfzfeqAu0Pz")
+
+
+def main():
+    all_instances = kv_store_service.get_all_instance(region=Region.HANG_ZHOU)
+    for instance in all_instances:
+        dt = datetime.now().strftime('%Y-%m-%d')
+        instance_id = instance.instance_id  # instance ID
+        instance_name = instance.instance_name  # instance name
+        instance_status = instance.instance_status  # instance status
+        instance_type = instance.instance_type  # instance type
+        zone_id = instance.zone_id  # availability zone ID
+        architecture_type = instance.architecture_type  # architecture type
+        engine_version = instance.engine_version  # compatible Redis version
+        charge_type = instance.charge_type  # billing type
+        capacity = instance.capacity / 1024  # capacity, MB -> GB
+        values = [[dt, zone_id, instance_id, instance_name, instance_status, instance_type, architecture_type, engine_version, charge_type, capacity]]
+
+        # fei_shu_service.spreadsheet_values_prepend(tenant_access_token, fei_shu_spread_sheet_token, f"{fei_shu_sheet_id}!A2:J2", values)
+        instance_attribute = kv_store_service.describe_instance_attribute(instance_id, Region.HANG_ZHOU)
+        storage = instance_attribute.storage
+        availability_value = instance_attribute.availability_value
+        print(f"{instance_id} --> {storage} --> {availability_value}")
+
+
+if __name__ == "__main__":
+    main()

+ 33 - 0
resource/service/FeiShuService.py

@@ -0,0 +1,33 @@
+import json
+
+import requests
+
+
+class FeiShuService(object):
+    def __init__(self):
+        self.base_url = "https://open.feishu.cn"
+
+    def get_tenant_access_token(self, app_id: str, app_secret: str) -> str:
+        url = f"{self.base_url}/open-apis/auth/v3/tenant_access_token/internal"
+        param = {
+            "app_id": app_id,
+            "app_secret": app_secret,
+        }
+        response = requests.post(url, data=param)
+        return response.json()['tenant_access_token']
+
+    def spreadsheet_values_prepend(self, access_token: str, spreadsheet_token: str, cell_range: str, values: list):
+        url = f"{self.base_url}/open-apis/sheets/v2/spreadsheets/{spreadsheet_token}/values_prepend"
+        param = {
+            "valueRange": {
+                "range": range,
+                "values": values,
+            }
+        }
+        headers = {
+            "Authorization": f"Bearer {access_token}",
+            "Content-Type": "application/json; charset=utf-8"
+        }
+
+        response = requests.post(url, data=json.dumps(param), headers=headers)
+        print(response.json())

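A hypothetical driver for FeiShuService, mirroring the commented-out calls in kv_store_monitor.py; the app credentials, spreadsheet token and sheet ID below are placeholders:

```python
from resource.service.FeiShuService import FeiShuService

fei_shu = FeiShuService()
token = fei_shu.get_tenant_access_token("<app_id>", "<app_secret>")
# Prepend one row above row 2 of the target sheet, columns A..C.
fei_shu.spreadsheet_values_prepend(
    token, "<spreadsheet_token>", "<sheet_id>!A2:C2",
    [["2025-01-01", "instance-id", "instance-name"]],
)
```
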
+ 51 - 0
resource/service/KVStoreService.py

@@ -0,0 +1,51 @@
+from typing import Dict, List, Optional
+
+from alibabacloud_r_kvstore20150101.client import Client as KVStoreClient
+from alibabacloud_r_kvstore20150101.models import *
+from alibabacloud_tea_openapi import models
+
+from resource.enums.region import Region
+
+
+class KVStoreService(object):
+    def __init__(self):
+        self.region_client_map: Dict[Region, KVStoreClient] = {}
+        self.access_key_id = "LTAI4GBWbFvvXoXsSVBe1o9f"
+        self.access_key_secret = "kRAikWitb4kDxaAyBqNrmLmllMEDO3"
+
+    def create_client(self, region: Region) -> KVStoreClient:
+        config = models.Config(
+            access_key_id=self.access_key_id,
+            access_key_secret=self.access_key_secret,
+            endpoint=region.endpoint,
+        )
+        return KVStoreClient(config)
+
+    def get_kvstore_client(self, region: Region) -> KVStoreClient:
+        if region in self.region_client_map:
+            return self.region_client_map[region]
+
+        client = self.create_client(region)
+        self.region_client_map[region] = client
+        return client
+
+    def get_all_instance(self, region: Region) -> List[DescribeInstancesResponseBodyInstancesKVStoreInstance]:
+        page_number = 1
+        result: List[DescribeInstancesResponseBodyInstancesKVStoreInstance] = []
+        while True:
+            response = self.describe_instances(region, page_number)
+            if not response or not response.kvstore_instance:
+                break
+            result.extend(response.kvstore_instance)
+            page_number += 1
+        return result
+
+    def describe_instances(self, region: Region, page_number: int = 1, page_size: int = 50) -> Optional[DescribeInstancesResponseBodyInstances]:
+        request = DescribeInstancesRequest(region_id=region.region, page_number=page_number, page_size=page_size)
+        client = self.get_kvstore_client(region)
+        return client.describe_instances(request).body.instances
+
+    def describe_instance_attribute(self, instance_id: str, region: Region) -> Optional[DescribeInstanceAttributeResponseBodyInstancesDBInstanceAttribute]:
+        request = DescribeInstanceAttributeRequest(instance_id=instance_id)
+        response = self.get_kvstore_client(region).describe_instance_attribute(request)
+        return response.body.instances.dbinstance_attribute

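Usage sketch for the service above (the region choice is illustrative); note that get_all_instance keeps requesting pages until describe_instances returns an empty page:

```python
from resource.enums.region import Region
from resource.service.KVStoreService import KVStoreService

service = KVStoreService()
# Pages through DescribeInstances 50 records at a time until exhausted.
for inst in service.get_all_instance(Region.HANG_ZHOU):
    print(inst.instance_id, inst.instance_name, inst.instance_status)
```
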
+ 0 - 0
resource/service/__init__.py


+ 0 - 29
script/xxl_job.py

@@ -1,29 +0,0 @@
-from client import XxlJobClient
-
-XXL_JOB_BASE_URL = "http://xxl-job-internal.piaoquantv.com/xxl-job-admin"
-
-xxl_job_client = XxlJobClient.XxJobClient(XXL_JOB_BASE_URL)
-
-
-def _main():
-    all_job_group = {}
-    for item in xxl_job_client.get_all_job_group():
-        all_job_group[item['id']] = item
-
-    for item in xxl_job_client.get_all_job_info():
-        job_name = item['jobDesc']
-        job_id = item['id']
-        job_author = item['author']
-        job_group = item['jobGroup']
-        for log_item in xxl_job_client.get_all_job_log(job_id=job_id, log_status=2):
-            job_group_info = all_job_group[job_group]
-            print(
-                f"【任务执行失败】执行器: {job_group_info['title']} "
-                f"任务ID: {job_id}, "
-                f"任务名称: {job_name},"
-                f" 执行时间: {log_item['triggerTime']}"
-            )
-
-
-if __name__ == '__main__':
-    _main()

+ 1 - 2
util/feishu_inform_util.py

@@ -13,7 +13,7 @@ def send_card_msg_to_feishu(webhook, card_json):
         "msg_type": "interactive",
         "card": card_json
     }
-    print(f"推送飞书消息webhook地址 - {webhook},  消息内容: {json.dumps(payload_message)}")
+    print(f"推送飞书消息webhook地址 - {webhook},  消息内容: {json.dumps(payload_message, indent=4, ensure_ascii=False)}")
     response = requests.request('POST', url=webhook, headers=headers, data=json.dumps(payload_message))
     print(f"推送飞书消息返回结果: {response.text}")
 
@@ -27,4 +27,3 @@ def timestamp_format(timestamp: str) -> str:
                 )
     except ValueError as e:
         return timestamp
-
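A minimal sketch of calling the helper above; the webhook URL is a placeholder and the card shape loosely mirrors the card_json template used by the monitors:

```python
from util import feishu_inform_util

card = {
    "schema": "2.0",
    "header": {"title": {"tag": "plain_text", "content": "demo"}},
    "body": {"elements": [{"tag": "markdown", "content": "hello from the monitor"}]},
}
feishu_inform_util.send_card_msg_to_feishu(
    "https://open.feishu.cn/open-apis/bot/v2/hook/<bot-token>", card)
```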