| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308 |
- import asyncio
- import json
- import logging
- import os
- import tempfile
- import uuid
- from datetime import datetime
- from pathlib import Path
- import websockets
- from filelock import FileLock
- from protocol import IMMessage, IMResponse
- from notifier import AgentNotifier, ConsoleNotifier
- logging.basicConfig(level=logging.INFO, format="%(asctime)s [CLIENT:%(name)s] %(message)s")
- class ChatWindow:
- """单个聊天窗口的数据管理。"""
- def __init__(self, chat_id: str, data_dir: Path):
- self.chat_id = chat_id
- self.data_dir = data_dir
- self.data_dir.mkdir(parents=True, exist_ok=True)
- self.chatbox_path = data_dir / "chatbox.jsonl"
- self.in_pending_path = data_dir / "in_pending.json"
- self.out_pending_path = data_dir / "out_pending.jsonl"
- # 文件锁
- self._in_pending_lock = FileLock(str(data_dir / ".in_pending.lock"))
- self._out_pending_lock = FileLock(str(data_dir / ".out_pending.lock"))
- self._chatbox_lock = FileLock(str(data_dir / ".chatbox.lock"))
- # 初始化文件
- if not self.chatbox_path.exists():
- self.chatbox_path.write_text("")
- if not self.in_pending_path.exists():
- self.in_pending_path.write_text("[]")
- if not self.out_pending_path.exists():
- self.out_pending_path.write_text("")
- def append_to_in_pending(self, msg: dict):
- with self._in_pending_lock:
- pending = self._load_json_array(self.in_pending_path)
- pending.append(msg)
- self._atomic_write_json(self.in_pending_path, pending)
- def read_in_pending(self) -> list[dict]:
- with self._in_pending_lock:
- return self._load_json_array(self.in_pending_path)
- def clear_in_pending(self):
- with self._in_pending_lock:
- self._atomic_write_json(self.in_pending_path, [])
- def append_to_chatbox(self, msg: dict):
- with self._chatbox_lock:
- with open(self.chatbox_path, "a", encoding="utf-8") as f:
- f.write(json.dumps(msg, ensure_ascii=False) + "\n")
- def append_to_out_pending(self, msg: dict):
- with self._out_pending_lock:
- with open(self.out_pending_path, "a", encoding="utf-8") as f:
- f.write(json.dumps(msg, ensure_ascii=False) + "\n")
- @staticmethod
- def _load_json_array(path: Path) -> list:
- if not path.exists():
- return []
- text = path.read_text(encoding="utf-8").strip()
- if not text:
- return []
- try:
- data = json.loads(text)
- return data if isinstance(data, list) else []
- except json.JSONDecodeError:
- return []
- @staticmethod
- def _atomic_write_json(path: Path, data):
- tmp_fd, tmp_path = tempfile.mkstemp(dir=str(path.parent), suffix=".tmp")
- try:
- with os.fdopen(tmp_fd, "w", encoding="utf-8") as f:
- json.dump(data, f, ensure_ascii=False, indent=2)
- os.replace(tmp_path, str(path))
- except Exception:
- if os.path.exists(tmp_path):
- os.unlink(tmp_path)
- raise
- class IMClient:
- """IM Client - 一个实例管理多个聊天窗口。
- 一个 Agent (contact_id) 对应一个 IMClient 实例。
- 该实例可以管理多个 chat_id(窗口),每个窗口有独立的消息存储。
- """
- def __init__(
- self,
- contact_id: str,
- server_url: str = "ws://localhost:8000",
- data_dir: str | None = None,
- notify_interval: float = 30.0,
- ):
- self.contact_id = contact_id
- self.server_url = server_url
- self.notify_interval = notify_interval
- self.base_dir = Path(data_dir) if data_dir else Path("data") / contact_id
- self.base_dir.mkdir(parents=True, exist_ok=True)
- # 窗口管理
- self._windows: dict[str, ChatWindow] = {}
- self._notifiers: dict[str, AgentNotifier] = {}
- self.ws = None
- self.log = logging.getLogger(contact_id)
- self._send_queue = asyncio.Queue()
- def open_window(self, chat_id: str | None = None, notifier: AgentNotifier | None = None) -> str:
- """打开一个新窗口。
- Args:
- chat_id: 窗口 ID(留空自动生成)
- notifier: 该窗口的通知器
- Returns:
- 窗口的 chat_id
- """
- if chat_id is None:
- chat_id = datetime.now().strftime("%Y%m%d_%H%M%S_") + uuid.uuid4().hex[:6]
- if chat_id in self._windows:
- return chat_id
- window_dir = self.base_dir / "windows" / chat_id
- self._windows[chat_id] = ChatWindow(chat_id, window_dir)
- self._notifiers[chat_id] = notifier or ConsoleNotifier()
- self.log.info(f"打开窗口: {chat_id}")
- return chat_id
- def close_window(self, chat_id: str):
- """关闭一个窗口。"""
- self._windows.pop(chat_id, None)
- self._notifiers.pop(chat_id, None)
- self.log.info(f"关闭窗口: {chat_id}")
- def list_windows(self) -> list[str]:
- """列出所有打开的窗口。"""
- return list(self._windows.keys())
- async def run(self):
- """启动 Client 服务,自动重连。"""
- while True:
- try:
- # 连接时不带 chat_id,因为一个实例管理多个窗口
- ws_url = f"{self.server_url}/ws?contact_id={self.contact_id}&chat_id=__multi__"
- self.log.info(f"连接 {ws_url} ...")
- async with websockets.connect(ws_url) as ws:
- self.ws = ws
- self.log.info("已连接")
- await asyncio.gather(
- self._ws_listener(),
- self._send_worker(),
- self._pending_notifier(),
- )
- except (websockets.ConnectionClosed, ConnectionRefusedError, OSError) as e:
- self.log.warning(f"连接断开: {e}, 5 秒后重连...")
- self.ws = None
- await asyncio.sleep(5)
- except asyncio.CancelledError:
- self.log.info("服务停止")
- break
- async def _ws_listener(self):
- """监听 WebSocket,根据 receiver_chat_id 分发到对应窗口。"""
- async for raw in self.ws:
- try:
- data = json.loads(raw)
- except json.JSONDecodeError:
- self.log.warning(f"收到无效 JSON: {raw}")
- continue
- if "sender" in data and "receiver" in data:
- # 聊天消息
- receiver_chat_id = data.get("receiver_chat_id")
- if receiver_chat_id and receiver_chat_id in self._windows:
- # 定向发送到指定窗口
- window = self._windows[receiver_chat_id]
- window.append_to_in_pending(data)
- window.append_to_chatbox(data)
- self.log.info(f"收到消息 -> 窗口 {receiver_chat_id}: {data['sender']}")
- elif not receiver_chat_id:
- # 广播到所有窗口
- for chat_id, window in self._windows.items():
- window.append_to_in_pending(data)
- window.append_to_chatbox(data)
- self.log.info(f"收到消息 -> 广播到 {len(self._windows)} 个窗口: {data['sender']}")
- else:
- self.log.warning(f"收到消息但窗口 {receiver_chat_id} 不存在")
- elif "status" in data:
- # 发送回执
- resp = IMResponse(**data)
- if resp.status == "success":
- self.log.info(f"消息 {resp.msg_id} 发送成功")
- else:
- self.log.warning(f"消息 {resp.msg_id} 发送失败: {resp.error}")
- async def _send_worker(self):
- """从队列取消息并发送。"""
- while True:
- msg_data = await self._send_queue.get()
- msg = IMMessage(sender=self.contact_id, **msg_data)
- try:
- await self.ws.send(msg.model_dump_json())
- self.log.info(f"发送消息: -> {msg.receiver}:{msg.receiver_chat_id or '*'}")
- # 记录到发送方窗口的 chatbox
- if msg.sender_chat_id and msg.sender_chat_id in self._windows:
- self._windows[msg.sender_chat_id].append_to_chatbox(msg.model_dump())
- except Exception as e:
- self.log.error(f"发送失败: {e}")
- if msg.sender_chat_id and msg.sender_chat_id in self._windows:
- self._windows[msg.sender_chat_id].append_to_out_pending(msg.model_dump())
- async def _pending_notifier(self):
- """轮询各窗口的 in_pending,有新消息就调通知回调。"""
- while True:
- for chat_id, window in list(self._windows.items()):
- pending = window.read_in_pending()
- if pending:
- senders = list(set(m.get("sender", "unknown") for m in pending))
- count = len(pending)
- notifier = self._notifiers.get(chat_id)
- if notifier:
- try:
- await notifier.notify(count=count, from_contacts=senders)
- except Exception as e:
- self.log.error(f"窗口 {chat_id} 通知回调异常: {e}")
- await asyncio.sleep(self.notify_interval)
- # ── Agent 调用的工具方法 ──
- def read_pending(self, chat_id: str) -> list[dict]:
- """读取某个窗口的待处理消息,并清空。"""
- window = self._windows.get(chat_id)
- if window is None:
- return []
- pending = window.read_in_pending()
- if pending:
- window.clear_in_pending()
- return pending
- def send_message(
- self,
- chat_id: str,
- receiver: str,
- content: str,
- msg_type: str = "chat",
- receiver_chat_id: str | None = None,
- ):
- """从某个窗口发送消息。"""
- msg_data = {
- "sender_chat_id": chat_id,
- "receiver": receiver,
- "content": content,
- "msg_type": msg_type,
- "receiver_chat_id": receiver_chat_id,
- }
- self._send_queue.put_nowait(msg_data)
- def get_chat_history(self, chat_id: str, peer_id: str | None = None, limit: int = 20) -> list[dict]:
- """查询某个窗口的聊天历史。"""
- window = self._windows.get(chat_id)
- if window is None or not window.chatbox_path.exists():
- return []
- lines = window.chatbox_path.read_text(encoding="utf-8").strip().splitlines()
- messages = []
- for line in reversed(lines):
- if not line.strip():
- continue
- try:
- m = json.loads(line)
- except json.JSONDecodeError:
- continue
- if peer_id and m.get("sender") != peer_id and m.get("receiver") != peer_id:
- continue
- messages.append({
- "sender": m.get("sender", "unknown"),
- "receiver": m.get("receiver", "unknown"),
- "content": m.get("content", ""),
- "msg_type": m.get("msg_type", "chat"),
- })
- if len(messages) >= limit:
- break
- messages.reverse()
- return messages
|