1234567891011121314151617181920212223242526272829303132333435363738394041424344 |
- from typing import Dict, List
- import aiohttp
- from applications.const import server_const
async def request_for_fission_info(content_id: str) -> Dict:
    """
    Asynchronously POST a content id to the ``/oss_rank`` endpoint and return
    the decoded JSON reply (the published OSS paths with fission information).

    The call is best-effort: any failure is printed and an empty dict is
    returned instead of raising.

    :param content_id: unique identifier of the content
    :return: the parsed JSON body of the response, or ``{}`` if the request
        failed or the body could not be decoded
    """
    endpoint = f"http://{server_const.NEW_SERVER_PUBLIC_IP}:{server_const.PORT}/oss_rank"
    payload = {"contentId": content_id}
    request_headers = {"Content-Type": "application/json"}

    async with aiohttp.ClientSession() as http:
        try:
            async with http.post(
                endpoint,
                json=payload,
                headers=request_headers,
            ) as reply:
                # Turn 4xx/5xx statuses into aiohttp.ClientResponseError.
                reply.raise_for_status()
                result = await reply.json()
        except aiohttp.ClientError as err:
            # Transport / HTTP-level failures reported by aiohttp.
            print(f"请求失败: {err}")
            result = {}
        except Exception as err:
            # Anything else (e.g. timeouts, undecodable body) is swallowed too.
            print(f"发生错误: {err}")
            result = {}

    return result
async def get_history_oss_path(content_id: str) -> List:
    """
    Return the OSS path list recorded for a piece of content.

    Calls ``request_for_fission_info`` and extracts ``oss_path_list`` from the
    reply when its ``code`` field equals ``server_const.SUCCESS_CODE``.

    :param content_id: unique identifier of the content
    :return: the ``oss_path_list`` value from a successful reply; an empty
        list when the request failed, the reply is not a JSON object, the
        ``code`` is missing or not the success code, or the path list is
        missing or null
    """
    fission_info = await request_for_fission_info(content_id=content_id)

    # An empty dict signals a failed request; a non-dict body (e.g. a JSON
    # array) is malformed and must not be indexed by key.
    if not fission_info or not isinstance(fission_info, dict):
        return []

    # .get() keeps a reply without "code" from raising KeyError; it is
    # treated as a non-success answer instead.
    if fission_info.get("code") != server_const.SUCCESS_CODE:
        return []

    # A missing or null "oss_path_list" is normalised to an empty list so
    # callers always receive a list.
    return fission_info.get("oss_path_list") or []
|