| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475 |
- import json
- from pathlib import Path
- from filelock import FileLock
class ContactStore:
    """Persist contact relationships and per-contact chat ids in a JSON file.

    Storage format (``<data_dir>/contacts.json``)::

        {
          "alice": {
            "contacts": ["bob", "charlie"],
            "chats": {
              "bob": ["chat_001", "chat_002"],
              "charlie": ["chat_003"]
            }
          }
        }

    Every public method reads (and, for mutators, rewrites) the whole file
    while holding the lock on ``<data_dir>/.contacts.lock``.
    """

    def __init__(self, data_dir: str = "data"):
        """Create the data directory and an empty store file if missing.

        Args:
            data_dir: Directory that holds ``contacts.json`` and its lock file.
        """
        self.data_dir = Path(data_dir)
        self.data_dir.mkdir(parents=True, exist_ok=True)
        self.contacts_path = self.data_dir / "contacts.json"
        self._lock = FileLock(str(self.data_dir / ".contacts.lock"))
        # Bootstrap under the lock so a concurrent writer's freshly saved
        # data cannot be overwritten by our empty "{}" placeholder.
        with self._lock:
            if not self.contacts_path.exists():
                self.contacts_path.write_text("{}", encoding="utf-8")

    def add_contact(self, user_id: str, contact_id: str) -> None:
        """Add ``contact_id`` to ``user_id``'s contacts (one direction only).

        Also makes sure an (initially empty) chat list exists for the
        contact. Adding an already-known contact is a no-op apart from the
        file being rewritten.
        """
        with self._lock:
            data = self._load()
            record = self._user_record(data, user_id)
            if contact_id not in record["contacts"]:
                record["contacts"].append(contact_id)
            record["chats"].setdefault(contact_id, [])
            self._save(data)

    def add_chat(self, user_id: str, contact_id: str, chat_id: str) -> None:
        """Record a new ``chat_id`` for the conversation with ``contact_id``.

        Duplicate chat ids are ignored. The contact is not added to the
        user's contact list by this call; only the chat list is touched.
        """
        with self._lock:
            data = self._load()
            record = self._user_record(data, user_id)
            chat_ids = record["chats"].setdefault(contact_id, [])
            if chat_id not in chat_ids:
                chat_ids.append(chat_id)
            self._save(data)

    def get_contacts(self, user_id: str) -> list[str]:
        """Return the contact list of ``user_id`` (empty if unknown)."""
        with self._lock:
            data = self._load()
        return data.get(user_id, {}).get("contacts", [])

    def get_chats(self, user_id: str, contact_id: str) -> list[str]:
        """Return all chat ids between ``user_id`` and ``contact_id``.

        Returns an empty list when either the user or the contact is unknown.
        """
        with self._lock:
            data = self._load()
        return data.get(user_id, {}).get("chats", {}).get(contact_id, [])

    @staticmethod
    def _user_record(data: dict, user_id: str) -> dict:
        """Return ``data[user_id]``, creating it and any missing sub-keys.

        Using ``setdefault`` keeps the mutators tolerant of records that lack
        ``"contacts"`` or ``"chats"``, matching the getters' ``.get`` defaults.
        """
        record = data.setdefault(user_id, {})
        record.setdefault("contacts", [])
        record.setdefault("chats", {})
        return record

    def _load(self) -> dict:
        """Read and parse the store file; blank content yields an empty dict."""
        text = self.contacts_path.read_text(encoding="utf-8")
        return json.loads(text) if text.strip() else {}

    def _save(self, data: dict) -> None:
        """Serialize ``data`` and replace the store file with it.

        The JSON is written to a sibling temp file first and then swapped in
        with ``Path.replace``, so an interrupted write never leaves a
        truncated ``contacts.json`` behind. Callers hold the lock, which is
        why a fixed temp name is sufficient.
        """
        payload = json.dumps(data, ensure_ascii=False, indent=2)
        tmp_path = self.contacts_path.with_name(self.contacts_path.name + ".tmp")
        tmp_path.write_text(payload, encoding="utf-8")
        tmp_path.replace(self.contacts_path)
|