# mysql_staff_manager.py (3.3 KB)

import abc
import json
from typing import Any, Dict, List

import pymysql.cursors

from pqai_agent.database import MySQLManager
  6. class StaffManager(abc.ABC):
  7. @abc.abstractmethod
  8. def list_all_staffs(self, page_id: int, page_size: int) -> List[Dict]:
  9. pass
  10. @abc.abstractmethod
  11. def get_staff_profile(self, staff_id) -> Dict:
  12. pass
  13. @abc.abstractmethod
  14. def save_staff_profile(self, staff_id: str, staff_profile: Dict):
  15. pass
  16. class MySQLStaffManager(StaffManager):
  17. def __init__(self, db_config, staff_table, user_table, config):
  18. self.db = MySQLManager(db_config)
  19. self.staff_table = staff_table
  20. self.user_table = user_table
  21. self.config = config
  22. def save_staff_profile(self, staff_id: str, staff_profile: Dict):
  23. update_query = f"""
  24. update {self.staff_table} set agent_profile = %s where third_party_user_id = %s;
  25. """
  26. affected_rows = self.db.execute(
  27. update_query, (json.dumps(staff_profile), staff_id)
  28. )
  29. return affected_rows
  30. def get_staff_profile(self, staff_id) -> Dict:
  31. profile_obj = {"staff_id": staff_id}
  32. sql = f"""select agent_profile from {self.staff_table} where third_party_user_id = %s;"""
  33. response = self.db.select(
  34. sql=sql,
  35. cursor_type=pymysql.cursors.DictCursor,
  36. args=(staff_id,),
  37. )
  38. if not response:
  39. profile_obj["data"] = []
  40. return profile_obj
  41. agent_profile = response[0]["agent_profile"]
  42. if not agent_profile:
  43. profile_obj["data"] = []
  44. else:
  45. field_map_list = self.list_profile_fields()
  46. field_map = {
  47. item["field_name"]: item["display_name"] for item in field_map_list
  48. }
  49. agent_profile = json.loads(agent_profile)
  50. profile_obj["data"] = [
  51. {
  52. "field_name": key,
  53. "display_name": field_map[key],
  54. "field_value": value,
  55. }
  56. for key, value in agent_profile.items()
  57. if agent_profile.get(key)
  58. ]
  59. return profile_obj
  60. def list_all_staffs(self, page_id: int, page_size: int) -> Dict:
  61. """
  62. :param page_size:
  63. :param page_id:
  64. :return:
  65. """
  66. sql = f"""
  67. select t1.third_party_user_id as staff_id, t1.name as staff_name, t2.iconurl as avatar
  68. from {self.staff_table} t1 left join {self.user_table} t2
  69. on t1.third_party_user_id = t2.third_party_user_id
  70. limit %s offset %s;
  71. """
  72. staff_list = self.db.select(
  73. sql=sql,
  74. cursor_type=pymysql.cursors.DictCursor,
  75. args=(page_size + 1, page_size * (page_id - 1)),
  76. )
  77. if len(staff_list) > page_size:
  78. has_next_page = True
  79. next_page_id = page_id + 1
  80. staff_list = staff_list[:page_size]
  81. else:
  82. has_next_page = False
  83. next_page_id = None
  84. return {
  85. "has_next_page": has_next_page,
  86. "next_page": next_page_id,
  87. "data": staff_list,
  88. }
  89. def list_profile_fields(self) -> List[Dict]:
  90. response = self.config.get_json_value("field_map_list", [])
  91. return response