contact_store.py 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. import json
  2. from pathlib import Path
  3. from filelock import FileLock
  4. class ContactStore:
  5. """联系人关系和 chat_id 存储。
  6. 存储格式 (data/contacts.json):
  7. {
  8. "alice": {
  9. "contacts": ["bob", "charlie"],
  10. "chats": {
  11. "bob": ["chat_001", "chat_002"],
  12. "charlie": ["chat_003"]
  13. }
  14. }
  15. }
  16. """
  17. def __init__(self, data_dir: str = "data"):
  18. self.data_dir = Path(data_dir)
  19. self.data_dir.mkdir(parents=True, exist_ok=True)
  20. self.contacts_path = self.data_dir / "contacts.json"
  21. self._lock = FileLock(str(self.data_dir / ".contacts.lock"))
  22. if not self.contacts_path.exists():
  23. self.contacts_path.write_text("{}")
  24. def add_contact(self, user_id: str, contact_id: str):
  25. """添加联系人(单向)。"""
  26. with self._lock:
  27. data = self._load()
  28. if user_id not in data:
  29. data[user_id] = {"contacts": [], "chats": {}}
  30. if contact_id not in data[user_id]["contacts"]:
  31. data[user_id]["contacts"].append(contact_id)
  32. if contact_id not in data[user_id]["chats"]:
  33. data[user_id]["chats"][contact_id] = []
  34. self._save(data)
  35. def add_chat(self, user_id: str, contact_id: str, chat_id: str):
  36. """为某个联系人添加新的 chat_id。"""
  37. with self._lock:
  38. data = self._load()
  39. if user_id not in data:
  40. data[user_id] = {"contacts": [], "chats": {}}
  41. if contact_id not in data[user_id]["chats"]:
  42. data[user_id]["chats"][contact_id] = []
  43. if chat_id not in data[user_id]["chats"][contact_id]:
  44. data[user_id]["chats"][contact_id].append(chat_id)
  45. self._save(data)
  46. def get_contacts(self, user_id: str) -> list[str]:
  47. """获取某用户的联系人列表。"""
  48. with self._lock:
  49. data = self._load()
  50. if user_id not in data:
  51. return []
  52. return data[user_id].get("contacts", [])
  53. def get_chats(self, user_id: str, contact_id: str) -> list[str]:
  54. """获取与某联系人的所有 chat_id。"""
  55. with self._lock:
  56. data = self._load()
  57. if user_id not in data:
  58. return []
  59. return data[user_id].get("chats", {}).get(contact_id, [])
  60. def _load(self) -> dict:
  61. text = self.contacts_path.read_text(encoding="utf-8")
  62. return json.loads(text) if text.strip() else {}
  63. def _save(self, data: dict):
  64. self.contacts_path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")