# main.py
  1. import logging
  2. from fastapi import FastAPI, WebSocket, WebSocketDisconnect
  3. from protocol import IMMessage, IMResponse
  4. from contact_store import ContactStore
  5. logging.basicConfig(level=logging.INFO, format="%(asctime)s [SERVER] %(message)s")
  6. log = logging.getLogger(__name__)
  7. app = FastAPI()
  8. # 在线路由表: (contact_id, chat_id) -> WebSocket
  9. registry: dict[tuple[str, str], WebSocket] = {}
  10. # 联系人存储
  11. contact_store = ContactStore()
  12. @app.get("/health")
  13. async def health():
  14. # 返回格式: {contact_id: [chat_id1, chat_id2, ...]}
  15. online_map = {}
  16. for (contact_id, chat_id) in registry.keys():
  17. online_map.setdefault(contact_id, []).append(chat_id)
  18. return {"status": "ok", "online": online_map}
  19. @app.post("/contacts/{user_id}/add")
  20. async def add_contact(user_id: str, contact_id: str):
  21. """添加联系人。"""
  22. contact_store.add_contact(user_id, contact_id)
  23. return {"status": "ok"}
  24. @app.get("/contacts/{user_id}")
  25. async def get_contacts(user_id: str):
  26. """获取联系人列表。"""
  27. contacts = contact_store.get_contacts(user_id)
  28. return {"contacts": contacts}
  29. @app.post("/chats/{user_id}/add")
  30. async def add_chat(user_id: str, contact_id: str, chat_id: str):
  31. """为某联系人添加新的 chat_id。"""
  32. contact_store.add_chat(user_id, contact_id, chat_id)
  33. return {"status": "ok"}
  34. @app.get("/chats/{user_id}/{contact_id}")
  35. async def get_chats(user_id: str, contact_id: str):
  36. """获取与某联系人的所有 chat_id。"""
  37. chats = contact_store.get_chats(user_id, contact_id)
  38. return {"chats": chats}
  39. @app.websocket("/ws")
  40. async def ws_endpoint(ws: WebSocket, contact_id: str, chat_id: str):
  41. await ws.accept()
  42. key = (contact_id, chat_id)
  43. # 如果同 (contact_id, chat_id) 已连接,踢掉旧连接
  44. old = registry.pop(key, None)
  45. if old:
  46. try:
  47. await old.close(code=4001, reason="replaced")
  48. except Exception:
  49. pass
  50. registry[key] = ws
  51. log.info(f"{contact_id}:{chat_id} 上线 (当前在线: {len(registry)} 个窗口)")
  52. try:
  53. while True:
  54. data = await ws.receive_json()
  55. msg = IMMessage(**data)
  56. # 截取消息内容用于日志(避免日志过长)
  57. content_preview = (msg.content or "")[:150]
  58. if len(msg.content or "") > 150:
  59. content_preview += "..."
  60. log.info(f"收到消息: {msg.sender}:{msg.sender_chat_id} -> {msg.receiver}:{msg.receiver_chat_id or '*'}")
  61. log.info(f" 内容: {content_preview}")
  62. # 查找目标连接
  63. if msg.receiver_chat_id:
  64. # 定向发送到指定窗口
  65. target_key = (msg.receiver, msg.receiver_chat_id)
  66. target_ws = registry.get(target_key)
  67. targets = [(target_key, target_ws)] if target_ws else []
  68. else:
  69. # 广播给该 contact_id 的所有窗口
  70. targets = [(k, v) for k, v in registry.items() if k[0] == msg.receiver]
  71. if not targets:
  72. await ws.send_json(
  73. IMResponse(status="failed", msg_id=msg.msg_id, error="target_offline").model_dump()
  74. )
  75. log.info(f"转发失败: {msg.receiver} 不在线")
  76. continue
  77. # 尝试发送到所有目标
  78. success_count = 0
  79. for target_key, target_ws in targets:
  80. try:
  81. await target_ws.send_json(msg.model_dump())
  82. success_count += 1
  83. except Exception:
  84. # 目标连接已死但未清理
  85. registry.pop(target_key, None)
  86. log.warning(f"转发失败: {target_key} 连接已断, 已清理")
  87. if success_count > 0:
  88. await ws.send_json(
  89. IMResponse(status="success", msg_id=msg.msg_id).model_dump()
  90. )
  91. log.info(f"转发成功: {msg.sender} -> {msg.receiver} ({success_count} 个窗口)")
  92. else:
  93. await ws.send_json(
  94. IMResponse(status="failed", msg_id=msg.msg_id, error="send_error").model_dump()
  95. )
  96. except WebSocketDisconnect:
  97. registry.pop(key, None)
  98. log.info(f"{contact_id}:{chat_id} 下线 (当前在线: {len(registry)} 个窗口)")
  99. except Exception as e:
  100. registry.pop(key, None)
  101. log.error(f"{contact_id}:{chat_id} 异常断开: {e}")