from typing import Dict, List

import aiohttp

from applications.const import server_const


async def request_for_fission_info(content_id: str) -> Dict:
    """
    Asynchronously fetch the published OSS path list carrying fission info.

    Sends a POST request with the JSON body ``{"contentId": content_id}`` to
    the ``/oss_rank`` endpoint of the host/port configured in ``server_const``
    and returns the decoded JSON response.

    This is a best-effort call: any failure (connection error, HTTP error
    status, undecodable body, ...) is printed and reported to the caller as
    an empty dict instead of being raised.

    :param content_id: unique identifier of the content
    :return: the decoded JSON response, or ``{}`` if the request failed
    """
    url = f"http://{server_const.NEW_SERVER_PUBLIC_IP}:{server_const.PORT}/oss_rank"
    async with aiohttp.ClientSession() as session:
        try:
            async with session.post(
                url,
                json={"contentId": content_id},
                headers={"Content-Type": "application/json"},
            ) as response:
                # Turn HTTP error statuses into an exception so they take the
                # same "return {}" path as transport failures.
                response.raise_for_status()
                return await response.json()
        except aiohttp.ClientError as e:
            print(f"请求失败: {e}")
            return {}
        except Exception as e:
            # Deliberate catch-all: callers rely on this never raising.
            print(f"发生错误: {e}")
            return {}


async def get_history_oss_path(content_id: str) -> List:
    """
    Get the OSS paths recorded for a piece of content.

    Calls :func:`request_for_fission_info` and extracts ``oss_path_list``
    from the response when its ``code`` equals ``server_const.SUCCESS_CODE``.

    :param content_id: unique identifier of the content
    :return: the ``oss_path_list`` value of a successful response; an empty
        list if the request failed, the response is not a JSON object, the
        ``code`` is missing or not the success code, or ``oss_path_list`` is
        missing or empty
    """
    fission_info = await request_for_fission_info(content_id=content_id)

    # An empty dict is the failure sentinel of request_for_fission_info; a
    # non-dict payload (e.g. a JSON array) cannot carry the expected keys.
    if not fission_info or not isinstance(fission_info, dict):
        return []

    # Use .get() so a response without "code" is treated as unsuccessful
    # instead of raising KeyError.
    if fission_info.get("code") != server_const.SUCCESS_CODE:
        return []

    # A successful response without a path list still yields a list.
    return fission_info.get("oss_path_list") or []