user_manager.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. #! /usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # vim:fenc=utf-8
  4. from logging_service import logger
  5. from typing import Dict, Optional, Tuple, Any, List
  6. import json
  7. import time
  8. import os
  9. import abc
  10. import pymysql.cursors
  11. import configs
  12. from database import MySQLManager
  13. class UserManager(abc.ABC):
  14. @abc.abstractmethod
  15. def get_user_profile(self, user_id) -> Dict:
  16. pass
  17. @abc.abstractmethod
  18. def save_user_profile(self, user_id, profile: Dict) -> None:
  19. pass
  20. @abc.abstractmethod
  21. def list_all_users(self):
  22. pass
  23. @abc.abstractmethod
  24. def get_staff_profile(self, staff_id) -> Dict:
  25. #FIXME(zhoutian): 重新设计用户和员工数据管理模型
  26. pass
  27. @staticmethod
  28. def get_default_profile(**kwargs) -> Dict:
  29. default_profile = {
  30. "name": "",
  31. "nickname": "",
  32. "preferred_nickname": "",
  33. "age": 0,
  34. "region": '',
  35. "interests": [],
  36. "family_members": {},
  37. "health_conditions": [],
  38. "medications": [],
  39. "reminder_preferences": {
  40. "medication": True,
  41. "health": True,
  42. "weather": True,
  43. "news": False
  44. },
  45. "interaction_style": "standard", # standard, verbose, concise
  46. "interaction_frequency": "medium", # low, medium, high
  47. "last_topics": [],
  48. "created_at": int(time.time() * 1000),
  49. "human_intervention_history": []
  50. }
  51. for key, value in kwargs.items():
  52. if key in default_profile:
  53. default_profile[key] = value
  54. return default_profile
  55. class UserRelationManager(abc.ABC):
  56. @abc.abstractmethod
  57. def list_staffs(self):
  58. pass
  59. @abc.abstractmethod
  60. def list_users(self, staff_id: str, page: int = 1, page_size: int = 100):
  61. pass
  62. @abc.abstractmethod
  63. def list_staff_users(self) -> List[Dict]:
  64. pass
  65. class LocalUserManager(UserManager):
  66. def get_user_profile(self, user_id) -> Dict:
  67. """加载用户个人资料,如不存在则创建默认资料。主要用于本地调试"""
  68. try:
  69. with open(f"user_profiles/{user_id}.json", "r", encoding="utf-8") as f:
  70. return json.load(f)
  71. except FileNotFoundError:
  72. # 创建默认用户资料
  73. default_profile = self.get_default_profile()
  74. self.save_user_profile(user_id, default_profile)
  75. return default_profile
  76. def save_user_profile(self, user_id, profile: Dict) -> None:
  77. if not user_id:
  78. raise Exception("Invalid user_id: {}".format(user_id))
  79. with open(f"user_profiles/{user_id}.json", "w", encoding="utf-8") as f:
  80. json.dump(profile, f, ensure_ascii=False, indent=2)
  81. def list_all_users(self):
  82. user_ids = []
  83. for root, dirs, files in os.walk('user_profiles/'):
  84. for file in files:
  85. if file.endswith('.json'):
  86. user_ids.append(os.path.splitext(file)[0])
  87. return user_ids
  88. def get_staff_profile(self, staff_id) -> Dict:
  89. return {}
  90. class MySQLUserManager(UserManager):
  91. def __init__(self, db_config, table_name, staff_table):
  92. self.db = MySQLManager(db_config)
  93. self.table_name = table_name
  94. self.staff_table = staff_table
  95. def get_user_profile(self, user_id) -> Dict:
  96. sql = f"SELECT name, wxid, profile_data_v1 FROM {self.table_name} WHERE third_party_user_id = {user_id}"
  97. data = self.db.select(sql, pymysql.cursors.DictCursor)
  98. if not data:
  99. logger.error(f"user[{user_id}] not found")
  100. return {}
  101. data = data[0]
  102. if not data['profile_data_v1']:
  103. logger.warning(f"user[{user_id}] profile not found, create a default one")
  104. default_profile = self.get_default_profile(nickname=data['name'])
  105. self.save_user_profile(user_id, default_profile)
  106. return default_profile
  107. else:
  108. return json.loads(data['profile_data_v1'])
  109. def save_user_profile(self, user_id, profile: Dict) -> None:
  110. if not user_id:
  111. raise Exception("Invalid user_id: {}".format(user_id))
  112. sql = f"UPDATE {self.table_name} SET profile_data_v1 = %s WHERE third_party_user_id = {user_id}"
  113. self.db.execute(sql, (json.dumps(profile),))
  114. def list_all_users(self):
  115. sql = f"SELECT third_party_user_id FROM {self.table_name}"
  116. data = self.db.select(sql, pymysql.cursors.DictCursor)
  117. return [user['third_party_user_id'] for user in data]
  118. def get_staff_profile(self, staff_id) -> Dict:
  119. if not self.staff_table:
  120. raise Exception("staff_table is not set")
  121. sql = f"SELECT agent_name, agent_age, agent_region " \
  122. f"FROM {self.staff_table} WHERE third_party_user_id = '{staff_id}'"
  123. data = self.db.select(sql, pymysql.cursors.DictCursor)
  124. if not data:
  125. logger.error(f"staff[{staff_id}] not found")
  126. return {}
  127. profile = data[0]
  128. return profile
  129. class MySQLUserRelationManager(UserRelationManager):
  130. def __init__(self, agent_db_config, wecom_db_config,
  131. agent_staff_table, agent_user_table,
  132. staff_table, relation_table, user_table):
  133. # FIXME(zhoutian): 因为现在数据库表不统一,需要从两个库读取
  134. self.agent_db = MySQLManager(agent_db_config)
  135. self.wecom_db = MySQLManager(wecom_db_config)
  136. self.agent_staff_table = agent_staff_table
  137. self.staff_table = staff_table
  138. self.relation_table = relation_table
  139. self.agent_user_table = agent_user_table
  140. self.user_table = user_table
  141. def list_staffs(self):
  142. return []
  143. def list_users(self, staff_id: str, page: int = 1, page_size: int = 100):
  144. return []
  145. def list_staff_users(self):
  146. sql = (f"SELECT third_party_user_id, wxid FROM {self.agent_staff_table} WHERE status = 1"
  147. f" AND third_party_user_id = '1688854492669990'")
  148. agent_staff_data = self.agent_db.select(sql, pymysql.cursors.DictCursor)
  149. if not agent_staff_data:
  150. return []
  151. ret = []
  152. for agent_staff in agent_staff_data:
  153. wxid = agent_staff['wxid']
  154. sql = f"SELECT id FROM {self.staff_table} WHERE carrier_id = '{wxid}'"
  155. staff_data = self.wecom_db.select(sql, pymysql.cursors.DictCursor)
  156. if not staff_data:
  157. logger.error(f"staff[{wxid}] not found in wecom database")
  158. continue
  159. staff_id = staff_data[0]['id']
  160. sql = f"SELECT user_id FROM {self.relation_table} WHERE staff_id = '{staff_id}' AND is_delete = 0"
  161. user_data = self.wecom_db.select(sql, pymysql.cursors.DictCursor)
  162. if not user_data:
  163. logger.warning(f"staff[{wxid}] has no user")
  164. continue
  165. user_ids = tuple(user['user_id'] for user in user_data)
  166. sql = f"SELECT union_id FROM {self.user_table} WHERE id IN {str(user_ids)} AND union_id is not null"
  167. user_data = self.wecom_db.select(sql, pymysql.cursors.DictCursor)
  168. if not user_data:
  169. logger.warning(f"staff[{wxid}] users not found in wecom database")
  170. continue
  171. user_union_ids = tuple(user['union_id'] for user in user_data)
  172. batch_size = 100
  173. n_batches = (len(user_union_ids) + batch_size - 1) // batch_size
  174. agent_user_data = []
  175. for i in range(n_batches):
  176. idx_begin = i * batch_size
  177. idx_end = min((i + 1) * batch_size, len(user_union_ids))
  178. batch_union_ids = user_union_ids[idx_begin:idx_end]
  179. sql = f"SELECT third_party_user_id, wxid FROM {self.agent_user_table} WHERE wxid IN {str(batch_union_ids)}"
  180. batch_agent_user_data = self.agent_db.select(sql, pymysql.cursors.DictCursor)
  181. if len(agent_user_data) != len(batch_union_ids):
  182. # logger.debug(f"staff[{wxid}] some users not found in agent database")
  183. pass
  184. agent_user_data.extend(batch_agent_user_data)
  185. staff_user_pairs = [
  186. {
  187. 'staff_id': agent_staff['third_party_user_id'],
  188. 'user_id': agent_user['third_party_user_id']
  189. }
  190. for agent_user in agent_user_data
  191. ]
  192. ret.extend(staff_user_pairs)
  193. return ret
  194. if __name__ == '__main__':
  195. config = configs.get()
  196. user_db_config = config['storage']['user']
  197. staff_db_config = config['storage']['staff']
  198. user_manager = MySQLUserManager(user_db_config['mysql'], user_db_config['table'], staff_db_config['table'])
  199. user_profile = user_manager.get_user_profile('7881301263964433')
  200. print(user_profile)
  201. wecom_db_config = config['storage']['user_relation']
  202. user_relation_manager = MySQLUserRelationManager(
  203. user_db_config['mysql'], wecom_db_config['mysql'],
  204. config['storage']['staff']['table'],
  205. user_db_config['table'],
  206. wecom_db_config['table']['staff'],
  207. wecom_db_config['table']['relation'],
  208. wecom_db_config['table']['user']
  209. )
  210. all_staff_users = user_relation_manager.list_staff_users()
  211. print(all_staff_users)