pqFunctions.py 1.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647
  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import asyncio
  6. import aiohttp
  7. async def asyncPost(url, headers, payload):
  8. """
  9. :param url:
  10. :param headers:
  11. :param payload:
  12. :return:
  13. """
  14. retries = 3
  15. async with aiohttp.ClientSession() as session:
  16. for attempt in range(3):
  17. try:
  18. async with session.post(url, headers=headers, json=payload, timeout=10) as response:
  19. return await response.json()
  20. except asyncio.TimeoutError:
  21. if attempt < retries - 1:
  22. await asyncio.sleep(2) # 等待一段时间后重试
  23. else:
  24. raise
  25. async def getPQVideoDetail(video_id):
  26. """
  27. 获取票圈视频详情信息
  28. :return:
  29. """
  30. url = "https://longvideoapi.piaoquantv.com/longvideoapi/openapi/video/batchSelectVideoInfo"
  31. data = {
  32. "videoIdList": [video_id]
  33. }
  34. header = {
  35. "Content-Type": "application/json",
  36. }
  37. response = await asyncPost(url, header, data)
  38. return response
  39. #
  40. #
  41. # response = asyncio.run(getPQVideoDetail(24543579))
  42. #
  43. # print(json.dumps(response, ensure_ascii=False, indent=4))