sse_client.py 3.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. import json
  2. import uuid
  3. import requests
  4. def sse_request(url, headers=None, params=None, json_data=None, method="POST"):
  5. """
  6. 调用 SSE 接口,逐事件 yield 出解析后的字典。
  7. Args:
  8. url: 接口地址
  9. headers: 请求头
  10. params: URL 查询参数
  11. json_data: POST 的 JSON body
  12. method: "POST" 或 "GET"
  13. Yields:
  14. dict: {"event": "...", "data": ..., "id": "..."}
  15. """
  16. headers = headers or {}
  17. headers.setdefault("Accept", "text/event-stream")
  18. headers.setdefault("Content-Type", "application/json")
  19. session = requests.Session()
  20. response = None
  21. try:
  22. if method.upper() == "GET":
  23. response = session.get(url, headers=headers, params=params, stream=True)
  24. else:
  25. response = session.post(url, headers=headers, params=params, json=json_data, stream=True)
  26. response.raise_for_status()
  27. event = None
  28. data_buf = []
  29. event_id = None
  30. for line in response.iter_lines(decode_unicode=True):
  31. if line is None:
  32. continue
  33. if line.startswith("id:"):
  34. event_id = line[3:].strip()
  35. elif line.startswith("event:"):
  36. event = line[6:].strip()
  37. elif line.startswith("data:"):
  38. data_buf.append(line[5:].strip())
  39. elif line == "": # 空行 = 事件结束
  40. if data_buf:
  41. raw = "\n".join(data_buf)
  42. try:
  43. parsed_data = json.loads(raw)
  44. except json.JSONDecodeError:
  45. parsed_data = raw
  46. yield {"event": event, "data": parsed_data, "id": event_id}
  47. event = None
  48. data_buf = []
  49. event_id = None
  50. finally:
  51. if response is not None:
  52. response.close()
  53. session.close()
  54. if __name__ == '__main__':
  55. headers = {
  56. "X-Api-Key": "941b57b2-f4c7-411e-9cf6-ebba6e6f1dee",
  57. "X-Api-Resource-Id": "seed-icl-2.0",
  58. "X-Api-Request-Id": uuid.uuid4().hex.replace("-", "")
  59. }
  60. params = {
  61. "req_params": {
  62. "text": "这个视频真是说到咱们心坎里去了!一位80岁的老教授,用她一生的智慧,总结了晚年幸福的10个“不要”,每一条都让人醍醐灌顶,听完让人倍感振奋!这可不是空话套话,句句都是大实话,真是一语点醒梦中人啊!尤其是身体健康、金钱和子女相处这些事,咱们上了年纪的人,哪个没遇到过困惑?\n\n如果你也觉得说得太对了,赶紧点击下方黄色按钮转发到你所有的微信群里,再点击下方绿色按钮发给身边的亲人朋友!你这一发,也许就点醒了身边的老伴、兄弟姐妹,或是你家里的长辈,让他们少走弯路,少吃亏,晚年能过得舒心又体面!想想看,要是他们早点听到这些忠告,是不是就能避免很多烦恼和矛盾?\n\n视频里说的这些,都是咱们普遍会遇到的经历和痛点,很多问题可能我们现在就正面临着呢,听了老教授的话,瞬间感觉心里亮堂多了!比如不要轻易借钱给别人,不要卖掉老房子,不要盲目去免费旅游,这都是血淋淋的教训啊!咱们这代人,吃了一辈子的苦,到老了就图个安稳和自在,这些经验教训可都是宝贵的财富!\n\n所以,别让这份难得的智慧在我们手里断了,让它像火炬一样传递下去,照亮更多人的晚年生活!您的每一次分享,都是一份善举,一份对亲友的真心关爱。快快转",
  63. "speaker": "S_7lCIz2422",
  64. "audio_params": {
  65. "format": "mp3"
  66. },
  67. }
  68. }
  69. sse_request("https://openspeech.bytedance.com/api/v3/tts/unidirectional/sse", headers, {}, params)