Sfoglia il codice sorgente

feat:添加fish参考音频同步任务

zhaohaipeng 2 mesi fa
parent
commit
2c06239116
1 ha cambiato i file con 84 aggiunte e 0 eliminazioni
  1. 84 0
      script/fish_reference_audio_sync.py

+ 84 - 0
script/fish_reference_audio_sync.py

@@ -1,7 +1,11 @@
 from typing import List, Dict, Any
 
+import pandas as pd
+from tabulate import tabulate
+
 from client.FishClient import FishClient
 from helper.MySQLHelper import MySQLHelper
+from util import feishu_inform_util
 
 official_fish_client = FishClient("https://api.fish.audio")
 
@@ -12,6 +16,28 @@ mysql_helper = MySQLHelper(
     database="aigc-admin-prod"
 )
 
+webhook_url = "https://open.feishu.cn/open-apis/bot/v2/hook/c09712a8-22cd-4bfa-93a5-30ae7b1db11b"
+
+
+def df_to_feishu_table(df: pd.DataFrame) -> dict:
+    """
+    将 pandas DataFrame 转换为飞书卡片表格组件
+    """
+    # 构建列定义
+    columns = [{"name": col, "width": "auto"} for col in df.columns]
+
+    # 构建行数据
+    rows = []
+    for _, row in df.iterrows():
+        cells = [{"text": str(val)} for val in row.values]
+        rows.append({"cells": cells})
+
+    return {
+        "tag": "table",
+        "columns": columns,
+        "rows": rows
+    }
+
 
 def get_fish_pq_ip() -> List[str]:
     sql = "select * from base_config where config_key = 'fish_pq_ip_list';"
@@ -27,11 +53,47 @@ def get_all_reference_by_db() -> List[Dict[str, Any]]:
     return mysql_helper.execute_query(sql)
 
 
+def build_card_json(msg: str):
+    return {
+        "config": {},
+        "i18n_elements": {
+            "zh_cn": [
+                {
+                    "tag": "markdown",
+                    "content": "",
+                    "text_align": "left",
+                    "text_size": "normal"
+                },
+                {
+                    "tag": "markdown",
+                    "content": f"```\n{msg}\n```",
+                    "text_align": "left",
+                    "text_size": "normal"
+                }
+            ]
+        },
+        "i18n_header": {
+            "zh_cn": {
+                "title": {
+                    "tag": "plain_text",
+                    "content": "特征同步延迟告警"
+                },
+                "subtitle": {
+                    "tag": "plain_text",
+                    "content": ""
+                },
+                "template": "yellow"
+            }
+        }
+    }
+
+
 def _main():
     db_all_reference = get_all_reference_by_db()
     reference_id_and_text_map = {}
     all_ip = get_fish_pq_ip()
     print(f"当前配置的Fish服务器IP列表为: {all_ip}")
+    sync_fail_list = []
     for ip in all_ip:
         print(f"开始将音频同步到实例【{ip}】")
         fish_client = FishClient(f"http://{ip}:8080")
@@ -40,6 +102,11 @@ def _main():
             reference_id = reference_info['speaker_id']
             try:
                 if reference_id in exist_references_ids:
+                    sync_fail_list.append({
+                        "实例IP": ip,
+                        "音频ID": reference_id,
+                        "失败原因": "同步成功",
+                    })
                     print(f"音频ID【{reference_id}】在实例【{ip}】上已经存在,跳过")
                     continue
 
@@ -54,8 +121,25 @@ def _main():
                 print(f"音频ID【{reference_id}】同步到实例【{ip}】上完成")
             except Exception as e:
                 print(f"音频ID【{reference_id}】同步到实例【{ip}】上异常 {str(e)}")
+                sync_fail_list.append({
+                    "实例IP": ip,
+                    "音频ID": reference_id,
+                    "失败原因": str(e),
+                })
         print(f"将音频同步到实例【{ip}】完成")
 
+        # 同步失败的告警通知
+        df = pd.DataFrame(sync_fail_list)
+        msg = tabulate(df, headers='keys', tablefmt='grid', showindex=False)
+        print("同步失败的音频信息")
+        print(msg)
+        print("=" * 300)
+
+        feishu_inform_util.send_card_msg_to_feishu(
+            webhook=webhook_url,
+            card_json=build_card_json(msg)
+        )
+
 
 if __name__ == '__main__':
     _main()