import asyncio
import json
import logging
import os
import tempfile
import uuid
from datetime import datetime
from pathlib import Path

import websockets
from filelock import FileLock

from protocol import IMMessage, IMResponse
from notifier import AgentNotifier, ConsoleNotifier

logging.basicConfig(level=logging.INFO, format="%(asctime)s [CLIENT:%(name)s] %(message)s")


class IMClient:
    """Long-running IM client service.

    Talks to the server over a WebSocket and to the Agent through files.

    File layout (under ``data/{contact_id}/`` unless ``data_dir`` is given):
        chatbox.jsonl      -- full message history (sent and received)
        in_pending.json    -- received messages awaiting the Agent (JSON array)
        out_pending.jsonl  -- messages that could not be sent

    Window mode (``window_mode=True``):
        Every run gets its own ``chat_id`` (generated unless one is passed in)
        and its files live under ``<base dir>/windows/{chat_id}/`` so that
        messages of different windows are kept apart.
    """

    def __init__(
        self,
        contact_id: str,
        server_url: str = "ws://localhost:8000",
        data_dir: str | None = None,
        notifier: AgentNotifier | None = None,
        notify_interval: float = 30.0,
        window_mode: bool = False,
        chat_id: str | None = None,
    ):
        """Set up paths, locks and the send queue, and create the data files.

        Args:
            contact_id: Identity of this client; sent to the server and used
                as the default data sub-directory name.
            server_url: Base WebSocket URL of the server (without ``/ws``).
            data_dir: Base directory for the data files. Defaults to
                ``data/{contact_id}``.
            notifier: Callback object told about pending messages. Defaults
                to a ``ConsoleNotifier``.
            notify_interval: Seconds between two polls of ``in_pending.json``.
            window_mode: Isolate this run's files under a per-window
                directory.
            chat_id: Window id to use in window mode; generated from the
                current time plus a random suffix when omitted. Ignored when
                ``window_mode`` is false.
        """
        self.contact_id = contact_id
        self.server_url = server_url
        self.notifier = notifier or ConsoleNotifier()
        self.notify_interval = notify_interval
        self.window_mode = window_mode

        # Window mode: use the given chat_id or generate a fresh one.
        base_dir = Path(data_dir) if data_dir else Path("data") / contact_id
        if window_mode:
            self.chat_id = chat_id or (
                datetime.now().strftime("%Y%m%d_%H%M%S_") + uuid.uuid4().hex[:6]
            )
            self.data_dir = base_dir / "windows" / self.chat_id
        else:
            self.chat_id = None
            self.data_dir = base_dir

        self.data_dir.mkdir(parents=True, exist_ok=True)
        self.chatbox_path = self.data_dir / "chatbox.jsonl"
        self.in_pending_path = self.data_dir / "in_pending.json"
        self.out_pending_path = self.data_dir / "out_pending.jsonl"

        # One lock file per data file.
        self._in_pending_lock = FileLock(str(self.data_dir / ".in_pending.lock"))
        self._out_pending_lock = FileLock(str(self.data_dir / ".out_pending.lock"))
        self._chatbox_lock = FileLock(str(self.data_dir / ".chatbox.lock"))

        self.ws = None
        self.log = logging.getLogger(
            f"{contact_id}:{self.chat_id}" if self.chat_id else contact_id
        )
        self._send_queue = asyncio.Queue()

        # Create the data files on first use; existing content is kept.
        if not self.chatbox_path.exists():
            self.chatbox_path.write_text("")
        if not self.in_pending_path.exists():
            self.in_pending_path.write_text("[]")
        if not self.out_pending_path.exists():
            self.out_pending_path.write_text("")

    async def run(self) -> None:
        """Run the client until cancelled, reconnecting after every disconnect.

        Each connection attempt is followed by a 5 second pause. Cancelling
        the task running this coroutine stops the service: the cancellation
        is logged and ``run`` returns normally, whether it arrives while
        connected or during the pause.
        """
        try:
            while True:
                await self._connect_and_serve()
                await asyncio.sleep(5)
        except asyncio.CancelledError:
            self.log.info("服务停止")

    async def _connect_and_serve(self) -> None:
        """Open one connection and serve it until it ends.

        Connection-level failures are logged and swallowed so that ``run``
        can retry; any other exception propagates to the caller.
        """
        # The chat_id travels as a query parameter; non-window clients use
        # the literal "default".
        chat_id_param = self.chat_id or "default"
        ws_url = f"{self.server_url}/ws?contact_id={self.contact_id}&chat_id={chat_id_param}"
        self.log.info(f"连接 {ws_url} ...")
        try:
            async with websockets.connect(ws_url) as ws:
                self.ws = ws
                self.log.info("已连接")
                await self._run_connection_tasks()
        except (websockets.ConnectionClosed, ConnectionRefusedError, OSError) as e:
            self.log.warning(f"连接断开: {e}, 5 秒后重连...")
        else:
            # The listener returned without raising: the peer closed the
            # connection cleanly. Reconnect just like after an error.
            self.log.warning("连接已关闭, 5 秒后重连...")
        finally:
            self.ws = None

    async def _run_connection_tasks(self) -> None:
        """Run the three per-connection coroutines until the first one ends.

        The remaining ones are cancelled and awaited before returning, so no
        worker from a previous connection survives a reconnect. If a finished
        coroutine raised, that exception is re-raised here.
        """
        tasks = [
            asyncio.ensure_future(self._ws_listener()),
            asyncio.ensure_future(self._send_worker()),
            asyncio.ensure_future(self._pending_notifier()),
        ]
        try:
            done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        finally:
            # Also runs when this coroutine itself is cancelled.
            for task in tasks:
                task.cancel()  # no-op for tasks that already finished
            # Collect every outcome so no exception is left unretrieved.
            await asyncio.gather(*tasks, return_exceptions=True)

        for task in tasks:
            if task in done and not task.cancelled():
                error = task.exception()
                if error is not None:
                    raise error

    # -- Coroutine 1: receive from the WebSocket --

    async def _ws_listener(self) -> None:
        """Consume incoming frames until the connection ends.

        Chat messages (objects with ``sender`` and ``receiver``) are stored
        in ``in_pending.json`` and ``chatbox.jsonl``; delivery receipts
        (objects with ``status``) are only logged. Frames that cannot be
        interpreted are logged and skipped instead of stopping the client.
        """
        async for raw in self.ws:
            try:
                data = json.loads(raw)
            except ValueError:
                # JSONDecodeError, or undecodable bytes in a binary frame.
                self.log.warning(f"收到无效 JSON: {raw}")
                continue
            if not isinstance(data, dict):
                self.log.warning(f"忽略非对象消息: {raw}")
                continue

            if "sender" in data and "receiver" in data:
                # Chat message. The content may be missing or not a string,
                # so build the log preview defensively.
                preview = str(data.get("content", ""))[:50]
                self.log.info(f"收到消息: {data['sender']} -> {preview}")
                self._append_to_in_pending(data)
                self._append_to_chatbox(data)
            elif "status" in data:
                # Delivery receipt.
                try:
                    resp = IMResponse(**data)
                except (TypeError, ValueError) as e:
                    self.log.warning(f"收到无效回执: {e}")
                    continue
                if resp.status == "success":
                    self.log.info(f"消息 {resp.msg_id} 发送成功")
                else:
                    self.log.warning(f"消息 {resp.msg_id} 发送失败: {resp.error}")

    # -- Coroutine 2: drain the send queue --

    async def _send_worker(self) -> None:
        """Send queued messages; messages that fail go to ``out_pending.jsonl``.

        Runs until cancelled. Messages still in the queue when the worker is
        cancelled stay there for the worker of the next connection.
        """
        while True:
            msg_data = await self._send_queue.get()
            try:
                # Stamp the message with this client's identity.
                msg = IMMessage(
                    sender=self.contact_id,
                    sender_chat_id=self.chat_id or "default",
                    **msg_data,
                )
            except (TypeError, ValueError) as e:
                # A malformed message must not take the whole client down.
                self.log.error(f"消息格式无效, 已丢弃: {e}")
                continue

            try:
                await self.ws.send(msg.model_dump_json())
                self.log.info(f"发送消息: -> {msg.receiver}:{msg.receiver_chat_id or '*'}")
                # Record the sent message in the history.
                self._append_to_chatbox(msg.model_dump())
            except asyncio.CancelledError:
                # Interrupted mid-send (connection is being torn down). The
                # message is already off the queue, so persist it rather
                # than lose it. NOTE(review): it may have reached the server
                # before the interruption -- confirm whatever consumes
                # out_pending tolerates a duplicate.
                self._append_to_out_pending(msg.model_dump())
                raise
            except Exception as e:
                self.log.error(f"发送失败: {e}")
                self._append_to_out_pending(msg.model_dump())

    # -- Coroutine 3: poll in_pending and notify the Agent --

    async def _pending_notifier(self) -> None:
        """Poll ``in_pending.json`` and call the notifier while it is non-empty.

        The notifier receives the number of pending messages and the distinct
        senders in first-seen order. Exceptions raised by the notifier are
        logged and do not stop the polling loop.
        """
        while True:
            pending = self._read_in_pending()
            if pending:
                senders = list(dict.fromkeys(m.get("sender", "unknown") for m in pending))
                count = len(pending)
                try:
                    await self.notifier.notify(count=count, from_contacts=senders)
                except Exception as e:
                    self.log.error(f"通知回调异常: {e}")
            await asyncio.sleep(self.notify_interval)

    # -- File operations (guarded by file locks) --

    def _append_to_in_pending(self, msg: dict) -> None:
        """Append one received message to the ``in_pending.json`` array."""
        with self._in_pending_lock:
            pending = self._load_json_array(self.in_pending_path)
            pending.append(msg)
            self._atomic_write_json(self.in_pending_path, pending)

    def _read_in_pending(self) -> list[dict]:
        """Return the content of ``in_pending.json`` without clearing it."""
        with self._in_pending_lock:
            return self._load_json_array(self.in_pending_path)

    def _append_to_chatbox(self, msg: dict) -> None:
        """Append one message as a JSON line to ``chatbox.jsonl``."""
        with self._chatbox_lock:
            with open(self.chatbox_path, "a", encoding="utf-8") as f:
                f.write(json.dumps(msg, ensure_ascii=False) + "\n")

    def _append_to_out_pending(self, msg: dict) -> None:
        """Append one unsent message as a JSON line to ``out_pending.jsonl``."""
        with self._out_pending_lock:
            with open(self.out_pending_path, "a", encoding="utf-8") as f:
                f.write(json.dumps(msg, ensure_ascii=False) + "\n")

    # -- Methods called by the Agent --

    def read_pending(self) -> list[dict]:
        """Return all pending received messages and clear ``in_pending.json``.

        Returns:
            The pending messages, or an empty list when there are none (the
            file is not rewritten in that case).
        """
        with self._in_pending_lock:
            pending = self._load_json_array(self.in_pending_path)
            if not pending:
                return []
            # Reset the file to an empty array.
            self._atomic_write_json(self.in_pending_path, [])
            return pending

    def send_message(
        self,
        receiver: str,
        content: str,
        msg_type: str = "chat",
        receiver_chat_id: str | None = None,
    ) -> None:
        """Queue a message for sending.

        The message is only placed on the in-memory queue; the send worker
        transmits it once a connection is up. ``asyncio.Queue`` is not
        thread-safe, so call this from the event loop's thread.

        Args:
            receiver: contact_id of the recipient.
            content: Message body.
            msg_type: Message type.
            receiver_chat_id: Target window of the recipient. When omitted
                the message is meant for all windows of that contact
                (routing is done by the server, not here).
        """
        msg_data = {
            "receiver": receiver,
            "content": content,
            "msg_type": msg_type,
            "receiver_chat_id": receiver_chat_id,
        }
        self._send_queue.put_nowait(msg_data)

    # -- Helpers --

    @staticmethod
    def _load_json_array(path: Path) -> list:
        """Load a JSON array from ``path``.

        Returns an empty list when the file is missing, blank, not valid
        JSON, or holds something other than a list.
        """
        if not path.exists():
            return []
        text = path.read_text(encoding="utf-8").strip()
        if not text:
            return []
        try:
            data = json.loads(text)
        except json.JSONDecodeError:
            return []
        return data if isinstance(data, list) else []

    @staticmethod
    def _atomic_write_json(path: Path, data) -> None:
        """Write ``data`` as JSON to ``path`` via a temp file plus rename.

        The temp file is created in the same directory as ``path`` so that
        ``os.replace`` swaps it in within one filesystem. On any failure the
        temp file is removed and the error re-raised.
        """
        tmp_fd, tmp_path = tempfile.mkstemp(dir=str(path.parent), suffix=".tmp")
        try:
            with os.fdopen(tmp_fd, "w", encoding="utf-8") as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            os.replace(tmp_path, str(path))
        except BaseException:
            # BaseException so an interrupt does not leave the temp file behind.
            if os.path.exists(tmp_path):
                os.unlink(tmp_path)
            raise