| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248 |
- import asyncio
- import json
- import logging
- import os
- import tempfile
- import uuid
- from datetime import datetime
- from pathlib import Path
- import websockets
- from filelock import FileLock
- from protocol import IMMessage, IMResponse
- from notifier import AgentNotifier, ConsoleNotifier
- logging.basicConfig(level=logging.INFO, format="%(asctime)s [CLIENT:%(name)s] %(message)s")
- class IMClient:
- """IM Client 长驻服务。
- 通过 WebSocket 连接 Server,通过文件与 Agent 交互。
- 文件约定 (data/{contact_id}/):
- chatbox.jsonl — 所有消息历史(收发都记录)
- in_pending.json — 收到的待处理消息 (JSON 数组)
- out_pending.jsonl — 发送失败的消息
- 窗口模式 (window_mode=True):
- 每次运行生成新的 chat_id,消息按 chat_id 隔离
- 文件结构变为: data/{contact_id}/{chat_id}/...
- """
- def __init__(
- self,
- contact_id: str,
- server_url: str = "ws://localhost:8000",
- data_dir: str | None = None,
- notifier: AgentNotifier | None = None,
- notify_interval: float = 30.0,
- window_mode: bool = False,
- chat_id: str | None = None,
- ):
- self.contact_id = contact_id
- self.server_url = server_url
- self.notifier = notifier or ConsoleNotifier()
- self.notify_interval = notify_interval
- self.window_mode = window_mode
- # 窗口模式:生成或使用指定的 chat_id
- base_dir = Path(data_dir) if data_dir else Path("data") / contact_id
- if window_mode:
- self.chat_id = chat_id or datetime.now().strftime("%Y%m%d_%H%M%S_") + uuid.uuid4().hex[:6]
- self.data_dir = base_dir / "windows" / self.chat_id
- else:
- self.chat_id = None
- self.data_dir = base_dir
- self.data_dir.mkdir(parents=True, exist_ok=True)
- self.chatbox_path = self.data_dir / "chatbox.jsonl"
- self.in_pending_path = self.data_dir / "in_pending.json"
- self.out_pending_path = self.data_dir / "out_pending.jsonl"
- # 文件锁
- self._in_pending_lock = FileLock(str(self.data_dir / ".in_pending.lock"))
- self._out_pending_lock = FileLock(str(self.data_dir / ".out_pending.lock"))
- self._chatbox_lock = FileLock(str(self.data_dir / ".chatbox.lock"))
- self.ws = None
- self.log = logging.getLogger(f"{contact_id}:{self.chat_id}" if self.chat_id else contact_id)
- self._send_queue = asyncio.Queue()
- # 初始化文件
- if not self.chatbox_path.exists():
- self.chatbox_path.write_text("")
- if not self.in_pending_path.exists():
- self.in_pending_path.write_text("[]")
- if not self.out_pending_path.exists():
- self.out_pending_path.write_text("")
- async def run(self):
- """启动 Client 服务,自动重连。"""
- while True:
- try:
- # 构造 WebSocket URL,带上 chat_id 参数
- chat_id_param = self.chat_id or "default"
- ws_url = f"{self.server_url}/ws?contact_id={self.contact_id}&chat_id={chat_id_param}"
- self.log.info(f"连接 {ws_url} ...")
- async with websockets.connect(ws_url) as ws:
- self.ws = ws
- self.log.info("已连接")
- await asyncio.gather(
- self._ws_listener(),
- self._send_worker(),
- self._pending_notifier(),
- )
- except (websockets.ConnectionClosed, ConnectionRefusedError, OSError) as e:
- self.log.warning(f"连接断开: {e}, 5 秒后重连...")
- self.ws = None
- await asyncio.sleep(5)
- except asyncio.CancelledError:
- self.log.info("服务停止")
- break
- # ── 协程 1: WebSocket 收消息 ──
- async def _ws_listener(self):
- """监听 WebSocket,聊天消息写 in_pending 和 chatbox,回执打日志。"""
- async for raw in self.ws:
- try:
- data = json.loads(raw)
- except json.JSONDecodeError:
- self.log.warning(f"收到无效 JSON: {raw}")
- continue
- if "sender" in data and "receiver" in data:
- # 聊天消息
- self.log.info(f"收到消息: {data['sender']} -> {data['content'][:50]}")
- self._append_to_in_pending(data)
- self._append_to_chatbox(data)
- elif "status" in data:
- # 发送回执
- resp = IMResponse(**data)
- if resp.status == "success":
- self.log.info(f"消息 {resp.msg_id} 发送成功")
- else:
- self.log.warning(f"消息 {resp.msg_id} 发送失败: {resp.error}")
- # ── 协程 2: 发送队列处理 ──
- async def _send_worker(self):
- """从队列取消息并发送,失败则写入 out_pending。"""
- while True:
- msg_data = await self._send_queue.get()
- # 填充 sender_chat_id
- msg = IMMessage(
- sender=self.contact_id,
- sender_chat_id=self.chat_id or "default",
- **msg_data
- )
- try:
- await self.ws.send(msg.model_dump_json())
- self.log.info(f"发送消息: -> {msg.receiver}:{msg.receiver_chat_id or '*'}")
- # 记录到 chatbox
- self._append_to_chatbox(msg.model_dump())
- except Exception as e:
- self.log.error(f"发送失败: {e}")
- # 写入 out_pending
- self._append_to_out_pending(msg.model_dump())
- # ── 协程 3: 轮询 in_pending 通知 Agent ──
- async def _pending_notifier(self):
- """轮询 in_pending.json,有新消息就调通知回调。"""
- while True:
- pending = self._read_in_pending()
- if pending:
- senders = list(set(m.get("sender", "unknown") for m in pending))
- count = len(pending)
- try:
- await self.notifier.notify(count=count, from_contacts=senders)
- except Exception as e:
- self.log.error(f"通知回调异常: {e}")
- await asyncio.sleep(self.notify_interval)
- # ── 文件操作 (原子性) ──
- def _append_to_in_pending(self, msg: dict):
- """将收到的消息追加到 in_pending.json。"""
- with self._in_pending_lock:
- pending = self._load_json_array(self.in_pending_path)
- pending.append(msg)
- self._atomic_write_json(self.in_pending_path, pending)
- def _read_in_pending(self) -> list[dict]:
- """读取 in_pending.json (不清空)。"""
- with self._in_pending_lock:
- return self._load_json_array(self.in_pending_path)
- def _append_to_chatbox(self, msg: dict):
- """追加消息到 chatbox.jsonl。"""
- with self._chatbox_lock:
- with open(self.chatbox_path, "a", encoding="utf-8") as f:
- f.write(json.dumps(msg, ensure_ascii=False) + "\n")
- def _append_to_out_pending(self, msg: dict):
- """追加发送失败的消息到 out_pending.jsonl。"""
- with self._out_pending_lock:
- with open(self.out_pending_path, "a", encoding="utf-8") as f:
- f.write(json.dumps(msg, ensure_ascii=False) + "\n")
- # ── Agent 调用的工具方法 ──
- def read_pending(self) -> list[dict]:
- """Agent 读取 in_pending 中的消息,并清空。"""
- with self._in_pending_lock:
- pending = self._load_json_array(self.in_pending_path)
- if not pending:
- return []
- # 清空 in_pending
- self._atomic_write_json(self.in_pending_path, [])
- return pending
- def send_message(self, receiver: str, content: str, msg_type: str = "chat", receiver_chat_id: str | None = None):
- """Agent 调用:将消息放入发送队列。
- Args:
- receiver: 接收方 contact_id
- content: 消息内容
- msg_type: 消息类型
- receiver_chat_id: 接收方窗口 ID(指定则定向发送,否则广播给该 contact_id 的所有窗口)
- """
- msg_data = {
- "receiver": receiver,
- "content": content,
- "msg_type": msg_type,
- "receiver_chat_id": receiver_chat_id
- }
- self._send_queue.put_nowait(msg_data)
- # ── 工具方法 ──
- @staticmethod
- def _load_json_array(path: Path) -> list:
- if not path.exists():
- return []
- text = path.read_text(encoding="utf-8").strip()
- if not text:
- return []
- try:
- data = json.loads(text)
- return data if isinstance(data, list) else []
- except json.JSONDecodeError:
- return []
- @staticmethod
- def _atomic_write_json(path: Path, data):
- """原子写入:先写临时文件再 rename。"""
- tmp_fd, tmp_path = tempfile.mkstemp(dir=str(path.parent), suffix=".tmp")
- try:
- with os.fdopen(tmp_fd, "w", encoding="utf-8") as f:
- json.dump(data, f, ensure_ascii=False, indent=2)
- os.replace(tmp_path, str(path))
- except Exception:
- if os.path.exists(tmp_path):
- os.unlink(tmp_path)
- raise
|