"""Sync Fish reference audio from the database to every configured Fish instance.

The script reads the list of Fish instance IPs and the reference-audio rows
from MySQL, pushes every reference that is missing on an instance to that
instance, and finally posts a Feishu card summarising any failures.
"""
from typing import Any, Dict, List

import pandas as pd
from tabulate import tabulate

from client.FishClient import FishClient
from helper.MySQLHelper import MySQLHelper
from util import feishu_inform_util

# Client for the official Fish API; used to look up the sample text of a model.
official_fish_client = FishClient("https://api.fish.audio")

# NOTE(review): database credentials and the webhook URL below are hard-coded
# in source. They should be moved to environment variables / a secret store
# and rotated. Values are left untouched here to keep behavior unchanged.
mysql_helper = MySQLHelper(
    host="rm-t4na9qj85v7790tf84o.mysql.singapore.rds.aliyuncs.com",
    username="readonly",
    password="HdkZ4TDmeK6SQ3BRtJBk",
    database="aigc-admin-prod"
)

webhook_url = "https://open.feishu.cn/open-apis/bot/v2/hook/c09712a8-22cd-4bfa-93a5-30ae7b1db11b"


def get_fish_pq_ip() -> List[str]:
    """Return the Fish instance IPs stored under ``fish_pq_ip_list``.

    The value is read from the ``base_config`` table and is expected to be a
    comma-separated string. Surrounding whitespace is stripped and empty
    entries are dropped so that values such as ``"a, b,"`` do not produce
    malformed instance URLs.

    Returns:
        The list of IPs, or an empty list when the config row is missing or
        its value is empty.
    """
    sql = "select * from base_config where config_key = 'fish_pq_ip_list';"
    result = mysql_helper.execute_query(sql)
    if not result:
        return []
    value = result[0]['config_value']
    if not value:
        return []
    return [ip.strip() for ip in value.split(',') if ip.strip()]


def get_all_reference_by_db() -> List[Dict[str, Any]]:
    """Return every row of ``ai_model_tts`` whose ``model`` column equals 33.

    NOTE(review): 33 is presumably the id of the Fish TTS model - confirm.
    """
    sql = "select * from ai_model_tts where model = 33;"
    return mysql_helper.execute_query(sql)


def build_card_json(msg: str) -> Dict[str, Any]:
    """Build the Feishu card payload for the sync result notification.

    Args:
        msg: Pre-formatted table of failed syncs. An empty string means that
            nothing failed.

    Returns:
        The card JSON. When ``msg`` is non-empty it is embedded in a code
        block and the header is yellow; otherwise the card reports that
        everything synced successfully and the header is green.
    """
    # A non-empty message means at least one sync failed.
    has_failures = bool(msg)
    return {
        "config": {},
        "i18n_elements": {
            "zh_cn": [
                {
                    "tag": "markdown",
                    "content": "",
                    "text_align": "left",
                    "text_size": "normal"
                },
                {
                    "tag": "markdown",
                    "content": f"```\n{msg}\n```" if has_failures else "全部同步成功",
                    "text_align": "left",
                    "text_size": "normal"
                }
            ]
        },
        "i18n_header": {
            "zh_cn": {
                "title": {
                    "tag": "plain_text",
                    "content": "Fish音频同步完成通知"
                },
                "subtitle": {
                    "tag": "plain_text",
                    "content": ""
                },
                # The original expression was `"green" if True else "yellow"`,
                # which could never yield yellow.
                "template": "yellow" if has_failures else "green"
            }
        }
    }


def _main() -> None:
    """Sync all database references to every configured Fish instance.

    For each instance, references already present on it are skipped. The
    reference text is fetched from the official Fish API once per reference
    id and reused across instances. Every failure is collected and reported
    through a single Feishu card at the end.
    """
    db_all_reference = get_all_reference_by_db()
    # Cache of reference id -> sample text, shared across instances so the
    # official API is queried at most once per reference.
    reference_id_and_text_map: Dict[Any, Any] = {}

    all_ip = get_fish_pq_ip()
    print(f"当前配置的Fish服务器IP列表为: {all_ip}")

    sync_fail_list: List[Dict[str, Any]] = []
    for ip in all_ip:
        print(f"开始将音频同步到实例【{ip}】")
        fish_client = FishClient(f"http://{ip}:8080")
        try:
            exist_references_ids = fish_client.get_all_references_id()
        except Exception as e:
            # An unreachable instance must not abort the whole run: record it
            # so it shows up in the alert, then move on to the next instance.
            print(f"获取实例【{ip}】已有音频列表异常 {str(e)}")
            sync_fail_list.append({
                "实例IP": ip,
                "音频ID": "-",
                "失败原因": str(e),
            })
            continue

        for reference_info in db_all_reference:
            reference_id = reference_info['speaker_id']
            try:
                if reference_id in exist_references_ids:
                    print(f"音频ID【{reference_id}】在实例【{ip}】上已经存在,跳过")
                    continue
                if reference_id not in reference_id_and_text_map:
                    model_info = official_fish_client.get_model_info_by_id(reference_id)
                    text = model_info['samples'][0]['text']
                    reference_id_and_text_map[reference_id] = text
                audio_url = reference_info['audio_url']
                reference_text = reference_id_and_text_map[reference_id]
                fish_client.add_reference_id_by_url(reference_id=reference_id,
                                                    reference_text=reference_text,
                                                    audio_url=audio_url)
                print(f"音频ID【{reference_id}】同步到实例【{ip}】上完成")
            except Exception as e:
                print(f"音频ID【{reference_id}】同步到实例【{ip}】上异常 {str(e)}")
                sync_fail_list.append({
                    "实例IP": ip,
                    "音频ID": reference_id,
                    "失败原因": str(e),
                })
        print(f"将音频同步到实例【{ip}】完成")

    # Alert notification for failed syncs.
    if sync_fail_list:
        df = pd.DataFrame(sync_fail_list)
        msg = tabulate(df, headers='keys', tablefmt='grid', showindex=False)
    else:
        # Keep the "all succeeded" path independent of how tabulate renders
        # an empty frame.
        msg = ""
    print("同步失败的音频信息")
    print(msg)
    print("=" * 300)
    feishu_inform_util.send_card_msg_to_feishu(
        webhook=webhook_url,
        card_json=build_card_json(msg)
    )


if __name__ == '__main__':
    _main()