get_history_oss_path.py 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344
  1. from typing import Dict, List
  2. import aiohttp
  3. from applications.const import server_const
  4. async def request_for_fission_info(content_id: str) -> Dict:
  5. """
  6. 异步获取带有 fission 信息的已发布 OSS 路径列表。
  7. :param content_id: 内容的唯一标识符
  8. :return: 包含 OSS 路径列表及相关信息的字典
  9. """
  10. url = f"http://{server_const.NEW_SERVER_PUBLIC_IP}:{server_const.PORT}/oss_rank"
  11. async with aiohttp.ClientSession() as session:
  12. try:
  13. async with session.post(
  14. url,
  15. json={"contentId": content_id},
  16. headers={"Content-Type": "application/json"},
  17. ) as response:
  18. response.raise_for_status()
  19. return await response.json()
  20. except aiohttp.ClientError as e:
  21. print(f"请求失败: {e}")
  22. return {}
  23. except Exception as e:
  24. print(f"发生错误: {e}")
  25. return {}
  26. async def get_history_oss_path(content_id: str) -> List:
  27. """
  28. 获取content的oss_路径
  29. """
  30. fission_info = await request_for_fission_info(content_id=content_id)
  31. if not fission_info:
  32. return []
  33. else:
  34. response_code = fission_info["code"]
  35. if response_code == server_const.SUCCESS_CODE:
  36. return fission_info['oss_path_list']
  37. else:
  38. return []