123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112 |
- # -*- coding: utf-8 -*-
- import argparse
- import datetime
- import json
- import requests
- server_robot = {
- 'webhook': 'https://open.feishu.cn/open-apis/bot/v2/hook/926982f5-e7af-40f5-81fd-27d8f42718e4',
- }
- level_header_template_map = {
- "info": "turquoise",
- "error": "red",
- "warn": "yellow"
- }
- level_header_title_content_map = {
- "info": "广告模型自动更新通知",
- "error": "广告模型自动更新告警",
- "warn": "广告模型自动更新告警"
- }
- level_task_status_map = {
- "info": "任务执行成功",
- "error": "任务执行失败",
- "warn": "任务执行失败",
- }
- def send_card_msg_to_feishu(webhook, card_json):
- """发送消息到飞书"""
- headers = {'Content-Type': 'application/json'}
- payload_message = {
- "msg_type": "interactive",
- "card": card_json
- }
- print(f"推送飞书消息内容: {json.dumps(payload_message)}")
- response = requests.request('POST', url=webhook, headers=headers, data=json.dumps(payload_message))
- print(response.text)
- def seconds_convert(seconds):
- hours = seconds // 3600
- minutes = (seconds % 3600) // 60
- seconds = seconds % 60
- return f"{hours}小时 {minutes}分钟 {seconds}秒"
- def _monitor(level, msg: str, start, elapsed):
- """消息推送"""
- now = datetime.datetime.now()
- if now.hour > 6:
- msg = msg.replace("\\n", "\n").replace("\\t", "\t")
- mgs_text = f"- 当前时间: {now.strftime('%Y-%m-%d %H:%M:%S')}" \
- f"\n- 任务开始时间: {start}" \
- f"\n- 任务状态: {level_task_status_map[level]}" \
- f"\n- 任务耗时: {seconds_convert(elapsed)}" \
- f"\n- 任务描述: {msg}"
- card_json = {
- "config": {},
- "i18n_elements": {
- "zh_cn": [
- {
- "tag": "markdown",
- "content": "",
- "text_align": "left",
- "text_size": "normal"
- },
- {
- "tag": "markdown",
- "content": mgs_text,
- "text_align": "left",
- "text_size": "normal"
- }
- ]
- },
- "i18n_header": {
- "zh_cn": {
- "title": {
- "tag": "plain_text",
- "content": level_header_title_content_map[level]
- },
- "subtitle": {
- "tag": "plain_text",
- "content": ""
- },
- "template": level_header_template_map[level]
- }
- }
- }
- send_card_msg_to_feishu(
- webhook=server_robot.get('webhook'),
- card_json=card_json
- )
- if __name__ == '__main__':
- parser = argparse.ArgumentParser(description='告警Utils')
- parser.add_argument('--level', type=str, help='通知级别, info, warn, error', required=True)
- parser.add_argument('--msg', type=str, help='消息', required=True)
- parser.add_argument('--start', type=str, help='任务开始时间', required=True)
- parser.add_argument('--elapsed', type=int, help='任务耗时【秒】', required=True)
- args = parser.parse_args()
- _monitor(
- level=args.level,
- msg=args.msg,
- start=args.start,
- elapsed=args.elapsed
- )
|