zhaohaipeng преди 1 седмица
родител
ревизия
0d37cf4c05
променени са 4 файла, в които са добавени 272 реда и са изтрити 154 реда
  1. 127 0
      monitor/supply_workflow_monitor.py
  2. 18 6
      script/dnn_model_warm_up.py
  3. 0 121
      script/fish_reference_audio_sync.py
  4. 127 27
      script/tts_test.py

+ 127 - 0
monitor/supply_workflow_monitor.py

@@ -0,0 +1,127 @@
+import unicodedata
+from datetime import datetime
+from typing import Dict, List, Any
+
+import pandas as pd
+
+from helper.MySQLHelper import MySQLHelper
+from util import feishu_inform_util
+
+fei_shu_webhook = "https://open.feishu.cn/open-apis/bot/v2/hook/c09712a8-22cd-4bfa-93a5-30ae7b1db11b"  # Feishu bot webhook that main() posts the report card to
+
+mysql_helper = MySQLHelper(  # constructed at import time; shared by task_exe_step_stat()
+    host="rm-t4na9qj85v7790tf84o.mysql.singapore.rds.aliyuncs.com",
+    username="readonly",
+    password="HdkZ4TDmeK6SQ3BRtJBk",  # NOTE(review): credential committed in plain text; rotate it and load from env/secret store
+    database="aigc-admin-prod"
+)
+
+
+def _display_width(s: str) -> int:
+    """Return the display width of ``s`` in terminal columns.
+
+    Characters whose East Asian Width class is 'F', 'W' or 'A' count as 2; all others count as 1.
+    """
+    return sum(2 if unicodedata.east_asian_width(c) in ('F', 'W', 'A') else 1 for c in s)
+
+
+def _pad_to_width(s: str, target_width: int) -> str:
+    """Right-pad ``s`` with spaces up to ``target_width`` display columns (unchanged if already wider)."""
+    return ''.join((s, ' ' * (target_width - _display_width(s))))
+
+
+def print_df_table(df: pd.DataFrame, fmt: str = "grid") -> str:
+    """Render a DataFrame as an aligned text table and return it.
+
+    Despite the name, nothing is printed; the caller decides where the text goes.
+
+    Args:
+        df: DataFrame to render; every cell is converted with ``astype(str)``.
+        fmt: ``'markdown'`` yields a pipe table padded by character count;
+            any other value yields a ``+---+`` grid padded by display width,
+            so mixed CJK/ASCII text lines up in a terminal.
+
+    Returns:
+        The table as a single newline-joined string.
+    """
+    columns = [str(h) for h in df.columns]
+    body = [[str(cell) for cell in record] for record in df.astype(str).values]
+
+    def _frame(cells):
+        # Wrap one row's cells in the outer and inner pipe delimiters.
+        return '| ' + ' | '.join(cells) + ' |'
+
+    def _widths(measure):
+        # Widest entry per column (header included) under the given width function.
+        return [
+            max([measure(name)] + [measure(record[idx]) for record in body])
+            for idx, name in enumerate(columns)
+        ]
+
+    if fmt == "markdown":
+        # Markdown output: pad by character count; the second row is the header separator.
+        widths = _widths(len)
+        out = [
+            _frame(name.ljust(w) for name, w in zip(columns, widths)),
+            _frame('-' * w for w in widths),
+        ]
+        for record in body:
+            out.append(_frame(cell.ljust(w) for cell, w in zip(record, widths)))
+        return '\n'.join(out)
+
+    # Grid output: pad by display width so double-width characters stay aligned.
+    widths = _widths(_display_width)
+    rule = '+' + '+'.join('-' * (w + 2) for w in widths) + '+'
+    header = _frame(_pad_to_width(name, w) for name, w in zip(columns, widths))
+    out = [rule, header, rule]
+    for record in body:
+        out.append(_frame(_pad_to_width(cell, w) for cell, w in zip(record, widths)))
+    out.append(rule)
+    return '\n'.join(out)
+
+
+def task_exe_step_stat(ts: int) -> List[Dict[str, Any]]:
+    """Count step executions with ``create_timestamp >= ts`` per step, status and error category.
+
+    Raw error messages are mapped to a category before grouping, so each combination yields one
+    row; ``ts`` is cast with ``int()`` before being interpolated into the SQL text.
+    """
+    sql = f'''
+        select step_name AS "步骤名称", status_label AS '执行状态', error_label AS '错误原因', count(1) AS '个数'
+        from (select step_name,
+                     case
+                         when status = 0 then '初始化'
+                         when status = 1 then '运行中'
+                         when status = 2 then '成功'
+                         when status = 3 then '失败'
+                         else '未知' end as status_label,
+                     case
+                         when error_msg is null or error_msg = '' then ''
+                         when error_msg like '%Data too long%' then '数据超过字段长度限制'
+                         when error_msg like '%Deadlock%' then '数据库死锁'
+                         else '其他错误' end as error_label
+              from supply_workflow_task_exe_step
+              where create_timestamp >= {int(ts)}) as t
+        group by step_name, status_label, error_label
+        '''
+    return mysql_helper.execute_query(sql)
+
+
+def main():
+    """Send today's step-execution statistics (since local midnight) to the Feishu webhook."""
+    title = "当日任务步骤执行统计"
+    # The query threshold is local midnight expressed in epoch milliseconds.
+    midnight = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
+    rows = task_exe_step_stat(int(midnight.timestamp() * 1000))
+    msg = print_df_table(pd.DataFrame(rows), fmt="grid")
+
+    # Echo the table locally as well, so the run log shows what was sent.
+    print(title)
+    print(msg)
+
+    card = feishu_inform_util.build_card_json(msg, title)
+    feishu_inform_util.send_card_msg_to_feishu(webhook=fei_shu_webhook, card_json=card)
+
+
+if __name__ == '__main__':
+    main()

+ 18 - 6
script/dnn_model_warm_up.py

@@ -1,25 +1,37 @@
 #!/usr/bin/env python
 import json
 
+import pandas as pd
 from eas_prediction import TFRequest
 
-shape = 5
 if __name__ == '__main__':
     fg_config = {}
-    with open("/Users/zhao/Downloads/feature_list_20260424.json", "r") as f:
+    with open("/Users/zhao/Downloads/feature_list_20260403.json", "r") as f:
         fg_config = json.load(f)
 
+    df = pd.read_csv("/Users/zhao/Downloads/DataWorks_SQL查询_结果3_20260515170950_0.csv").sample(n=500, random_state=50)
+    df['r_vid'] = df['vid'].copy()
+
+    shape = df.shape[0]
+
     req = TFRequest('serving_default')
 
     for feature_info in fg_config['features']:
         feature_name = feature_info['feature_name']
         value_type = feature_info['value_type']
         if value_type == 'Double':
-            req.add_feed(feature_name, [shape], TFRequest.DT_DOUBLE, [0.0] * shape)
+            df[feature_name] = pd.to_numeric(df[feature_name], errors='coerce').fillna(0).astype(float)
+            content = df[feature_name].tolist()
+
+            req.add_feed(feature_name, [shape], TFRequest.DT_DOUBLE, content)
         else:
-            req.add_feed(feature_name, [shape], TFRequest.DT_STRING, [b"-1024"] * shape)
+            df[feature_name] = df[feature_name].astype(object).fillna('-1024').astype(str)
+            content = [s.encode('utf-8') for s in df[feature_name].tolist()]
+
+            req.add_feed(feature_name, [shape], TFRequest.DT_STRING, content)
 
-    req.add_fetch('probs_pLeave')
+    req.add_fetch('y_return_n_uv')
+    req.add_fetch('probs_is_share')
 
-    with open("/Users/zhao/Desktop/warm_up_20260424.bin", "wb") as fw:
+    with open("/Users/zhao/Desktop/warm_up_20260413.bin", "wb") as fw:
         fw.write(req.to_string())

+ 0 - 121
script/fish_reference_audio_sync.py

@@ -1,121 +0,0 @@
-from typing import List, Dict, Any
-
-import pandas as pd
-from tabulate import tabulate
-
-from client.FishClient import FishClient
-from helper.MySQLHelper import MySQLHelper
-from util import feishu_inform_util
-
-official_fish_client = FishClient("https://api.fish.audio")
-
-mysql_helper = MySQLHelper(
-    host="rm-t4na9qj85v7790tf84o.mysql.singapore.rds.aliyuncs.com",
-    username="readonly",
-    password="HdkZ4TDmeK6SQ3BRtJBk",
-    database="aigc-admin-prod"
-)
-
-webhook_url = "https://open.feishu.cn/open-apis/bot/v2/hook/c09712a8-22cd-4bfa-93a5-30ae7b1db11b"
-
-
-def get_fish_pq_ip() -> List[str]:
-    sql = "select * from base_config where config_key = 'fish_pq_ip_list';"
-    result = mysql_helper.execute_query(sql)
-    if not result:
-        return []
-    value = result[0]['config_value']
-    return value.split(',')
-
-
-def get_all_reference_by_db() -> List[Dict[str, Any]]:
-    sql = "select * from ai_model_tts where model = 33;"
-    return mysql_helper.execute_query(sql)
-
-
-def build_card_json(msg: str):
-    is_success = True if msg else False
-    return {
-        "config": {},
-        "i18n_elements": {
-            "zh_cn": [
-                {
-                    "tag": "markdown",
-                    "content": "",
-                    "text_align": "left",
-                    "text_size": "normal"
-                },
-                {
-                    "tag": "markdown",
-                    "content": f"```\n{msg}\n```" if is_success else "全部同步成功",
-                    "text_align": "left",
-                    "text_size": "normal"
-                }
-            ]
-        },
-        "i18n_header": {
-            "zh_cn": {
-                "title": {
-                    "tag": "plain_text",
-                    "content": "Fish音频同步完成通知"
-                },
-                "subtitle": {
-                    "tag": "plain_text",
-                    "content": ""
-                },
-                "template": "green" if True else "yellow"
-            }
-        }
-    }
-
-
-def _main():
-    db_all_reference = get_all_reference_by_db()
-    reference_id_and_text_map = {}
-    all_ip = get_fish_pq_ip()
-    print(f"当前配置的Fish服务器IP列表为: {all_ip}")
-    sync_fail_list = []
-    for ip in all_ip:
-        print(f"开始将音频同步到实例【{ip}】")
-        fish_client = FishClient(f"http://{ip}:8080")
-        exist_references_ids = fish_client.get_all_references_id()
-        for reference_info in db_all_reference:
-            reference_id = reference_info['speaker_id']
-            try:
-                if reference_id in exist_references_ids:
-                    print(f"音频ID【{reference_id}】在实例【{ip}】上已经存在,跳过")
-                    continue
-
-                if reference_id not in reference_id_and_text_map:
-                    model_info = official_fish_client.get_model_info_by_id(reference_id)
-                    text = model_info['samples'][0]['text']
-                    reference_id_and_text_map[reference_id] = text
-
-                audio_url = reference_info['audio_url']
-                reference_text = reference_id_and_text_map[reference_id]
-                fish_client.add_reference_id_by_url(reference_id=reference_id, reference_text=reference_text, audio_url=audio_url)
-                print(f"音频ID【{reference_id}】同步到实例【{ip}】上完成")
-            except Exception as e:
-                print(f"音频ID【{reference_id}】同步到实例【{ip}】上异常 {str(e)}")
-                sync_fail_list.append({
-                    "实例IP": ip,
-                    "音频ID": reference_id,
-                    "失败原因": str(e),
-                })
-        print(f"将音频同步到实例【{ip}】完成")
-
-    # 同步失败的告警通知
-    df = pd.DataFrame(sync_fail_list)
-    msg = tabulate(df, headers='keys', tablefmt='grid', showindex=False)
-    print("同步失败的音频信息")
-    print(msg)
-    print("=" * 300)
-
-    feishu_inform_util.send_card_msg_to_feishu(
-        webhook=webhook_url,
-        card_json=build_card_json(msg)
-    )
-
-
-if __name__ == '__main__':
-    _main()

+ 127 - 27
script/tts_test.py

@@ -1,44 +1,144 @@
-from pathlib import Path
+from __future__ import print_function
+
 from typing import List, Dict, Any
 
 import pandas as pd
+import requests
+import volcenginesdkcore
+import volcenginesdkspeechsaasprod
+import volcenginesdkspeechsaasprod20250521
+from volcenginesdkcore.rest import ApiException
 
-from util import file_util
+from helper.MySQLHelper import MySQLHelper
 
-base_dir = "/Users/zhao/Desktop/tzld/TTS"
+mysql_helper = MySQLHelper(
+    host="rm-t4na9qj85v7790tf84o.mysql.singapore.rds.aliyuncs.com",
+    username="readonly",
+    password="HdkZ4TDmeK6SQ3BRtJBk",
+    database="aigc-admin-prod"
+)
 
-test_sample_csv = f"{base_dir}/test_sample.csv"
-reference_path = f"{base_dir}/audio"
+ak = "AKLTZWIxNWRkMzUyYjBmNGU2Yjk5MTFiYWVmNmNiY2Q1Njg"
+sk = "WW1NM1l6TTJNRFZrT0dFMk5HSXhZamt5TnpFd1kyWTNPR0V6TURZd056Yw=="
 
+configuration = volcenginesdkcore.Configuration()
+configuration.ak = ak
+configuration.sk = sk
+configuration.region = "cn-beijing"
+# set default configuration
+volcenginesdkcore.Configuration.set_default(configuration)
 
-def read_test_sample(csv_file: Path) -> List[Dict[str, Any]]:
-    if csv_file.exists():
-        df = pd.read_csv(csv_file)
-        return df.to_dict(orient="records")
-    else:
+
+def add_speaker(audio_url: str, speaker: str, timeout: float = 30.0):
+    """POST a speaker (name + training-audio URL, model 34) to the aigc-api ``saveTts`` endpoint.
+
+    ``timeout`` (seconds) is passed to ``requests.post``; without it a stalled server would block forever.
+    Returns the parsed JSON reply; raises ``requests.HTTPError`` for non-2xx status codes.
+    """
+    url = "https://aigc-api.aiddit.com/aigc/resources/aiAccount/saveTts"
+
+    payload = {
+        "params": {"model": 34, "ttsName": speaker, "trainAudioUrl": audio_url},
+        # NOTE(review): hardcoded API token; move to configuration and rotate.
+        "baseInfo": {"token": "80ce2034892c4428ab5b6e39ec0a9e2d"},
+    }
+
+    response = requests.post(url, headers={}, json=payload, timeout=timeout)
+    response.raise_for_status()  # raise if the status code is not 2xx
+    return response.json()
+
+
+def speaker_insert_db(speaker_id: str):
+    """Print (not execute) the INSERT statement for ``speaker_id``; the timestamps are fixed literals."""
+    print(f'INSERT INTO `aigc-admin-prod`.volcengine_tts_speaker (speaker_id, type, status, create_time, update_time) VALUES ("{speaker_id}", 2, 0, "2026-05-13 21:08:00", "2026-05-13 21:08:00");')
+
+
+def get_all_speakers() -> List[Dict[str, Any]]:
+    """List MegaTTS training entries in state 'Success' for project "aiddit".
+
+    Returns dicts with ``speaker_id``, ``alias`` and ``instance_no``; returns ``[]`` when the
+    API call raises ``ApiException`` (the error is only printed).
+
+    NOTE(review): only page 1 with page_size=100 is requested, so more than 100 entries
+    would be truncated despite the name — confirm and paginate if needed.
+    """
+    api = volcenginesdkspeechsaasprod20250521.SPEECHSAASPROD20250521Api()
+    request = volcenginesdkspeechsaasprod20250521.BatchListMegaTTSTrainStatusRequest(
+        state='Success', page_size=100, page_number=1, project_name="aiddit",
+    )
+
+    try:
+        response = api.batch_list_mega_tts_train_status(request)
+        return [
+            {"speaker_id": s.speaker_id, "alias": s.alias, "instance_no": s.instance_no}
+            for s in response.statuses
+        ]
+    except ApiException as e:
+        print("Exception when calling api: %s\n" % e)
         return []
 
 
-def reference_audio_download(audio_url: str, local_file_path: str):
-    file_util.download_file(audio_url, local_file_path)
+def update_tts_alias(tts_id: str, alias: str):
+    """Set ``alias`` on the resource pack whose train id is ``tts_id`` (project "aiddit").
+
+    Always returns ``{}``: the API response is discarded, and an ``ApiException`` is printed
+    rather than raised, so callers cannot tell success from failure by the return value.
+    """
+    # Relies on the global default configuration, as the original comment states.
+    client = volcenginesdkspeechsaasprod.SPEECHSAASPRODApi()
+    request = volcenginesdkspeechsaasprod.AliasResourcePackRequest(
+        alias=alias, instance_number="", project_name="aiddit", train_id=tts_id,
+    )
+
+    try:
+        client.alias_resource_pack(request)
+    except ApiException as e:
+        print("Exception when calling api: %s\n" % e)
+    return {}
+
+
+def get_volc_engine_tts_info(tts_name: str, mode: int) -> Dict[str, Any]:
+    """Return the first ``ai_model_tts`` row matching speaker ``tts_name`` and model ``mode``, or ``{}``."""
+    # NOTE(review): values are interpolated into the SQL text; only pass trusted input.
+    rows = mysql_helper.execute_query(
+        f"select * from ai_model_tts where speaker = '{tts_name}' and model = {mode};"
+    )
+    return rows[0] if rows else {}
+
+
+def get_tts_info(tts_id: str) -> Dict[str, Any]:
+    """Return the ``ai_model_tts`` row whose id is ``tts_id``, or ``{}`` when no row matches."""
+    rows = mysql_helper.execute_query(f"select * from ai_model_tts where id = '{tts_id}'")
+    return rows[0] if rows else {}
+
+
+def read_tts_id() -> List[str]:
+    """Return the ``tts_id`` column of /Users/zhao/Desktop/fish_tts.csv as a list."""
+    return pd.read_csv("/Users/zhao/Desktop/fish_tts.csv")['tts_id'].tolist()
 
 
 def main():
-    df = pd.read_csv(test_sample_csv)
-
-    speaker_counts = df["speaker"].value_counts()
-    print(f"{'speaker':<30} count")
-    print("-" * 38)
-    for speaker, count in speaker_counts.items():
-        print(f"{speaker:<30} {count}")
-
-    # test_sample_list = read_test_sample(Path(test_sample_csv))
-    # for item in test_sample_list:
-    #     speaker = item["speaker"]
-    #     audio_url = item["audio_url"]
-    #     result = item["result"]
-    #     txt = item["txt"]
-    #     reference_audio_download(audio_url, local_file_path=f"{reference_path}/{speaker}.mp3")
+    speaker_id_alias_map = {}
+    for item in get_all_speakers():
+        speaker_id = item['speaker_id']
+        alias = item['alias']
+        speaker_id_alias_map[speaker_id] = alias
+
+    df = pd.read_csv("/Users/zhao/Desktop/aigc_admin_prod_ai_model_tts.csv")
+    dict_list = df.to_dict(orient='records')
+    for item in dict_list:
+        speaker_id = item['speaker_id']
+        alias = item['speaker']
+        if speaker_id not in speaker_id_alias_map:
+            print(f"{speaker_id} not in speaker_id_alias_map")
+            continue
+        volc_alias = speaker_id_alias_map[speaker_id]
+        if volc_alias == alias:
+            print(f"{volc_alias} == {alias}")
+            continue
+        print(f'更新 {speaker_id} 的别名为 {alias}')
+        response = update_tts_alias(speaker_id, alias)
+        print(f'{speaker_id} -> {alias} -> {response}')
 
 
 if __name__ == '__main__':