| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128 |
- import logging
- from fastapi import FastAPI, WebSocket, WebSocketDisconnect
- from protocol import IMMessage, IMResponse
- from contact_store import ContactStore
# Root logging is configured once at import time; every record carries a
# "[SERVER]" tag after the timestamp.
logging.basicConfig(
    format="%(asctime)s [SERVER] %(message)s",
    level=logging.INFO,
)
log = logging.getLogger(__name__)

app = FastAPI()

# Online routing table: (contact_id, chat_id) -> WebSocket
registry: dict[tuple[str, str], WebSocket] = {}

# Contact storage
contact_store = ContactStore()
@app.get("/health")
async def health():
    """Report service status together with the currently connected windows.

    The ``online`` value groups the routing table by contact, i.e.
    ``{contact_id: [chat_id1, chat_id2, ...]}``.
    """
    windows_by_contact = {}
    for contact, chat in registry:
        if contact not in windows_by_contact:
            windows_by_contact[contact] = []
        windows_by_contact[contact].append(chat)
    return {"status": "ok", "online": windows_by_contact}
@app.post("/contacts/{user_id}/add")
async def add_contact(user_id: str, contact_id: str):
    """Add a contact.

    Forwards ``user_id`` and ``contact_id`` to
    ``contact_store.add_contact`` and replies with a fixed acknowledgement.
    """
    contact_store.add_contact(user_id, contact_id)
    ack = {"status": "ok"}
    return ack
@app.get("/contacts/{user_id}")
async def get_contacts(user_id: str):
    """Get the contact list.

    Returns whatever ``contact_store.get_contacts(user_id)`` yields, wrapped
    under the ``contacts`` key.
    """
    return {"contacts": contact_store.get_contacts(user_id)}
@app.post("/chats/{user_id}/add")
async def add_chat(user_id: str, contact_id: str, chat_id: str):
    """Add a new chat_id for a given contact.

    Forwards the three identifiers to ``contact_store.add_chat`` and replies
    with a fixed acknowledgement.
    """
    contact_store.add_chat(user_id, contact_id, chat_id)
    ack = {"status": "ok"}
    return ack
@app.get("/chats/{user_id}/{contact_id}")
async def get_chats(user_id: str, contact_id: str):
    """Get all chat_ids associated with a given contact.

    Returns whatever ``contact_store.get_chats(user_id, contact_id)`` yields,
    wrapped under the ``chats`` key.
    """
    return {"chats": contact_store.get_chats(user_id, contact_id)}
def _preview(content, limit: int = 150) -> str:
    """Return *content* shortened to *limit* characters for logging.

    ``None`` is treated as an empty string; "..." is appended only when the
    text was actually cut.
    """
    text = content or ""
    if len(text) > limit:
        return text[:limit] + "..."
    return text


def _unregister(key: tuple[str, str], ws: WebSocket) -> None:
    """Remove *key* from the routing table only if it still maps to *ws*.

    A connection that was replaced by a newer one with the same key must not
    evict its replacement when it finally disconnects.
    """
    if registry.get(key) is ws:
        del registry[key]


def _resolve_targets(msg: IMMessage) -> list[tuple[tuple[str, str], WebSocket]]:
    """Return the ``(key, websocket)`` pairs that *msg* should be delivered to.

    With ``receiver_chat_id`` set, at most one window is targeted; otherwise
    every registered window of ``msg.receiver`` is targeted.
    """
    if msg.receiver_chat_id:
        # Directed delivery to one specific window.
        target_key = (msg.receiver, msg.receiver_chat_id)
        target_ws = registry.get(target_key)
        return [(target_key, target_ws)] if target_ws is not None else []
    # Broadcast to every window of that contact_id. The list is materialised
    # here so later awaits cannot observe the registry changing mid-iteration.
    return [(k, v) for k, v in registry.items() if k[0] == msg.receiver]


@app.websocket("/ws")
async def ws_endpoint(ws: WebSocket, contact_id: str, chat_id: str):
    """Relay messages for one ``(contact_id, chat_id)`` window.

    The connection is accepted and registered under its key; an existing
    connection with the same key is closed with code 4001 ("replaced").
    Each received JSON object is parsed as an ``IMMessage`` and forwarded to
    the addressed window, or to all windows of the receiver when no
    ``receiver_chat_id`` is given. The sender gets an ``IMResponse`` with
    status ``success`` (at least one delivery), or ``failed`` with error
    ``target_offline`` / ``send_error``.

    The loop ends on ``WebSocketDisconnect`` or on any other exception
    (including a payload that cannot be parsed as ``IMMessage``); in both
    cases the registry entry is removed only if it still belongs to this
    connection.
    """
    await ws.accept()
    key = (contact_id, chat_id)

    # Register the new socket before awaiting anything so the key is never
    # absent from the routing table, then kick the connection it replaced.
    old = registry.get(key)
    registry[key] = ws
    if old is not None and old is not ws:
        try:
            await old.close(code=4001, reason="replaced")
        except Exception as exc:
            # Best effort: the old peer may already be gone.
            log.debug(f"{contact_id}:{chat_id} close of replaced connection failed: {exc}")
    log.info(f"{contact_id}:{chat_id} 上线 (当前在线: {len(registry)} 个窗口)")

    try:
        while True:
            data = await ws.receive_json()
            msg = IMMessage(**data)

            # Log a truncated preview so long messages do not flood the log.
            log.info(f"收到消息: {msg.sender}:{msg.sender_chat_id} -> {msg.receiver}:{msg.receiver_chat_id or '*'}")
            log.info(f" 内容: {_preview(msg.content)}")

            targets = _resolve_targets(msg)
            if not targets:
                await ws.send_json(
                    IMResponse(status="failed", msg_id=msg.msg_id, error="target_offline").model_dump()
                )
                log.info(f"转发失败: {msg.receiver} 不在线")
                continue

            # Serialise once; the same payload goes to every target.
            payload = msg.model_dump()
            success_count = 0
            for target_key, target_ws in targets:
                try:
                    await target_ws.send_json(payload)
                except Exception:
                    # The target connection is dead but was never cleaned up.
                    # Only drop the entry if it still points at that socket.
                    _unregister(target_key, target_ws)
                    log.warning(f"转发失败: {target_key} 连接已断, 已清理")
                else:
                    success_count += 1

            if success_count > 0:
                await ws.send_json(
                    IMResponse(status="success", msg_id=msg.msg_id).model_dump()
                )
                log.info(f"转发成功: {msg.sender} -> {msg.receiver} ({success_count} 个窗口)")
            else:
                await ws.send_json(
                    IMResponse(status="failed", msg_id=msg.msg_id, error="send_error").model_dump()
                )
    except WebSocketDisconnect:
        _unregister(key, ws)
        log.info(f"{contact_id}:{chat_id} 下线 (当前在线: {len(registry)} 个窗口)")
    except Exception as e:
        _unregister(key, ws)
        # log.exception keeps the traceback, which the bare message lost.
        log.exception(f"{contact_id}:{chat_id} 异常断开: {e}")
|