#! /usr/bin/env python # -*- coding: utf-8 -*- # vim:fenc=utf-8 from pqai_agent.logging_service import logger from typing import Dict, Optional, List import json import time import os import abc import pymysql.cursors from pqai_agent import configs from pqai_agent.database import MySQLManager class UserManager(abc.ABC): @abc.abstractmethod def get_user_profile(self, user_id) -> Dict: pass @abc.abstractmethod def save_user_profile(self, user_id, profile: Dict) -> None: pass @abc.abstractmethod def list_all_users(self): pass @abc.abstractmethod def get_staff_profile(self, staff_id) -> Dict: # FIXME(zhoutian): 重新设计用户和员工数据管理模型 pass @staticmethod def get_default_profile(**kwargs) -> Dict: default_profile = { "name": "", "nickname": "", "avatar": "", "preferred_nickname": "", "gender": "未知", "age": 0, "region": "", "interests": [], "family_members": {}, "health_conditions": [], "medications": [], "reminder_preferences": { "medication": True, "health": True, "weather": True, "news": False, }, "interaction_style": "standard", # standard, verbose, concise "interaction_frequency": "medium", # low, medium, high "last_topics": [], "created_at": int(time.time() * 1000), "human_intervention_history": [], } for key, value in kwargs.items(): if key in default_profile: default_profile[key] = value return default_profile def list_users(self, **kwargs) -> List[Dict]: pass class UserRelationManager(abc.ABC): @abc.abstractmethod def list_staffs(self): pass @abc.abstractmethod def list_users(self, staff_id: str, page: int = 1, page_size: int = 100): pass @abc.abstractmethod def list_staff_users(self, staff_id: str = None, tag_id: int = None) -> List[Dict]: pass @abc.abstractmethod def get_user_tags(self, user_id: str) -> List[str]: pass @abc.abstractmethod def stop_user_daily_push(self, user_id: str) -> bool: pass class LocalUserManager(UserManager): def get_user_profile(self, user_id) -> Dict: """加载用户个人资料,如不存在则创建默认资料。主要用于本地调试""" default_profile = self.get_default_profile() try: with open(f"user_profiles/{user_id}.json", "r", encoding="utf-8") as f: profile = json.load(f) entry_added = False for key, value in default_profile.items(): if key not in profile: logger.debug( f"user[{user_id}] add profile key[{key}] value[{value}]" ) profile[key] = value entry_added = True if entry_added: self.save_user_profile(user_id, profile) return profile except FileNotFoundError: # 创建默认用户资料 self.save_user_profile(user_id, default_profile) return default_profile def save_user_profile(self, user_id, profile: Dict) -> None: if not user_id: raise Exception("Invalid user_id: {}".format(user_id)) with open(f"user_profiles/{user_id}.json", "w", encoding="utf-8") as f: json.dump(profile, f, ensure_ascii=False, indent=2) def list_all_users(self): user_ids = [] for root, dirs, files in os.walk("../user_profiles/"): for file in files: if file.endswith(".json"): user_ids.append(os.path.splitext(file)[0]) return user_ids def get_staff_profile(self, staff_id) -> Dict: try: with open(f"user_profiles/{staff_id}.json", "r", encoding="utf-8") as f: profile = json.load(f) entry_added = False if entry_added: self.save_user_profile(staff_id, profile) return profile except Exception as e: logger.error("staff profile not found: {}".format(e)) return {} def list_users(self, **kwargs) -> List[Dict]: pass class MySQLUserManager(UserManager): PROFILE_EXCLUDE_ITEMS = [ "avatar", ] def __init__(self, db_config, table_name, staff_table): self.db = MySQLManager(db_config) self.table_name = table_name self.staff_table = staff_table def get_user_profile(self, user_id) -> Dict: sql = ( f"SELECT name, wxid, profile_data_v1, gender, iconurl as avatar" f" FROM {self.table_name} WHERE third_party_user_id = {user_id}" ) data = self.db.select(sql, pymysql.cursors.DictCursor) if not data: logger.error(f"user[{user_id}] not found") return {} data = data[0] gender_map = {0: "未知", 1: "男", 2: "女", None: "未知"} gender = gender_map[data["gender"]] default_profile = self.get_default_profile( nickname=data["name"], gender=gender, avatar=data["avatar"] ) if not data["profile_data_v1"]: logger.warning(f"user[{user_id}] profile not found, create a default one") self.save_user_profile(user_id, default_profile) return default_profile else: profile = json.loads(data["profile_data_v1"]) # 资料条目有增加时,需合并更新 entry_added = False for key, value in default_profile.items(): if key not in profile: # logger.debug(f"user[{user_id}] add profile key[{key}] value[{value}]") profile[key] = value entry_added = True if entry_added: self.save_user_profile(user_id, profile) return profile def save_user_profile(self, user_id, profile: Dict) -> None: if not user_id: raise Exception("Invalid user_id: {}".format(user_id)) if configs.get().get("debug_flags", {}).get("disable_database_write", False): return profile = profile.copy() for name in self.PROFILE_EXCLUDE_ITEMS: profile.pop(name, None) sql = f"UPDATE {self.table_name} SET profile_data_v1 = %s WHERE third_party_user_id = {user_id}" self.db.execute(sql, (json.dumps(profile),)) def list_all_users(self): sql = f"SELECT third_party_user_id FROM {self.table_name}" data = self.db.select(sql, pymysql.cursors.DictCursor) return [user["third_party_user_id"] for user in data] def get_staff_profile(self, staff_id) -> Dict: if not self.staff_table: raise Exception("staff_table is not set") return self.get_staff_profile_v3(staff_id) def get_staff_profile_v1(self, staff_id) -> Dict: sql = ( f"SELECT agent_name, agent_gender, agent_age, agent_region, agent_profile " f"FROM {self.staff_table} WHERE third_party_user_id = '{staff_id}'" ) data = self.db.select(sql, pymysql.cursors.DictCursor) if not data: logger.error(f"staff[{staff_id}] not found") return {} profile = data[0] # 转换性别格式 gender_map = {0: "未知", 1: "男", 2: "女", None: "未知"} profile["agent_gender"] = gender_map[profile["agent_gender"]] return profile def get_staff_profile_v2(self, staff_id) -> Dict: sql = ( f"SELECT agent_name as name, agent_gender as gender, agent_age as age, agent_region as region, agent_profile " f"FROM {self.staff_table} WHERE third_party_user_id = '{staff_id}'" ) data = self.db.select(sql, pymysql.cursors.DictCursor) if not data: logger.error(f"staff[{staff_id}] not found") return {} profile = data[0] # 转换性别格式 gender_map = {0: "未知", 1: "男", 2: "女", None: "未知"} profile["gender"] = gender_map[profile["gender"]] # 合并JSON字段(新版本)数据 if profile["agent_profile"]: detail_profile = json.loads(profile["agent_profile"]) profile.update(detail_profile) # 去除原始字段 profile.pop("agent_profile", None) return profile def get_staff_profile_v3(self, staff_id) -> Dict: sql = ( f"SELECT agent_profile " f"FROM {self.staff_table} WHERE third_party_user_id = '{staff_id}'" ) data = self.db.select(sql) if not data: logger.error(f"staff[{staff_id}] not found") return {} profile_str = data[0][0] if not profile_str: return {} profile = json.loads(profile_str) return profile def save_staff_profile(self, staff_id: str, profile: Dict): # 正常情况下不应该有此操作 if not self.staff_table: raise Exception("staff_table is not set") if not staff_id: raise Exception("Invalid staff_id: {}".format(staff_id)) sql = f"UPDATE {self.staff_table} SET agent_profile = %s WHERE third_party_user_id = '{staff_id}'" self.db.execute(sql, (json.dumps(profile),)) def list_users(self, **kwargs) -> List[Dict]: user_union_id = kwargs.get("user_union_id", None) user_name = kwargs.get("user_name", None) if not user_union_id and not user_name: raise Exception("user_union_id or user_name is required") sql = f"SELECT third_party_user_id, wxid, name, iconurl, gender FROM {self.table_name} WHERE 1=1 " if user_name: sql += f"AND name = '{user_name}' COLLATE utf8mb4_bin " if user_union_id: sql += f"AND wxid = '{user_union_id}' " data = self.db.select(sql, pymysql.cursors.DictCursor) return data def get_staff_sessions( self, staff_id, page_id: int = 1, page_size: int = 10, session_type: str = "default", ) -> List[Dict]: """ :param page_size: :param page_id: :param session_type: :param staff_id: :return: """ match session_type: case "active": sql = f""" select staff_id, current_state, user_id from agent_state where staff_id = %s and update_timestamp >= DATE_SUB(NOW(), INTERVAL 2 HOUR) order by update_timestamp desc; """ case "human_intervention": sql = f""" select staff_id, current_state, user_id from agent_state where staff_id = %s and current_state = 5 order by update_timestamp desc; """ case _: sql = f""" select t1.staff_id, t1.current_state, t1.user_id, t2.name, t2.iconurl from agent_state t1 join third_party_user t2 on t1.user_id = t2.third_party_user_id where t1.staff_id = %s order by IF(t1.current_state = 5, 0, 1), t1.update_timestamp desc limit {page_size + 1} offset {page_size * (page_id - 1)}; """ staff_sessions = self.db.select( sql, cursor_type=pymysql.cursors.DictCursor, args=(staff_id,) ) return staff_sessions def get_staff_sessions_summary_v1( self, staff_id, page_id: int, page_size: int, status: int ) -> Dict: """ :param status: staff status(0: unemployed, 1: employed) :param staff_id: staff :param page_id: page id :param page_size: page size :return: :todo: 未使用 Mysql 连接池,每次查询均需要与 MySQL 建立连接,性能较低,需要优化 """ if not staff_id: get_staff_query = f""" select third_party_user_id, name from {self.staff_table} where status = %s limit %s offset %s; """ staff_id_list = self.db.select( sql=get_staff_query, cursor_type=pymysql.cursors.DictCursor, args=(status, page_size + 1, (page_id - 1) * page_size), ) if not staff_id_list: return {} if len(staff_id_list) > page_size: has_next_page = True next_page_id = page_id + 1 staff_id_list = staff_id_list[:page_size] else: has_next_page = False next_page_id = None else: get_staff_query = f""" select third_party_user_id, name from {self.staff_table} where status = %s and third_party_user_id = %s; """ staff_id_list = self.db.select( sql=get_staff_query, cursor_type=pymysql.cursors.DictCursor, args=(status, staff_id), ) if not staff_id_list: return {} has_next_page = False next_page_id = None response_data = [ { "staff_id": staff["third_party_user_id"], "staff_name": staff["name"], "active_sessions": len( self.get_staff_sessions( staff["third_party_user_id"], session_type="active" ) ), "human_intervention_sessions": len( self.get_staff_sessions( staff["third_party_user_id"], session_type="human_intervention" ) ), } for staff in staff_id_list ] return { "has_next_page": has_next_page, "next_page_id": next_page_id, "data": response_data, } def get_staff_session_list_v1(self, staff_id, page_id: int, page_size: int) -> Dict: """ :param page_size: :param page_id: :param staff_id: :return: """ session_list = self.get_staff_sessions(staff_id, page_id, page_size) if len(session_list) > page_size: has_next_page = True next_page_id = page_id + 1 session_list = session_list[:page_size] else: has_next_page = False next_page_id = None response_data = [] for session in session_list: temp_obj = {} user_id = session["user_id"] room_id = ":".join(["private", staff_id, user_id]) select_query = f"""select content, max(sendtime) as max_timestamp from qywx_chat_history where roomid = %s;""" last_message = self.db.select( sql=select_query, cursor_type=pymysql.cursors.DictCursor, args=(room_id,), ) if not last_message: temp_obj["message"] = "" temp_obj["timestamp"] = 0 else: temp_obj["message"] = last_message[0]["content"] temp_obj["timestamp"] = last_message[0]["max_timestamp"] temp_obj["customer_id"] = user_id temp_obj["customer_name"] = session["name"] temp_obj["avatar"] = session["iconurl"] response_data.append(temp_obj) return { "staff_id": staff_id, "has_next_page": has_next_page, "next_page_id": next_page_id, "data": response_data, } def get_staff_list(self, page_id: int, page_size: int) -> Dict: """ :param page_size: :param page_id: :return: """ sql = f""" select t1.third_party_user_id as staff_id, t1.name as staff_name, t2.iconurl as avatar from qywx_employee t1 left join third_party_user t2 on t1.third_party_user_id = t2.third_party_user_id limit %s offset %s; """ staff_list = self.db.select( sql=sql, cursor_type=pymysql.cursors.DictCursor, args=(page_size + 1, page_size * (page_id - 1)), ) if len(staff_list) > page_size: has_next_page = True next_page_id = page_id + 1 staff_list = staff_list[:page_size] else: has_next_page = False next_page_id = None return { "has_next_page": has_next_page, "next_page": next_page_id, "data": staff_list, } def get_conversation_list_v1( self, staff_id: str, customer_id: str, page: Optional[int] ): """ :param staff_id: :param customer_id: :param page: timestamp :return: """ room_id = ":".join(["private", staff_id, customer_id]) page_size = 20 if not page: fetch_query = f""" select t1.sender, t2.name, t1.sendtime, t1.content, t2.iconurl, t1.msg_type from qywx_chat_history t1 join third_party_user t2 on t1.sender = t2.third_party_user_id where roomid = %s order by sendtime desc limit %s; """ messages = self.db.select( sql=fetch_query, cursor_type=pymysql.cursors.DictCursor, args=(room_id, page_size + 1), ) else: fetch_query = f""" select t1.sender, t2.name, t1.sendtime, t1.content, t2.iconurl, t1.msg_type from qywx_chat_history t1 join third_party_user t2 on t1.sender = t2.third_party_user_id where t1.roomid = %s and t1.sendtime <= %s order by sendtime desc limit %s; """ messages = self.db.select( sql=fetch_query, cursor_type=pymysql.cursors.DictCursor, args=(room_id, page, page_size + 1), ) if messages: if len(messages) > page_size: has_next_page = True next_page = messages[-1]["sendtime"] else: has_next_page = False next_page = None response_data = [ { "sender_id": message["sender"], "sender_name": message["name"], "avatar": message["iconurl"], "content": message["content"], "timestamp": message["sendtime"], "role": "customer" if message["sender"] == customer_id else "staff", "message_type": message["msg_type"], } for message in messages ] return { "staff_id": staff_id, "customer_id": customer_id, "has_next_page": has_next_page, "next_page": next_page, "data": response_data, } else: has_next_page = False next_page = None return { "staff_id": staff_id, "customer_id": customer_id, "has_next_page": has_next_page, "next_page": next_page, "data": [], } class LocalUserRelationManager(UserRelationManager): def __init__(self): pass def list_staffs(self): return [ { "third_party_user_id": "1688855931724582", "name": "", "wxid": "ShengHuoLeQu", "agent_name": "小芳", } ] def list_users(self, staff_id: str, page: int = 1, page_size: int = 100): return [] def list_staff_users(self, staff_id: str = None, tag_id: int = None): user_ids = [ "7881299453089278", "7881299453132630", "7881299454186909", "7881299455103430", "7881299455173476", "7881299456216398", "7881299457990953", "7881299461167644", "7881299463002136", "7881299464081604", "7881299465121735", "7881299465998082", "7881299466221881", "7881299467152300", "7881299470051791", "7881299470112816", "7881299471149567", "7881299471168030", "7881299471277650", "7881299473321703", ] user_ids = user_ids[:5] return [ {"staff_id": "1688855931724582", "user_id": "7881299670930896"}, *[ {"staff_id": "1688855931724582", "user_id": user_id} for user_id in user_ids ], ] def get_user_tags(self, user_id: str): return [] def stop_user_daily_push(self, user_id: str) -> bool: return True class MySQLUserRelationManager(UserRelationManager): def __init__( self, agent_db_config, wecom_db_config, agent_staff_table, agent_user_table, staff_table, relation_table, user_table, ): # FIXME(zhoutian): 因为现在数据库表不统一,需要从两个库读取 self.agent_db = MySQLManager(agent_db_config) self.wecom_db = MySQLManager(wecom_db_config) self.agent_staff_table = agent_staff_table self.staff_table = staff_table self.relation_table = relation_table self.agent_user_table = agent_user_table self.user_table = user_table def list_staffs(self): sql = f"SELECT third_party_user_id, name, wxid, agent_name FROM {self.agent_staff_table} WHERE status = 1" data = self.agent_db.select(sql, pymysql.cursors.DictCursor) return data def list_users(self, staff_id: str, page: int = 1, page_size: int = 100): return [] def list_staff_users(self, staff_id: str = None, tag_id: int = None): sql = f"SELECT third_party_user_id, wxid FROM {self.agent_staff_table} WHERE status = 1" if staff_id: sql += f" AND third_party_user_id = '{staff_id}'" agent_staff_data = self.agent_db.select(sql, pymysql.cursors.DictCursor) if not agent_staff_data: return [] ret = [] for agent_staff in agent_staff_data: wxid = agent_staff["wxid"] sql = f"SELECT id FROM {self.staff_table} WHERE carrier_id = '{wxid}'" staff_data = self.wecom_db.select(sql, pymysql.cursors.DictCursor) if not staff_data: logger.error(f"staff[{wxid}] not found in wecom database") continue staff_id = staff_data[0]["id"] sql = f"SELECT user_id FROM {self.relation_table} WHERE staff_id = '{staff_id}' AND is_delete = 0" user_data = self.wecom_db.select(sql, pymysql.cursors.DictCursor) if not user_data: logger.warning(f"staff[{wxid}] has no user") continue user_ids = tuple(user["user_id"] for user in user_data) sql = f"SELECT union_id FROM {self.user_table} WHERE id IN {str(user_ids)} AND union_id is not null" if tag_id: sql += f" AND id in (SELECT distinct user_id FROM we_com_user_with_tag WHERE tag_id = {tag_id} and is_delete = 0)" user_data = self.wecom_db.select(sql, pymysql.cursors.DictCursor) if not user_data: logger.warning(f"staff[{wxid}] users not found in wecom database") continue user_union_ids = tuple(user["union_id"] for user in user_data) batch_size = 500 n_batches = (len(user_union_ids) + batch_size - 1) // batch_size agent_user_data = [] for i in range(n_batches): idx_begin = i * batch_size idx_end = min((i + 1) * batch_size, len(user_union_ids)) batch_union_ids = user_union_ids[idx_begin:idx_end] sql = f"SELECT third_party_user_id, wxid FROM {self.agent_user_table} WHERE wxid IN {str(batch_union_ids)}" batch_agent_user_data = self.agent_db.select( sql, pymysql.cursors.DictCursor ) if len(agent_user_data) != len(batch_union_ids): # logger.debug(f"staff[{wxid}] some users not found in agent database") pass agent_user_data.extend(batch_agent_user_data) staff_user_pairs = [ { "staff_id": agent_staff["third_party_user_id"], "user_id": agent_user["third_party_user_id"], } for agent_user in agent_user_data ] ret.extend(staff_user_pairs) return ret def get_user_union_id(self, user_id: str) -> Optional[str]: sql = f"SELECT wxid FROM {self.agent_user_table} WHERE third_party_user_id = '{user_id}' AND wxid is not null" user_data = self.agent_db.select(sql, pymysql.cursors.DictCursor) if not user_data: logger.error(f"user[{user_id}] has no union id") return None union_id = user_data[0]["wxid"] return union_id def get_user_tags(self, user_id: str) -> List[str]: union_id = self.get_user_union_id(user_id) if not union_id: return [] sql = f""" select b.tag_id, c.`tag_name` from `we_com_user` as a join `we_com_user_with_tag` as b join `we_com_tag` as c on a.`id` = b.`user_id` and b.`tag_id` = c.id where a.union_id = '{union_id}' """ tag_data = self.wecom_db.select(sql, pymysql.cursors.DictCursor) tag_names = [tag["tag_name"] for tag in tag_data] return tag_names def stop_user_daily_push(self, user_id: str) -> bool: try: union_id = self.get_user_union_id(user_id) if not union_id: return False sql = f"UPDATE {self.user_table} SET group_msg_disabled = 1 WHERE union_id = %s" rows = self.wecom_db.execute(sql, (union_id,)) if rows > 0: return True else: return False except Exception as e: logger.error(f"stop_user_daily_push failed: {e}") return False if __name__ == "__main__": config = configs.get() user_db_config = config["storage"]["user"] staff_db_config = config["storage"]["staff"] user_manager = MySQLUserManager( user_db_config["mysql"], user_db_config["table"], staff_db_config["table"] ) user_profile = user_manager.get_user_profile("7881301263964433") print(user_profile) wecom_db_config = config["storage"]["user_relation"] user_relation_manager = MySQLUserRelationManager( user_db_config["mysql"], wecom_db_config["mysql"], config["storage"]["staff"]["table"], user_db_config["table"], wecom_db_config["table"]["staff"], wecom_db_config["table"]["relation"], wecom_db_config["table"]["user"], ) # all_staff_users = user_relation_manager.list_staff_users() user_tags = user_relation_manager.get_user_tags("7881302078008656") print(user_tags)