123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105 |
- import abc
- import json
- import pymysql.cursors
- from typing import Dict, List
- from pqai_agent.database import MySQLManager
class StaffManager(abc.ABC):
    """Abstract interface for listing staff members and reading/writing their profiles."""

    @abc.abstractmethod
    def list_all_staffs(self, page_id: int, page_size: int) -> Dict:
        """Return one page of staff records.

        The implementation in this file returns a dict with the keys
        ``has_next_page``, ``next_page`` and ``data`` (the list of staff
        rows), so the return annotation is ``Dict`` rather than a bare list.

        :param page_id: page number to fetch.
        :param page_size: number of records per page.
        :return: a dict describing the requested page.
        """

    @abc.abstractmethod
    def get_staff_profile(self, staff_id: str) -> Dict:
        """Return the profile stored for ``staff_id``.

        :param staff_id: identifier of the staff member.
        :return: a dict describing the staff member's profile.
        """

    @abc.abstractmethod
    def save_staff_profile(self, staff_id: str, staff_profile: Dict):
        """Persist ``staff_profile`` as the profile of ``staff_id``.

        :param staff_id: identifier of the staff member.
        :param staff_profile: profile data to store.
        """
class MySQLStaffManager(StaffManager):
    """StaffManager that stores staff profiles in MySQL through ``MySQLManager``.

    Profiles are kept as JSON text in the ``agent_profile`` column of the
    staff table and are looked up by ``third_party_user_id``.
    """

    def __init__(self, db_config, staff_table, user_table, config):
        """Create the database manager and remember table names and config.

        :param db_config: passed unchanged to ``MySQLManager``.
        :param staff_table: staff table name. It is interpolated directly
            into SQL text, so it must come from trusted configuration.
        :param user_table: user table name (joined for the avatar column);
            interpolated into SQL text like ``staff_table``.
        :param config: object providing ``get_json_value(key, default)``,
            used to read the profile field definitions.
        """
        self.db = MySQLManager(db_config)
        self.staff_table = staff_table
        self.user_table = user_table
        self.config = config

    def save_staff_profile(self, staff_id: str, staff_profile: Dict):
        """Serialize ``staff_profile`` to JSON and write it to ``agent_profile``.

        :param staff_id: value matched against ``third_party_user_id``.
        :param staff_profile: JSON-serializable profile data.
        :return: the result of ``self.db.execute`` (presumably the number
            of affected rows -- confirm against ``MySQLManager``).
        """
        update_query = f"""
            update {self.staff_table} set agent_profile = %s where third_party_user_id = %s;
        """
        affected_rows = self.db.execute(
            update_query, (json.dumps(staff_profile), staff_id)
        )
        return affected_rows

    def get_staff_profile(self, staff_id) -> Dict:
        """Return the stored profile of ``staff_id`` as a list of display fields.

        :param staff_id: value matched against ``third_party_user_id``.
        :return: ``{"staff_id": staff_id, "data": [...]}``. ``data`` is empty
            when no row is found or the row has no profile; otherwise it holds
            one ``{"field_name", "display_name", "field_value"}`` dict per
            profile entry with a truthy value.
        """
        profile_obj = {"staff_id": staff_id, "data": []}
        sql = f"""select agent_profile from {self.staff_table} where third_party_user_id = %s;"""
        response = self.db.select(
            sql=sql,
            cursor_type=pymysql.cursors.DictCursor,
            args=(staff_id,),
        )
        if not response:
            return profile_obj

        raw_profile = response[0]["agent_profile"]
        if not raw_profile:
            return profile_obj

        field_map = {
            item["field_name"]: item["display_name"]
            for item in self.list_profile_fields()
        }
        agent_profile = json.loads(raw_profile)
        profile_obj["data"] = [
            {
                "field_name": key,
                # A stored key may be missing from the configured field map
                # (e.g. the map changed after the profile was saved); fall
                # back to the raw field name instead of raising KeyError.
                "display_name": field_map.get(key, key),
                "field_value": value,
            }
            for key, value in agent_profile.items()
            # Entries with falsy values (empty string, None, 0, ...) are omitted.
            if value
        ]
        return profile_obj

    def list_all_staffs(self, page_id: int, page_size: int) -> Dict:
        """Return one page of staff rows together with pagination info.

        :param page_id: 1-based page number.
        :param page_size: maximum number of rows in the page; must be >= 1.
        :return: a dict with ``has_next_page`` (bool), ``next_page``
            (``page_id + 1`` or ``None``) and ``data`` (rows containing
            ``staff_id``, ``staff_name`` and ``avatar``).
        :raises ValueError: if ``page_id`` or ``page_size`` is smaller than 1,
            which would otherwise produce a negative OFFSET or an empty page
            that always reports a next page.
        """
        if page_id < 1 or page_size < 1:
            raise ValueError(
                f"page_id and page_size must be >= 1, got page_id={page_id}, page_size={page_size}"
            )

        # LIMIT/OFFSET without ORDER BY has no guaranteed row order, so
        # consecutive pages could overlap or skip rows; order by the staff id
        # to keep pagination stable.
        sql = f"""
            select t1.third_party_user_id as staff_id, t1.name as staff_name, t2.iconurl as avatar
            from {self.staff_table} t1 left join {self.user_table} t2
            on t1.third_party_user_id = t2.third_party_user_id
            order by t1.third_party_user_id
            limit %s offset %s;
        """
        # Fetch one extra row: its presence tells us whether a next page exists.
        staff_list = self.db.select(
            sql=sql,
            cursor_type=pymysql.cursors.DictCursor,
            args=(page_size + 1, page_size * (page_id - 1)),
        )
        has_next_page = len(staff_list) > page_size
        return {
            "has_next_page": has_next_page,
            "next_page": page_id + 1 if has_next_page else None,
            "data": staff_list[:page_size],
        }

    def list_profile_fields(self) -> List[Dict]:
        """Return the profile field definitions stored under ``field_map_list``.

        ``get_staff_profile`` reads ``field_name`` and ``display_name`` from
        each returned item. An empty list is passed to ``get_json_value`` as
        the default.
        """
        return self.config.get_json_value("field_map_list", [])
|