import asyncio import json import logging import os import tempfile import uuid from datetime import datetime from pathlib import Path import websockets from filelock import FileLock from protocol import IMMessage, IMResponse from notifier import AgentNotifier, ConsoleNotifier logging.basicConfig(level=logging.INFO, format="%(asctime)s [CLIENT:%(name)s] %(message)s") class ChatWindow: """单个聊天窗口的数据管理。""" def __init__(self, chat_id: str, data_dir: Path): self.chat_id = chat_id self.data_dir = data_dir self.data_dir.mkdir(parents=True, exist_ok=True) self.chatbox_path = data_dir / "chatbox.jsonl" self.in_pending_path = data_dir / "in_pending.json" self.out_pending_path = data_dir / "out_pending.jsonl" # 文件锁 self._in_pending_lock = FileLock(str(data_dir / ".in_pending.lock")) self._out_pending_lock = FileLock(str(data_dir / ".out_pending.lock")) self._chatbox_lock = FileLock(str(data_dir / ".chatbox.lock")) # 初始化文件 if not self.chatbox_path.exists(): self.chatbox_path.write_text("") if not self.in_pending_path.exists(): self.in_pending_path.write_text("[]") if not self.out_pending_path.exists(): self.out_pending_path.write_text("") def append_to_in_pending(self, msg: dict): with self._in_pending_lock: pending = self._load_json_array(self.in_pending_path) pending.append(msg) self._atomic_write_json(self.in_pending_path, pending) def read_in_pending(self) -> list[dict]: with self._in_pending_lock: return self._load_json_array(self.in_pending_path) def clear_in_pending(self): with self._in_pending_lock: self._atomic_write_json(self.in_pending_path, []) def append_to_chatbox(self, msg: dict): with self._chatbox_lock: with open(self.chatbox_path, "a", encoding="utf-8") as f: f.write(json.dumps(msg, ensure_ascii=False) + "\n") def append_to_out_pending(self, msg: dict): with self._out_pending_lock: with open(self.out_pending_path, "a", encoding="utf-8") as f: f.write(json.dumps(msg, ensure_ascii=False) + "\n") @staticmethod def _load_json_array(path: Path) -> list: if not path.exists(): return [] text = path.read_text(encoding="utf-8").strip() if not text: return [] try: data = json.loads(text) return data if isinstance(data, list) else [] except json.JSONDecodeError: return [] @staticmethod def _atomic_write_json(path: Path, data): tmp_fd, tmp_path = tempfile.mkstemp(dir=str(path.parent), suffix=".tmp") try: with os.fdopen(tmp_fd, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=2) os.replace(tmp_path, str(path)) except Exception: if os.path.exists(tmp_path): os.unlink(tmp_path) raise class IMClient: """IM Client - 一个实例管理多个聊天窗口。 一个 Agent (contact_id) 对应一个 IMClient 实例。 该实例可以管理多个 chat_id(窗口),每个窗口有独立的消息存储。 """ def __init__( self, contact_id: str, server_url: str = "ws://localhost:8000", data_dir: str | None = None, notify_interval: float = 30.0, ): self.contact_id = contact_id self.server_url = server_url self.notify_interval = notify_interval self.base_dir = Path(data_dir) if data_dir else Path("data") / contact_id self.base_dir.mkdir(parents=True, exist_ok=True) # 窗口管理 self._windows: dict[str, ChatWindow] = {} self._notifiers: dict[str, AgentNotifier] = {} self.ws = None self.log = logging.getLogger(contact_id) self._send_queue = asyncio.Queue() def open_window(self, chat_id: str | None = None, notifier: AgentNotifier | None = None) -> str: """打开一个新窗口。 Args: chat_id: 窗口 ID(留空自动生成) notifier: 该窗口的通知器 Returns: 窗口的 chat_id """ if chat_id is None: chat_id = datetime.now().strftime("%Y%m%d_%H%M%S_") + uuid.uuid4().hex[:6] if chat_id in self._windows: return chat_id window_dir = self.base_dir / "windows" / chat_id self._windows[chat_id] = ChatWindow(chat_id, window_dir) self._notifiers[chat_id] = notifier or ConsoleNotifier() self.log.info(f"打开窗口: {chat_id}") return chat_id def close_window(self, chat_id: str): """关闭一个窗口。""" self._windows.pop(chat_id, None) self._notifiers.pop(chat_id, None) self.log.info(f"关闭窗口: {chat_id}") def list_windows(self) -> list[str]: """列出所有打开的窗口。""" return list(self._windows.keys()) async def run(self): """启动 Client 服务,自动重连。""" while True: try: # 连接时不带 chat_id,因为一个实例管理多个窗口 ws_url = f"{self.server_url}/ws?contact_id={self.contact_id}&chat_id=__multi__" self.log.info(f"连接 {ws_url} ...") async with websockets.connect(ws_url) as ws: self.ws = ws self.log.info("已连接") await asyncio.gather( self._ws_listener(), self._send_worker(), self._pending_notifier(), ) except (websockets.ConnectionClosed, ConnectionRefusedError, OSError) as e: self.log.warning(f"连接断开: {e}, 5 秒后重连...") self.ws = None await asyncio.sleep(5) except asyncio.CancelledError: self.log.info("服务停止") break async def _ws_listener(self): """监听 WebSocket,根据 receiver_chat_id 分发到对应窗口。""" async for raw in self.ws: try: data = json.loads(raw) except json.JSONDecodeError: self.log.warning(f"收到无效 JSON: {raw}") continue if "sender" in data and "receiver" in data: # 聊天消息 receiver_chat_id = data.get("receiver_chat_id") if receiver_chat_id and receiver_chat_id in self._windows: # 定向发送到指定窗口 window = self._windows[receiver_chat_id] window.append_to_in_pending(data) window.append_to_chatbox(data) self.log.info(f"收到消息 -> 窗口 {receiver_chat_id}: {data['sender']}") elif not receiver_chat_id: # 广播到所有窗口 for chat_id, window in self._windows.items(): window.append_to_in_pending(data) window.append_to_chatbox(data) self.log.info(f"收到消息 -> 广播到 {len(self._windows)} 个窗口: {data['sender']}") else: self.log.warning(f"收到消息但窗口 {receiver_chat_id} 不存在") elif "status" in data: # 发送回执 resp = IMResponse(**data) if resp.status == "success": self.log.info(f"消息 {resp.msg_id} 发送成功") else: self.log.warning(f"消息 {resp.msg_id} 发送失败: {resp.error}") async def _send_worker(self): """从队列取消息并发送。""" while True: msg_data = await self._send_queue.get() msg = IMMessage(sender=self.contact_id, **msg_data) try: await self.ws.send(msg.model_dump_json()) self.log.info(f"发送消息: -> {msg.receiver}:{msg.receiver_chat_id or '*'}") # 记录到发送方窗口的 chatbox if msg.sender_chat_id and msg.sender_chat_id in self._windows: self._windows[msg.sender_chat_id].append_to_chatbox(msg.model_dump()) except Exception as e: self.log.error(f"发送失败: {e}") if msg.sender_chat_id and msg.sender_chat_id in self._windows: self._windows[msg.sender_chat_id].append_to_out_pending(msg.model_dump()) async def _pending_notifier(self): """轮询各窗口的 in_pending,有新消息就调通知回调。""" while True: for chat_id, window in list(self._windows.items()): pending = window.read_in_pending() if pending: senders = list(set(m.get("sender", "unknown") for m in pending)) count = len(pending) notifier = self._notifiers.get(chat_id) if notifier: try: await notifier.notify(count=count, from_contacts=senders) except Exception as e: self.log.error(f"窗口 {chat_id} 通知回调异常: {e}") await asyncio.sleep(self.notify_interval) # ── Agent 调用的工具方法 ── def read_pending(self, chat_id: str) -> list[dict]: """读取某个窗口的待处理消息,并清空。""" window = self._windows.get(chat_id) if window is None: return [] pending = window.read_in_pending() if pending: window.clear_in_pending() return pending def send_message( self, chat_id: str, receiver: str, content: str, msg_type: str = "chat", receiver_chat_id: str | None = None, ): """从某个窗口发送消息。""" msg_data = { "sender_chat_id": chat_id, "receiver": receiver, "content": content, "msg_type": msg_type, "receiver_chat_id": receiver_chat_id, } self._send_queue.put_nowait(msg_data) def get_chat_history(self, chat_id: str, peer_id: str | None = None, limit: int = 20) -> list[dict]: """查询某个窗口的聊天历史。""" window = self._windows.get(chat_id) if window is None or not window.chatbox_path.exists(): return [] lines = window.chatbox_path.read_text(encoding="utf-8").strip().splitlines() messages = [] for line in reversed(lines): if not line.strip(): continue try: m = json.loads(line) except json.JSONDecodeError: continue if peer_id and m.get("sender") != peer_id and m.get("receiver") != peer_id: continue messages.append({ "sender": m.get("sender", "unknown"), "receiver": m.get("receiver", "unknown"), "content": m.get("content", ""), "msg_type": m.get("msg_type", "chat"), }) if len(messages) >= limit: break messages.reverse() return messages