import logging from fastapi import FastAPI, WebSocket, WebSocketDisconnect from protocol import IMMessage, IMResponse from contact_store import ContactStore logging.basicConfig(level=logging.INFO, format="%(asctime)s [SERVER] %(message)s") log = logging.getLogger(__name__) app = FastAPI() # 在线路由表: (contact_id, chat_id) -> WebSocket registry: dict[tuple[str, str], WebSocket] = {} # 联系人存储 contact_store = ContactStore() @app.get("/health") async def health(): # 返回格式: {contact_id: [chat_id1, chat_id2, ...]} online_map = {} for (contact_id, chat_id) in registry.keys(): online_map.setdefault(contact_id, []).append(chat_id) return {"status": "ok", "online": online_map} @app.post("/contacts/{user_id}/add") async def add_contact(user_id: str, contact_id: str): """添加联系人。""" contact_store.add_contact(user_id, contact_id) return {"status": "ok"} @app.get("/contacts/{user_id}") async def get_contacts(user_id: str): """获取联系人列表。""" contacts = contact_store.get_contacts(user_id) return {"contacts": contacts} @app.post("/chats/{user_id}/add") async def add_chat(user_id: str, contact_id: str, chat_id: str): """为某联系人添加新的 chat_id。""" contact_store.add_chat(user_id, contact_id, chat_id) return {"status": "ok"} @app.get("/chats/{user_id}/{contact_id}") async def get_chats(user_id: str, contact_id: str): """获取与某联系人的所有 chat_id。""" chats = contact_store.get_chats(user_id, contact_id) return {"chats": chats} @app.websocket("/ws") async def ws_endpoint(ws: WebSocket, contact_id: str, chat_id: str): await ws.accept() key = (contact_id, chat_id) # 如果同 (contact_id, chat_id) 已连接,踢掉旧连接 old = registry.pop(key, None) if old: try: await old.close(code=4001, reason="replaced") except Exception: pass registry[key] = ws log.info(f"{contact_id}:{chat_id} 上线 (当前在线: {len(registry)} 个窗口)") try: while True: data = await ws.receive_json() msg = IMMessage(**data) # 截取消息内容用于日志(避免日志过长) content_preview = (msg.content or "")[:150] if len(msg.content or "") > 150: content_preview += "..." log.info(f"收到消息: {msg.sender}:{msg.sender_chat_id} -> {msg.receiver}:{msg.receiver_chat_id or '*'}") log.info(f" 内容: {content_preview}") # 查找目标连接 if msg.receiver_chat_id: # 定向发送到指定窗口 target_key = (msg.receiver, msg.receiver_chat_id) target_ws = registry.get(target_key) targets = [(target_key, target_ws)] if target_ws else [] else: # 广播给该 contact_id 的所有窗口 targets = [(k, v) for k, v in registry.items() if k[0] == msg.receiver] if not targets: await ws.send_json( IMResponse(status="failed", msg_id=msg.msg_id, error="target_offline").model_dump() ) log.info(f"转发失败: {msg.receiver} 不在线") continue # 尝试发送到所有目标 success_count = 0 for target_key, target_ws in targets: try: await target_ws.send_json(msg.model_dump()) success_count += 1 except Exception: # 目标连接已死但未清理 registry.pop(target_key, None) log.warning(f"转发失败: {target_key} 连接已断, 已清理") if success_count > 0: await ws.send_json( IMResponse(status="success", msg_id=msg.msg_id).model_dump() ) log.info(f"转发成功: {msg.sender} -> {msg.receiver} ({success_count} 个窗口)") else: await ws.send_json( IMResponse(status="failed", msg_id=msg.msg_id, error="send_error").model_dump() ) except WebSocketDisconnect: registry.pop(key, None) log.info(f"{contact_id}:{chat_id} 下线 (当前在线: {len(registry)} 个窗口)") except Exception as e: registry.pop(key, None) log.error(f"{contact_id}:{chat_id} 异常断开: {e}")