user_manager.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308
  1. #! /usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # vim:fenc=utf-8
  4. from logging_service import logger
  5. from typing import Dict, Optional, Tuple, Any, List
  6. import json
  7. import time
  8. import os
  9. import abc
  10. import pymysql.cursors
  11. import configs
  12. from database import MySQLManager
  13. class UserManager(abc.ABC):
  14. @abc.abstractmethod
  15. def get_user_profile(self, user_id) -> Dict:
  16. pass
  17. @abc.abstractmethod
  18. def save_user_profile(self, user_id, profile: Dict) -> None:
  19. pass
  20. @abc.abstractmethod
  21. def list_all_users(self):
  22. pass
  23. @abc.abstractmethod
  24. def get_staff_profile(self, staff_id) -> Dict:
  25. #FIXME(zhoutian): 重新设计用户和员工数据管理模型
  26. pass
  27. @staticmethod
  28. def get_default_profile(**kwargs) -> Dict:
  29. default_profile = {
  30. "name": "",
  31. "nickname": "",
  32. "preferred_nickname": "",
  33. "gender": "未知",
  34. "age": 0,
  35. "region": '',
  36. "interests": [],
  37. "family_members": {},
  38. "health_conditions": [],
  39. "medications": [],
  40. "reminder_preferences": {
  41. "medication": True,
  42. "health": True,
  43. "weather": True,
  44. "news": False
  45. },
  46. "interaction_style": "standard", # standard, verbose, concise
  47. "interaction_frequency": "medium", # low, medium, high
  48. "last_topics": [],
  49. "created_at": int(time.time() * 1000),
  50. "human_intervention_history": []
  51. }
  52. for key, value in kwargs.items():
  53. if key in default_profile:
  54. default_profile[key] = value
  55. return default_profile
  56. def list_users(self, **kwargs) -> List[Dict]:
  57. pass
  58. class UserRelationManager(abc.ABC):
  59. @abc.abstractmethod
  60. def list_staffs(self):
  61. pass
  62. @abc.abstractmethod
  63. def list_users(self, staff_id: str, page: int = 1, page_size: int = 100):
  64. pass
  65. @abc.abstractmethod
  66. def list_staff_users(self) -> List[Dict]:
  67. pass
  68. @abc.abstractmethod
  69. def get_user_tags(self, user_id: str) -> List[str]:
  70. pass
  71. class LocalUserManager(UserManager):
  72. def get_user_profile(self, user_id) -> Dict:
  73. """加载用户个人资料,如不存在则创建默认资料。主要用于本地调试"""
  74. try:
  75. with open(f"user_profiles/{user_id}.json", "r", encoding="utf-8") as f:
  76. return json.load(f)
  77. except FileNotFoundError:
  78. # 创建默认用户资料
  79. default_profile = self.get_default_profile()
  80. self.save_user_profile(user_id, default_profile)
  81. return default_profile
  82. def save_user_profile(self, user_id, profile: Dict) -> None:
  83. if not user_id:
  84. raise Exception("Invalid user_id: {}".format(user_id))
  85. with open(f"user_profiles/{user_id}.json", "w", encoding="utf-8") as f:
  86. json.dump(profile, f, ensure_ascii=False, indent=2)
  87. def list_all_users(self):
  88. user_ids = []
  89. for root, dirs, files in os.walk('user_profiles/'):
  90. for file in files:
  91. if file.endswith('.json'):
  92. user_ids.append(os.path.splitext(file)[0])
  93. return user_ids
  94. def get_staff_profile(self, staff_id) -> Dict:
  95. return {}
  96. def list_users(self, **kwargs) -> List[Dict]:
  97. pass
  98. class MySQLUserManager(UserManager):
  99. def __init__(self, db_config, table_name, staff_table):
  100. self.db = MySQLManager(db_config)
  101. self.table_name = table_name
  102. self.staff_table = staff_table
  103. def get_user_profile(self, user_id) -> Dict:
  104. sql = f"SELECT name, wxid, profile_data_v1, gender" \
  105. f" FROM {self.table_name} WHERE third_party_user_id = {user_id}"
  106. data = self.db.select(sql, pymysql.cursors.DictCursor)
  107. if not data:
  108. logger.error(f"user[{user_id}] not found")
  109. return {}
  110. data = data[0]
  111. gender_map = {0: '未知', 1: '男', 2: '女', None: '未知'}
  112. gender = gender_map[data['gender']]
  113. default_profile = self.get_default_profile(nickname=data['name'], gender=gender)
  114. if not data['profile_data_v1']:
  115. logger.warning(f"user[{user_id}] profile not found, create a default one")
  116. self.save_user_profile(user_id, default_profile)
  117. return default_profile
  118. else:
  119. profile = json.loads(data['profile_data_v1'])
  120. # 资料条目有增加时,需合并更新
  121. entry_added = False
  122. for key, value in default_profile.items():
  123. if key not in profile:
  124. logger.debug(f"user[{user_id}] add profile key[{key}] value[{value}]")
  125. profile[key] = value
  126. entry_added = True
  127. if entry_added:
  128. self.save_user_profile(user_id, profile)
  129. return profile
  130. def save_user_profile(self, user_id, profile: Dict) -> None:
  131. if not user_id:
  132. raise Exception("Invalid user_id: {}".format(user_id))
  133. sql = f"UPDATE {self.table_name} SET profile_data_v1 = %s WHERE third_party_user_id = {user_id}"
  134. self.db.execute(sql, (json.dumps(profile),))
  135. def list_all_users(self):
  136. sql = f"SELECT third_party_user_id FROM {self.table_name}"
  137. data = self.db.select(sql, pymysql.cursors.DictCursor)
  138. return [user['third_party_user_id'] for user in data]
  139. def get_staff_profile(self, staff_id) -> Dict:
  140. if not self.staff_table:
  141. raise Exception("staff_table is not set")
  142. sql = f"SELECT agent_name, agent_gender, agent_age, agent_region " \
  143. f"FROM {self.staff_table} WHERE third_party_user_id = '{staff_id}'"
  144. data = self.db.select(sql, pymysql.cursors.DictCursor)
  145. if not data:
  146. logger.error(f"staff[{staff_id}] not found")
  147. return {}
  148. profile = data[0]
  149. # 转换性别格式
  150. gender_map = {0: '未知', 1: '男', 2: '女', None: '未知'}
  151. profile['agent_gender'] = gender_map[profile['agent_gender']]
  152. return profile
  153. def list_users(self, **kwargs) -> List[Dict]:
  154. user_union_id = kwargs.get('user_union_id', None)
  155. user_name = kwargs.get('user_name', None)
  156. if not user_union_id and not user_name:
  157. raise Exception("user_union_id or user_name is required")
  158. sql = f"SELECT third_party_user_id, wxid, name, iconurl, gender FROM {self.table_name} WHERE 1=1 "
  159. if user_name:
  160. sql += f"AND name = '{user_name}' COLLATE utf8mb4_bin "
  161. if user_union_id:
  162. sql += f"AND wxid = '{user_union_id}' "
  163. data = self.db.select(sql, pymysql.cursors.DictCursor)
  164. return data
  165. class MySQLUserRelationManager(UserRelationManager):
  166. def __init__(self, agent_db_config, wecom_db_config,
  167. agent_staff_table, agent_user_table,
  168. staff_table, relation_table, user_table):
  169. # FIXME(zhoutian): 因为现在数据库表不统一,需要从两个库读取
  170. self.agent_db = MySQLManager(agent_db_config)
  171. self.wecom_db = MySQLManager(wecom_db_config)
  172. self.agent_staff_table = agent_staff_table
  173. self.staff_table = staff_table
  174. self.relation_table = relation_table
  175. self.agent_user_table = agent_user_table
  176. self.user_table = user_table
  177. def list_staffs(self):
  178. sql = f"SELECT third_party_user_id, name, wxid, agent_name FROM {self.agent_staff_table} WHERE status = 1"
  179. data = self.agent_db.select(sql, pymysql.cursors.DictCursor)
  180. return data
  181. def list_users(self, staff_id: str, page: int = 1, page_size: int = 100):
  182. return []
  183. def list_staff_users(self):
  184. # FIXME(zhoutian)
  185. # 测试期间逻辑,只取一个账号
  186. sql = (f"SELECT third_party_user_id, wxid FROM {self.agent_staff_table} WHERE status = 1"
  187. f" AND third_party_user_id in ('1688854492669990', '1688855931724582')")
  188. agent_staff_data = self.agent_db.select(sql, pymysql.cursors.DictCursor)
  189. if not agent_staff_data:
  190. return []
  191. ret = []
  192. for agent_staff in agent_staff_data:
  193. wxid = agent_staff['wxid']
  194. sql = f"SELECT id FROM {self.staff_table} WHERE carrier_id = '{wxid}'"
  195. staff_data = self.wecom_db.select(sql, pymysql.cursors.DictCursor)
  196. if not staff_data:
  197. logger.error(f"staff[{wxid}] not found in wecom database")
  198. continue
  199. staff_id = staff_data[0]['id']
  200. sql = f"SELECT user_id FROM {self.relation_table} WHERE staff_id = '{staff_id}' AND is_delete = 0"
  201. user_data = self.wecom_db.select(sql, pymysql.cursors.DictCursor)
  202. if not user_data:
  203. logger.warning(f"staff[{wxid}] has no user")
  204. continue
  205. user_ids = tuple(user['user_id'] for user in user_data)
  206. # FIXME(zhoutian): 测试期间临时逻辑
  207. if agent_staff['third_party_user_id'] == '1688854492669990':
  208. sql = f"SELECT union_id FROM {self.user_table} WHERE id IN {str(user_ids)} AND union_id is not null"
  209. else:
  210. sql = f"SELECT union_id FROM {self.user_table} WHERE id IN {str(user_ids)} AND union_id is not null" \
  211. f" AND id in (SELECT distinct user_id FROM we_com_user_with_tag WHERE tag_id = 15 and is_delete = 0)"
  212. user_data = self.wecom_db.select(sql, pymysql.cursors.DictCursor)
  213. if not user_data:
  214. logger.warning(f"staff[{wxid}] users not found in wecom database")
  215. continue
  216. user_union_ids = tuple(user['union_id'] for user in user_data)
  217. batch_size = 100
  218. n_batches = (len(user_union_ids) + batch_size - 1) // batch_size
  219. agent_user_data = []
  220. for i in range(n_batches):
  221. idx_begin = i * batch_size
  222. idx_end = min((i + 1) * batch_size, len(user_union_ids))
  223. batch_union_ids = user_union_ids[idx_begin:idx_end]
  224. sql = f"SELECT third_party_user_id, wxid FROM {self.agent_user_table} WHERE wxid IN {str(batch_union_ids)}"
  225. batch_agent_user_data = self.agent_db.select(sql, pymysql.cursors.DictCursor)
  226. if len(agent_user_data) != len(batch_union_ids):
  227. # logger.debug(f"staff[{wxid}] some users not found in agent database")
  228. pass
  229. agent_user_data.extend(batch_agent_user_data)
  230. staff_user_pairs = [
  231. {
  232. 'staff_id': agent_staff['third_party_user_id'],
  233. 'user_id': agent_user['third_party_user_id']
  234. }
  235. for agent_user in agent_user_data
  236. ]
  237. ret.extend(staff_user_pairs)
  238. return ret
  239. def get_user_tags(self, user_id: str) -> List[str]:
  240. sql = f"SELECT wxid FROM {self.agent_user_table} WHERE third_party_user_id = '{user_id}' AND wxid is not null"
  241. user_data = self.agent_db.select(sql, pymysql.cursors.DictCursor)
  242. if not user_data:
  243. logger.error(f"user[{user_id}] has no wxid")
  244. return []
  245. user_wxid = user_data[0]['wxid']
  246. sql = f"""
  247. select b.tag_id, c.`tag_name` from `we_com_user` as a
  248. join `we_com_user_with_tag` as b
  249. join `we_com_tag` as c
  250. on a.`id` = b.`user_id`
  251. and b.`tag_id` = c.id
  252. where a.union_id = '{user_wxid}' """
  253. tag_data = self.wecom_db.select(sql, pymysql.cursors.DictCursor)
  254. tag_names = [tag['tag_name'] for tag in tag_data]
  255. return tag_names
  256. if __name__ == '__main__':
  257. config = configs.get()
  258. user_db_config = config['storage']['user']
  259. staff_db_config = config['storage']['staff']
  260. user_manager = MySQLUserManager(user_db_config['mysql'], user_db_config['table'], staff_db_config['table'])
  261. user_profile = user_manager.get_user_profile('7881301263964433')
  262. print(user_profile)
  263. wecom_db_config = config['storage']['user_relation']
  264. user_relation_manager = MySQLUserRelationManager(
  265. user_db_config['mysql'], wecom_db_config['mysql'],
  266. config['storage']['staff']['table'],
  267. user_db_config['table'],
  268. wecom_db_config['table']['staff'],
  269. wecom_db_config['table']['relation'],
  270. wecom_db_config['table']['user']
  271. )
  272. # all_staff_users = user_relation_manager.list_staff_users()
  273. user_tags = user_relation_manager.get_user_tags('7881302078008656')
  274. print(user_tags)