user_manager.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671
  1. #! /usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # vim:fenc=utf-8
  4. from pqai_agent.logging_service import logger
  5. from typing import Dict, Optional, List
  6. import json
  7. import time
  8. import os
  9. import abc
  10. import pymysql.cursors
  11. from pqai_agent import configs
  12. from pqai_agent.database import MySQLManager
  13. class UserManager(abc.ABC):
  14. @abc.abstractmethod
  15. def get_user_profile(self, user_id) -> Dict:
  16. pass
  17. @abc.abstractmethod
  18. def save_user_profile(self, user_id, profile: Dict) -> None:
  19. pass
  20. @abc.abstractmethod
  21. def list_all_users(self):
  22. pass
  23. @abc.abstractmethod
  24. def get_staff_profile(self, staff_id) -> Dict:
  25. #FIXME(zhoutian): 重新设计用户和员工数据管理模型
  26. pass
  27. @staticmethod
  28. def get_default_profile(**kwargs) -> Dict:
  29. default_profile = {
  30. "name": "",
  31. "nickname": "",
  32. "avatar": "",
  33. "preferred_nickname": "",
  34. "gender": "未知",
  35. "age": 0,
  36. "region": '',
  37. "interests": [],
  38. "family_members": {},
  39. "health_conditions": [],
  40. "medications": [],
  41. "reminder_preferences": {
  42. "medication": True,
  43. "health": True,
  44. "weather": True,
  45. "news": False
  46. },
  47. "interaction_style": "standard", # standard, verbose, concise
  48. "interaction_frequency": "medium", # low, medium, high
  49. "last_topics": [],
  50. "created_at": int(time.time() * 1000),
  51. "human_intervention_history": []
  52. }
  53. for key, value in kwargs.items():
  54. if key in default_profile:
  55. default_profile[key] = value
  56. return default_profile
  57. def list_users(self, **kwargs) -> List[Dict]:
  58. pass
  59. class UserRelationManager(abc.ABC):
  60. @abc.abstractmethod
  61. def list_staffs(self):
  62. pass
  63. @abc.abstractmethod
  64. def list_users(self, staff_id: str, page: int = 1, page_size: int = 100):
  65. pass
  66. @abc.abstractmethod
  67. def list_staff_users(self, staff_id: str = None, tag_id: int = None) -> List[Dict]:
  68. pass
  69. @abc.abstractmethod
  70. def get_user_tags(self, user_id: str) -> List[str]:
  71. pass
  72. @abc.abstractmethod
  73. def stop_user_daily_push(self, user_id: str) -> bool:
  74. pass
  75. class LocalUserManager(UserManager):
  76. def get_user_profile(self, user_id) -> Dict:
  77. """加载用户个人资料,如不存在则创建默认资料。主要用于本地调试"""
  78. default_profile = self.get_default_profile()
  79. try:
  80. with open(f"user_profiles/{user_id}.json", "r", encoding="utf-8") as f:
  81. profile = json.load(f)
  82. entry_added = False
  83. for key, value in default_profile.items():
  84. if key not in profile:
  85. logger.debug(f"user[{user_id}] add profile key[{key}] value[{value}]")
  86. profile[key] = value
  87. entry_added = True
  88. if entry_added:
  89. self.save_user_profile(user_id, profile)
  90. return profile
  91. except FileNotFoundError:
  92. # 创建默认用户资料
  93. self.save_user_profile(user_id, default_profile)
  94. return default_profile
  95. def save_user_profile(self, user_id, profile: Dict) -> None:
  96. if not user_id:
  97. raise Exception("Invalid user_id: {}".format(user_id))
  98. with open(f"user_profiles/{user_id}.json", "w", encoding="utf-8") as f:
  99. json.dump(profile, f, ensure_ascii=False, indent=2)
  100. def list_all_users(self):
  101. user_ids = []
  102. for root, dirs, files in os.walk('../user_profiles/'):
  103. for file in files:
  104. if file.endswith('.json'):
  105. user_ids.append(os.path.splitext(file)[0])
  106. return user_ids
  107. def get_staff_profile(self, staff_id) -> Dict:
  108. try:
  109. with open(f"user_profiles/{staff_id}.json", "r", encoding="utf-8") as f:
  110. profile = json.load(f)
  111. entry_added = False
  112. if entry_added:
  113. self.save_user_profile(staff_id, profile)
  114. return profile
  115. except Exception as e:
  116. logger.error("staff profile not found: {}".format(e))
  117. return {}
  118. def list_users(self, **kwargs) -> List[Dict]:
  119. pass
  120. class MySQLUserManager(UserManager):
  121. PROFILE_EXCLUDE_ITEMS = ['avatar', ]
  122. def __init__(self, db_config, table_name, staff_table):
  123. self.db = MySQLManager(db_config)
  124. self.table_name = table_name
  125. self.staff_table = staff_table
  126. def get_user_profile(self, user_id) -> Dict:
  127. sql = f"SELECT name, wxid, profile_data_v1, gender, iconurl as avatar" \
  128. f" FROM {self.table_name} WHERE third_party_user_id = {user_id}"
  129. data = self.db.select(sql, pymysql.cursors.DictCursor)
  130. if not data:
  131. logger.error(f"user[{user_id}] not found")
  132. return {}
  133. data = data[0]
  134. gender_map = {0: '未知', 1: '男', 2: '女', None: '未知'}
  135. gender = gender_map[data['gender']]
  136. default_profile = self.get_default_profile(nickname=data['name'], gender=gender, avatar=data['avatar'])
  137. if not data['profile_data_v1']:
  138. logger.warning(f"user[{user_id}] profile not found, create a default one")
  139. self.save_user_profile(user_id, default_profile)
  140. return default_profile
  141. else:
  142. profile = json.loads(data['profile_data_v1'])
  143. # 资料条目有增加时,需合并更新
  144. entry_added = False
  145. for key, value in default_profile.items():
  146. if key not in profile:
  147. # logger.debug(f"user[{user_id}] add profile key[{key}] value[{value}]")
  148. profile[key] = value
  149. entry_added = True
  150. if entry_added:
  151. self.save_user_profile(user_id, profile)
  152. return profile
  153. def save_user_profile(self, user_id, profile: Dict) -> None:
  154. if not user_id:
  155. raise Exception("Invalid user_id: {}".format(user_id))
  156. if configs.get().get('debug_flags', {}).get('disable_database_write', False):
  157. return
  158. profile = profile.copy()
  159. for name in self.PROFILE_EXCLUDE_ITEMS:
  160. profile.pop(name, None)
  161. sql = f"UPDATE {self.table_name} SET profile_data_v1 = %s WHERE third_party_user_id = {user_id}"
  162. self.db.execute(sql, (json.dumps(profile),))
  163. def list_all_users(self):
  164. sql = f"SELECT third_party_user_id FROM {self.table_name}"
  165. data = self.db.select(sql, pymysql.cursors.DictCursor)
  166. return [user['third_party_user_id'] for user in data]
  167. def get_staff_profile(self, staff_id) -> Dict:
  168. if not self.staff_table:
  169. raise Exception("staff_table is not set")
  170. return self.get_staff_profile_v3(staff_id)
  171. def get_staff_profile_v1(self, staff_id) -> Dict:
  172. sql = f"SELECT agent_name, agent_gender, agent_age, agent_region, agent_profile " \
  173. f"FROM {self.staff_table} WHERE third_party_user_id = '{staff_id}'"
  174. data = self.db.select(sql, pymysql.cursors.DictCursor)
  175. if not data:
  176. logger.error(f"staff[{staff_id}] not found")
  177. return {}
  178. profile = data[0]
  179. # 转换性别格式
  180. gender_map = {0: '未知', 1: '男', 2: '女', None: '未知'}
  181. profile['agent_gender'] = gender_map[profile['agent_gender']]
  182. return profile
  183. def get_staff_profile_v2(self, staff_id) -> Dict:
  184. sql = f"SELECT agent_name as name, agent_gender as gender, agent_age as age, agent_region as region, agent_profile " \
  185. f"FROM {self.staff_table} WHERE third_party_user_id = '{staff_id}'"
  186. data = self.db.select(sql, pymysql.cursors.DictCursor)
  187. if not data:
  188. logger.error(f"staff[{staff_id}] not found")
  189. return {}
  190. profile = data[0]
  191. # 转换性别格式
  192. gender_map = {0: '未知', 1: '男', 2: '女', None: '未知'}
  193. profile['gender'] = gender_map[profile['gender']]
  194. # 合并JSON字段(新版本)数据
  195. if profile['agent_profile']:
  196. detail_profile = json.loads(profile['agent_profile'])
  197. profile.update(detail_profile)
  198. # 去除原始字段
  199. profile.pop('agent_profile', None)
  200. return profile
  201. def get_staff_profile_v3(self, staff_id) -> Dict:
  202. sql = f"SELECT agent_profile " \
  203. f"FROM {self.staff_table} WHERE third_party_user_id = '{staff_id}'"
  204. data = self.db.select(sql)
  205. if not data:
  206. logger.error(f"staff[{staff_id}] not found")
  207. return {}
  208. profile_str = data[0][0]
  209. if not profile_str:
  210. return {}
  211. profile = json.loads(profile_str)
  212. return profile
  213. def save_staff_profile(self, staff_id: str, profile: Dict):
  214. # 正常情况下不应该有此操作
  215. if not self.staff_table:
  216. raise Exception("staff_table is not set")
  217. if not staff_id:
  218. raise Exception("Invalid staff_id: {}".format(staff_id))
  219. sql = f"UPDATE {self.staff_table} SET agent_profile = %s WHERE third_party_user_id = '{staff_id}'"
  220. self.db.execute(sql, (json.dumps(profile),))
  221. def list_users(self, **kwargs) -> List[Dict]:
  222. user_union_id = kwargs.get('user_union_id', None)
  223. user_name = kwargs.get('user_name', None)
  224. if not user_union_id and not user_name:
  225. raise Exception("user_union_id or user_name is required")
  226. sql = f"SELECT third_party_user_id, wxid, name, iconurl, gender FROM {self.table_name} WHERE 1=1 "
  227. if user_name:
  228. sql += f"AND name = '{user_name}' COLLATE utf8mb4_bin "
  229. if user_union_id:
  230. sql += f"AND wxid = '{user_union_id}' "
  231. data = self.db.select(sql, pymysql.cursors.DictCursor)
  232. return data
  233. def get_staff_sessions(self, staff_id, page_id: int = 1, page_size: int = 10, session_type: str = 'default') -> List[Dict]:
  234. """
  235. :param page_size:
  236. :param page_id:
  237. :param session_type:
  238. :param staff_id:
  239. :return:
  240. """
  241. match session_type:
  242. case 'active':
  243. sql = f"""
  244. select staff_id, current_state, user_id
  245. from agent_state
  246. where staff_id = %s and update_timestamp >= DATE_SUB(NOW(), INTERVAL 2 HOUR)
  247. order by update_timestamp desc;
  248. """
  249. case 'human_intervention':
  250. sql = f"""
  251. select staff_id, current_state, user_id
  252. from agent_state
  253. where staff_id = %s and current_state = 5 order by update_timestamp desc;
  254. """
  255. case _:
  256. sql = f"""
  257. select t1.staff_id, t1.current_state, t1.user_id, t2.name, t2.iconurl
  258. from agent_state t1 join third_party_user t2
  259. on t1.user_id = t2.third_party_user_id
  260. where t1.staff_id = %s
  261. order by
  262. IF(t1.current_state = 5, 0, 1),
  263. t1.update_timestamp desc
  264. limit {page_size + 1} offset {page_size * (page_id - 1)};
  265. """
  266. staff_sessions = self.db.select(sql, cursor_type=pymysql.cursors.DictCursor, args=(staff_id, ))
  267. return staff_sessions
  268. def get_staff_sessions_summary_v1(self, staff_id, page_id: int, page_size: int, status: int) -> Dict:
  269. """
  270. :param status: staff status(0: unemployed, 1: employed)
  271. :param staff_id: staff
  272. :param page_id: page id
  273. :param page_size: page size
  274. :return:
  275. :todo: 未使用 Mysql 连接池,每次查询均需要与 MySQL 建立连接,性能较低,需要优化
  276. """
  277. if not staff_id:
  278. get_staff_query = f"""
  279. select third_party_user_id, name from {self.staff_table} where status = %s
  280. limit %s offset %s;
  281. """
  282. staff_id_list = self.db.select(
  283. sql=get_staff_query,
  284. cursor_type=pymysql.cursors.DictCursor,
  285. args=(status, page_size + 1, (page_id - 1) * page_size)
  286. )
  287. if not staff_id_list:
  288. return {}
  289. if len(staff_id_list) > page_size:
  290. has_next_page = True
  291. next_page_id = page_id + 1
  292. staff_id_list = staff_id_list[:page_size]
  293. else:
  294. has_next_page = False
  295. next_page_id = None
  296. else:
  297. get_staff_query = f"""
  298. select third_party_user_id, name from {self.staff_table}
  299. where status = %s and third_party_user_id = %s;
  300. """
  301. staff_id_list = self.db.select(
  302. sql=get_staff_query,
  303. cursor_type=pymysql.cursors.DictCursor,
  304. args=(status, staff_id)
  305. )
  306. if not staff_id_list:
  307. return {}
  308. has_next_page = False
  309. next_page_id = None
  310. response_data = [
  311. {
  312. 'staff_id': staff['third_party_user_id'],
  313. 'staff_name': staff['name'],
  314. 'active_sessions': len(self.get_staff_sessions(staff['third_party_user_id'], session_type='active')),
  315. 'human_intervention_sessions': len(self.get_staff_sessions(staff['third_party_user_id'], session_type='human_intervention'))
  316. }
  317. for staff in staff_id_list
  318. ]
  319. return {
  320. 'has_next_page': has_next_page,
  321. 'next_page_id': next_page_id,
  322. 'data': response_data
  323. }
  324. def get_staff_session_list_v1(self, staff_id, page_id: int, page_size: int) -> Dict:
  325. """
  326. :param page_size:
  327. :param page_id:
  328. :param staff_id:
  329. :return:
  330. """
  331. session_list = self.get_staff_sessions(staff_id, page_id, page_size)
  332. if len(session_list) > page_size:
  333. has_next_page = True
  334. next_page_id = page_id + 1
  335. session_list = session_list[:page_size]
  336. else:
  337. has_next_page = False
  338. next_page_id = None
  339. response_data = []
  340. for session in session_list:
  341. temp_obj = {}
  342. user_id = session['user_id']
  343. room_id = ':'.join(['private', staff_id, user_id])
  344. select_query = f"""select content, max(sendtime) as max_timestamp from qywx_chat_history where roomid = %s;"""
  345. last_message = self.db.select(
  346. sql=select_query,
  347. cursor_type=pymysql.cursors.DictCursor,
  348. args=(room_id,)
  349. )
  350. if not last_message:
  351. temp_obj['message'] = ''
  352. temp_obj['timestamp'] = 0
  353. else:
  354. temp_obj['message'] = last_message[0]['content']
  355. temp_obj['timestamp'] = last_message[0]['max_timestamp']
  356. temp_obj['customer_id'] = user_id
  357. temp_obj['customer_name'] = session['name']
  358. temp_obj['avatar'] = session['iconurl']
  359. response_data.append(temp_obj)
  360. return {
  361. "staff_id": staff_id,
  362. "has_next_page": has_next_page,
  363. "next_page_id": next_page_id,
  364. "data": response_data
  365. }
  366. def get_staff_list(self, page_id: int, page_size: int) -> Dict:
  367. """
  368. :param page_size:
  369. :param page_id:
  370. :return:
  371. """
  372. sql = f"""
  373. select t1.third_party_user_id as staff_id, t1.name as staff_name, t2.iconurl as avatar
  374. from qywx_employee t1 left join third_party_user t2
  375. on t1.third_party_user_id = t2.third_party_user_id
  376. limit %s offset %s;
  377. """
  378. staff_list = self.db.select(
  379. sql=sql,
  380. cursor_type=pymysql.cursors.DictCursor,
  381. args=(page_size + 1, page_size * (page_id - 1))
  382. )
  383. if len(staff_list) > page_size:
  384. has_next_page = True
  385. next_page_id = page_id + 1
  386. staff_list = staff_list[:page_size]
  387. else:
  388. has_next_page = False
  389. next_page_id = None
  390. return {
  391. "has_next_page": has_next_page,
  392. "next_page": next_page_id,
  393. "data": staff_list
  394. }
  395. def get_conversation_list_v1(self, staff_id: str, customer_id: str, page: Optional[int]):
  396. """
  397. :param staff_id:
  398. :param customer_id:
  399. :param page: timestamp
  400. :return:
  401. """
  402. room_id = ':'.join(['private', staff_id, customer_id])
  403. page_size = 20
  404. if not page:
  405. fetch_query = f"""
  406. select t1.sender, t2.name, t1.sendtime, t1.content, t2.iconurl
  407. from qywx_chat_history t1
  408. join third_party_user t2 on t1.sender = t2.third_party_user_id
  409. where roomid = %s
  410. order by sendtime desc
  411. limit %s;
  412. """
  413. messages = self.db.select(
  414. sql=fetch_query,
  415. cursor_type=pymysql.cursors.DictCursor,
  416. args=(room_id, page_size + 1)
  417. )
  418. else:
  419. fetch_query = f"""
  420. select t1.sender, t2.name, t1.sendtime, t1.content, t2.iconurl
  421. from qywx_chat_history t1
  422. join third_party_user t2 on t1.sender = t2.third_party_user_id
  423. where t1.roomid = %s and t1.sendtime <= %s
  424. order by sendtime desc
  425. limit %s;
  426. """
  427. messages = self.db.select(
  428. sql=fetch_query,
  429. cursor_type=pymysql.cursors.DictCursor,
  430. args=(room_id, page, page_size + 1)
  431. )
  432. if messages:
  433. if len(messages) > page_size:
  434. has_next_page = True
  435. next_page = messages[-1]['sendtime']
  436. else:
  437. has_next_page = False
  438. next_page = None
  439. response_data = [
  440. {
  441. "sender_id": message['sender'],
  442. "sender_name": message['name'],
  443. "avatar": message['iconurl'],
  444. "content": message['content'],
  445. "timestamp": message['sendtime'],
  446. "role": "customer" if message['sender'] == customer_id else "staff"
  447. }
  448. for message in messages
  449. ]
  450. return {
  451. "staff_id": staff_id,
  452. "customer_id": customer_id,
  453. "has_next_page": has_next_page,
  454. "next_page": next_page,
  455. "data": response_data
  456. }
  457. else:
  458. has_next_page = False
  459. next_page = None
  460. return {
  461. "staff_id": staff_id,
  462. "customer_id": customer_id,
  463. "has_next_page": has_next_page,
  464. "next_page": next_page,
  465. "data": []
  466. }
  467. class LocalUserRelationManager(UserRelationManager):
  468. def __init__(self):
  469. pass
  470. def list_staffs(self):
  471. return [
  472. {"third_party_user_id": '1688855931724582', "name": "", "wxid": "ShengHuoLeQu", "agent_name": "小芳"}
  473. ]
  474. def list_users(self, staff_id: str, page: int = 1, page_size: int = 100):
  475. return []
  476. def list_staff_users(self, staff_id: str = None, tag_id: int = None):
  477. user_ids = ['7881299453089278', '7881299453132630', '7881299454186909', '7881299455103430', '7881299455173476',
  478. '7881299456216398', '7881299457990953', '7881299461167644', '7881299463002136', '7881299464081604',
  479. '7881299465121735', '7881299465998082', '7881299466221881', '7881299467152300', '7881299470051791',
  480. '7881299470112816', '7881299471149567', '7881299471168030', '7881299471277650', '7881299473321703']
  481. user_ids = user_ids[:5]
  482. return [
  483. {"staff_id": "1688855931724582", "user_id": "7881299670930896"},
  484. *[{"staff_id": "1688855931724582", "user_id": user_id} for user_id in user_ids]
  485. ]
  486. def get_user_tags(self, user_id: str):
  487. return []
  488. def stop_user_daily_push(self, user_id: str) -> bool:
  489. return True
  490. class MySQLUserRelationManager(UserRelationManager):
  491. def __init__(self, agent_db_config, wecom_db_config,
  492. agent_staff_table, agent_user_table,
  493. staff_table, relation_table, user_table):
  494. # FIXME(zhoutian): 因为现在数据库表不统一,需要从两个库读取
  495. self.agent_db = MySQLManager(agent_db_config)
  496. self.wecom_db = MySQLManager(wecom_db_config)
  497. self.agent_staff_table = agent_staff_table
  498. self.staff_table = staff_table
  499. self.relation_table = relation_table
  500. self.agent_user_table = agent_user_table
  501. self.user_table = user_table
  502. def list_staffs(self):
  503. sql = f"SELECT third_party_user_id, name, wxid, agent_name FROM {self.agent_staff_table} WHERE status = 1"
  504. data = self.agent_db.select(sql, pymysql.cursors.DictCursor)
  505. return data
  506. def list_users(self, staff_id: str, page: int = 1, page_size: int = 100):
  507. return []
  508. def list_staff_users(self, staff_id: str = None, tag_id: int = None):
  509. sql = f"SELECT third_party_user_id, wxid FROM {self.agent_staff_table} WHERE status = 1"
  510. if staff_id:
  511. sql += f" AND third_party_user_id = '{staff_id}'"
  512. agent_staff_data = self.agent_db.select(sql, pymysql.cursors.DictCursor)
  513. if not agent_staff_data:
  514. return []
  515. ret = []
  516. for agent_staff in agent_staff_data:
  517. wxid = agent_staff['wxid']
  518. sql = f"SELECT id FROM {self.staff_table} WHERE carrier_id = '{wxid}'"
  519. staff_data = self.wecom_db.select(sql, pymysql.cursors.DictCursor)
  520. if not staff_data:
  521. logger.error(f"staff[{wxid}] not found in wecom database")
  522. continue
  523. staff_id = staff_data[0]['id']
  524. sql = f"SELECT user_id FROM {self.relation_table} WHERE staff_id = '{staff_id}' AND is_delete = 0"
  525. user_data = self.wecom_db.select(sql, pymysql.cursors.DictCursor)
  526. if not user_data:
  527. logger.warning(f"staff[{wxid}] has no user")
  528. continue
  529. user_ids = tuple(user['user_id'] for user in user_data)
  530. sql = f"SELECT union_id FROM {self.user_table} WHERE id IN {str(user_ids)} AND union_id is not null"
  531. if tag_id:
  532. sql += f" AND id in (SELECT distinct user_id FROM we_com_user_with_tag WHERE tag_id = {tag_id} and is_delete = 0)"
  533. user_data = self.wecom_db.select(sql, pymysql.cursors.DictCursor)
  534. if not user_data:
  535. logger.warning(f"staff[{wxid}] users not found in wecom database")
  536. continue
  537. user_union_ids = tuple(user['union_id'] for user in user_data)
  538. batch_size = 500
  539. n_batches = (len(user_union_ids) + batch_size - 1) // batch_size
  540. agent_user_data = []
  541. for i in range(n_batches):
  542. idx_begin = i * batch_size
  543. idx_end = min((i + 1) * batch_size, len(user_union_ids))
  544. batch_union_ids = user_union_ids[idx_begin:idx_end]
  545. sql = f"SELECT third_party_user_id, wxid FROM {self.agent_user_table} WHERE wxid IN {str(batch_union_ids)}"
  546. batch_agent_user_data = self.agent_db.select(sql, pymysql.cursors.DictCursor)
  547. if len(agent_user_data) != len(batch_union_ids):
  548. # logger.debug(f"staff[{wxid}] some users not found in agent database")
  549. pass
  550. agent_user_data.extend(batch_agent_user_data)
  551. staff_user_pairs = [
  552. {
  553. 'staff_id': agent_staff['third_party_user_id'],
  554. 'user_id': agent_user['third_party_user_id']
  555. }
  556. for agent_user in agent_user_data
  557. ]
  558. ret.extend(staff_user_pairs)
  559. return ret
  560. def get_user_union_id(self, user_id: str) -> Optional[str]:
  561. sql = f"SELECT wxid FROM {self.agent_user_table} WHERE third_party_user_id = '{user_id}' AND wxid is not null"
  562. user_data = self.agent_db.select(sql, pymysql.cursors.DictCursor)
  563. if not user_data:
  564. logger.error(f"user[{user_id}] has no union id")
  565. return None
  566. union_id = user_data[0]['wxid']
  567. return union_id
  568. def get_user_tags(self, user_id: str) -> List[str]:
  569. union_id = self.get_user_union_id(user_id)
  570. if not union_id:
  571. return []
  572. sql = f"""
  573. select b.tag_id, c.`tag_name` from `we_com_user` as a
  574. join `we_com_user_with_tag` as b
  575. join `we_com_tag` as c
  576. on a.`id` = b.`user_id`
  577. and b.`tag_id` = c.id
  578. where a.union_id = '{union_id}' """
  579. tag_data = self.wecom_db.select(sql, pymysql.cursors.DictCursor)
  580. tag_names = [tag['tag_name'] for tag in tag_data]
  581. return tag_names
  582. def stop_user_daily_push(self, user_id: str) -> bool:
  583. try:
  584. union_id = self.get_user_union_id(user_id)
  585. if not union_id:
  586. return False
  587. sql = f"UPDATE {self.user_table} SET group_msg_disabled = 1 WHERE union_id = %s"
  588. rows = self.wecom_db.execute(sql, (union_id, ))
  589. if rows > 0:
  590. return True
  591. else:
  592. return False
  593. except Exception as e:
  594. logger.error(f"stop_user_daily_push failed: {e}")
  595. return False
  596. if __name__ == '__main__':
  597. config = configs.get()
  598. user_db_config = config['storage']['user']
  599. staff_db_config = config['storage']['staff']
  600. user_manager = MySQLUserManager(user_db_config['mysql'], user_db_config['table'], staff_db_config['table'])
  601. user_profile = user_manager.get_user_profile('7881301263964433')
  602. print(user_profile)
  603. wecom_db_config = config['storage']['user_relation']
  604. user_relation_manager = MySQLUserRelationManager(
  605. user_db_config['mysql'], wecom_db_config['mysql'],
  606. config['storage']['staff']['table'],
  607. user_db_config['table'],
  608. wecom_db_config['table']['staff'],
  609. wecom_db_config['table']['relation'],
  610. wecom_db_config['table']['user']
  611. )
  612. # all_staff_users = user_relation_manager.list_staff_users()
  613. user_tags = user_relation_manager.get_user_tags('7881302078008656')
  614. print(user_tags)