client.py.bak 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. import asyncio
  2. import json
  3. import logging
  4. import os
  5. import tempfile
  6. import uuid
  7. from datetime import datetime
  8. from pathlib import Path
  9. import websockets
  10. from filelock import FileLock
  11. from protocol import IMMessage, IMResponse
  12. from notifier import AgentNotifier, ConsoleNotifier
  13. logging.basicConfig(level=logging.INFO, format="%(asctime)s [CLIENT:%(name)s] %(message)s")
  14. class IMClient:
  15. """IM Client 长驻服务。
  16. 通过 WebSocket 连接 Server,通过文件与 Agent 交互。
  17. 文件约定 (data/{contact_id}/):
  18. chatbox.jsonl — 所有消息历史(收发都记录)
  19. in_pending.json — 收到的待处理消息 (JSON 数组)
  20. out_pending.jsonl — 发送失败的消息
  21. 窗口模式 (window_mode=True):
  22. 每次运行生成新的 chat_id,消息按 chat_id 隔离
  23. 文件结构变为: data/{contact_id}/{chat_id}/...
  24. """
  25. def __init__(
  26. self,
  27. contact_id: str,
  28. server_url: str = "ws://localhost:8000",
  29. data_dir: str | None = None,
  30. notifier: AgentNotifier | None = None,
  31. notify_interval: float = 30.0,
  32. window_mode: bool = False,
  33. chat_id: str | None = None,
  34. ):
  35. self.contact_id = contact_id
  36. self.server_url = server_url
  37. self.notifier = notifier or ConsoleNotifier()
  38. self.notify_interval = notify_interval
  39. self.window_mode = window_mode
  40. # 窗口模式:生成或使用指定的 chat_id
  41. base_dir = Path(data_dir) if data_dir else Path("data") / contact_id
  42. if window_mode:
  43. self.chat_id = chat_id or datetime.now().strftime("%Y%m%d_%H%M%S_") + uuid.uuid4().hex[:6]
  44. self.data_dir = base_dir / "windows" / self.chat_id
  45. else:
  46. self.chat_id = None
  47. self.data_dir = base_dir
  48. self.data_dir.mkdir(parents=True, exist_ok=True)
  49. self.chatbox_path = self.data_dir / "chatbox.jsonl"
  50. self.in_pending_path = self.data_dir / "in_pending.json"
  51. self.out_pending_path = self.data_dir / "out_pending.jsonl"
  52. # 文件锁
  53. self._in_pending_lock = FileLock(str(self.data_dir / ".in_pending.lock"))
  54. self._out_pending_lock = FileLock(str(self.data_dir / ".out_pending.lock"))
  55. self._chatbox_lock = FileLock(str(self.data_dir / ".chatbox.lock"))
  56. self.ws = None
  57. self.log = logging.getLogger(f"{contact_id}:{self.chat_id}" if self.chat_id else contact_id)
  58. self._send_queue = asyncio.Queue()
  59. # 初始化文件
  60. if not self.chatbox_path.exists():
  61. self.chatbox_path.write_text("")
  62. if not self.in_pending_path.exists():
  63. self.in_pending_path.write_text("[]")
  64. if not self.out_pending_path.exists():
  65. self.out_pending_path.write_text("")
  66. async def run(self):
  67. """启动 Client 服务,自动重连。"""
  68. while True:
  69. try:
  70. # 构造 WebSocket URL,带上 chat_id 参数
  71. chat_id_param = self.chat_id or "default"
  72. ws_url = f"{self.server_url}/ws?contact_id={self.contact_id}&chat_id={chat_id_param}"
  73. self.log.info(f"连接 {ws_url} ...")
  74. async with websockets.connect(ws_url) as ws:
  75. self.ws = ws
  76. self.log.info("已连接")
  77. await asyncio.gather(
  78. self._ws_listener(),
  79. self._send_worker(),
  80. self._pending_notifier(),
  81. )
  82. except (websockets.ConnectionClosed, ConnectionRefusedError, OSError) as e:
  83. self.log.warning(f"连接断开: {e}, 5 秒后重连...")
  84. self.ws = None
  85. await asyncio.sleep(5)
  86. except asyncio.CancelledError:
  87. self.log.info("服务停止")
  88. break
  89. # ── 协程 1: WebSocket 收消息 ──
  90. async def _ws_listener(self):
  91. """监听 WebSocket,聊天消息写 in_pending 和 chatbox,回执打日志。"""
  92. async for raw in self.ws:
  93. try:
  94. data = json.loads(raw)
  95. except json.JSONDecodeError:
  96. self.log.warning(f"收到无效 JSON: {raw}")
  97. continue
  98. if "sender" in data and "receiver" in data:
  99. # 聊天消息
  100. self.log.info(f"收到消息: {data['sender']} -> {data['content'][:50]}")
  101. self._append_to_in_pending(data)
  102. self._append_to_chatbox(data)
  103. elif "status" in data:
  104. # 发送回执
  105. resp = IMResponse(**data)
  106. if resp.status == "success":
  107. self.log.info(f"消息 {resp.msg_id} 发送成功")
  108. else:
  109. self.log.warning(f"消息 {resp.msg_id} 发送失败: {resp.error}")
  110. # ── 协程 2: 发送队列处理 ──
  111. async def _send_worker(self):
  112. """从队列取消息并发送,失败则写入 out_pending。"""
  113. while True:
  114. msg_data = await self._send_queue.get()
  115. # 填充 sender_chat_id
  116. msg = IMMessage(
  117. sender=self.contact_id,
  118. sender_chat_id=self.chat_id or "default",
  119. **msg_data
  120. )
  121. try:
  122. await self.ws.send(msg.model_dump_json())
  123. self.log.info(f"发送消息: -> {msg.receiver}:{msg.receiver_chat_id or '*'}")
  124. # 记录到 chatbox
  125. self._append_to_chatbox(msg.model_dump())
  126. except Exception as e:
  127. self.log.error(f"发送失败: {e}")
  128. # 写入 out_pending
  129. self._append_to_out_pending(msg.model_dump())
  130. # ── 协程 3: 轮询 in_pending 通知 Agent ──
  131. async def _pending_notifier(self):
  132. """轮询 in_pending.json,有新消息就调通知回调。"""
  133. while True:
  134. pending = self._read_in_pending()
  135. if pending:
  136. senders = list(set(m.get("sender", "unknown") for m in pending))
  137. count = len(pending)
  138. try:
  139. await self.notifier.notify(count=count, from_contacts=senders)
  140. except Exception as e:
  141. self.log.error(f"通知回调异常: {e}")
  142. await asyncio.sleep(self.notify_interval)
  143. # ── 文件操作 (原子性) ──
  144. def _append_to_in_pending(self, msg: dict):
  145. """将收到的消息追加到 in_pending.json。"""
  146. with self._in_pending_lock:
  147. pending = self._load_json_array(self.in_pending_path)
  148. pending.append(msg)
  149. self._atomic_write_json(self.in_pending_path, pending)
  150. def _read_in_pending(self) -> list[dict]:
  151. """读取 in_pending.json (不清空)。"""
  152. with self._in_pending_lock:
  153. return self._load_json_array(self.in_pending_path)
  154. def _append_to_chatbox(self, msg: dict):
  155. """追加消息到 chatbox.jsonl。"""
  156. with self._chatbox_lock:
  157. with open(self.chatbox_path, "a", encoding="utf-8") as f:
  158. f.write(json.dumps(msg, ensure_ascii=False) + "\n")
  159. def _append_to_out_pending(self, msg: dict):
  160. """追加发送失败的消息到 out_pending.jsonl。"""
  161. with self._out_pending_lock:
  162. with open(self.out_pending_path, "a", encoding="utf-8") as f:
  163. f.write(json.dumps(msg, ensure_ascii=False) + "\n")
  164. # ── Agent 调用的工具方法 ──
  165. def read_pending(self) -> list[dict]:
  166. """Agent 读取 in_pending 中的消息,并清空。"""
  167. with self._in_pending_lock:
  168. pending = self._load_json_array(self.in_pending_path)
  169. if not pending:
  170. return []
  171. # 清空 in_pending
  172. self._atomic_write_json(self.in_pending_path, [])
  173. return pending
  174. def send_message(self, receiver: str, content: str, msg_type: str = "chat", receiver_chat_id: str | None = None):
  175. """Agent 调用:将消息放入发送队列。
  176. Args:
  177. receiver: 接收方 contact_id
  178. content: 消息内容
  179. msg_type: 消息类型
  180. receiver_chat_id: 接收方窗口 ID(指定则定向发送,否则广播给该 contact_id 的所有窗口)
  181. """
  182. msg_data = {
  183. "receiver": receiver,
  184. "content": content,
  185. "msg_type": msg_type,
  186. "receiver_chat_id": receiver_chat_id
  187. }
  188. self._send_queue.put_nowait(msg_data)
  189. # ── 工具方法 ──
  190. @staticmethod
  191. def _load_json_array(path: Path) -> list:
  192. if not path.exists():
  193. return []
  194. text = path.read_text(encoding="utf-8").strip()
  195. if not text:
  196. return []
  197. try:
  198. data = json.loads(text)
  199. return data if isinstance(data, list) else []
  200. except json.JSONDecodeError:
  201. return []
  202. @staticmethod
  203. def _atomic_write_json(path: Path, data):
  204. """原子写入:先写临时文件再 rename。"""
  205. tmp_fd, tmp_path = tempfile.mkstemp(dir=str(path.parent), suffix=".tmp")
  206. try:
  207. with os.fdopen(tmp_fd, "w", encoding="utf-8") as f:
  208. json.dump(data, f, ensure_ascii=False, indent=2)
  209. os.replace(tmp_path, str(path))
  210. except Exception:
  211. if os.path.exists(tmp_path):
  212. os.unlink(tmp_path)
  213. raise