mysql_staff_manager.py 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. import abc
  2. import json
  3. import pymysql.cursors
  4. from typing import Dict, List
  5. from pqai_agent.database import MySQLManager
  6. class StaffManager(abc.ABC):
  7. @abc.abstractmethod
  8. def list_all_staffs(self, page_id: int, page_size: int) -> List[Dict]:
  9. pass
  10. @abc.abstractmethod
  11. def get_staff_profile(self, staff_id) -> Dict:
  12. pass
  13. @abc.abstractmethod
  14. def save_staff_profile(self, staff_id: str, staff_profile: Dict):
  15. pass
  16. class MySQLStaffManager(StaffManager):
  17. def __init__(self, db_config, staff_table, user_table):
  18. self.db = MySQLManager(db_config)
  19. self.staff_table = staff_table
  20. self.user_table = user_table
  21. def save_staff_profile(self, staff_id: str, staff_profile: Dict):
  22. update_query = f"""
  23. update {self.staff_table} set agent_profile = %s where third_party_user_id = %s;
  24. """
  25. affected_rows = self.db.execute(
  26. update_query, (json.dumps(staff_profile), staff_id)
  27. )
  28. return affected_rows
  29. def get_staff_profile(self, staff_id) -> Dict:
  30. empty_profile = {}
  31. sql = f"""select agent_profile from {self.staff_table} where third_party_user_id = %s;"""
  32. response = self.db.select(
  33. sql=sql,
  34. cursor_type=pymysql.cursors.DictCursor,
  35. args=(staff_id,),
  36. )
  37. if not response:
  38. return empty_profile
  39. agent_profile = response[0]["agent_profile"]
  40. profile = json.loads(agent_profile)
  41. if not profile:
  42. return empty_profile
  43. return profile
  44. def list_all_staffs(self, page_id: int, page_size: int) -> Dict:
  45. """
  46. :param page_size:
  47. :param page_id:
  48. :return:
  49. """
  50. sql = f"""
  51. select t1.third_party_user_id as staff_id, t1.name as staff_name, t2.iconurl as avatar
  52. from {self.staff_table} t1 left join {self.user_table} t2
  53. on t1.third_party_user_id = t2.third_party_user_id
  54. limit %s offset %s;
  55. """
  56. staff_list = self.db.select(
  57. sql=sql,
  58. cursor_type=pymysql.cursors.DictCursor,
  59. args=(page_size + 1, page_size * (page_id - 1)),
  60. )
  61. if len(staff_list) > page_size:
  62. has_next_page = True
  63. next_page_id = page_id + 1
  64. staff_list = staff_list[:page_size]
  65. else:
  66. has_next_page = False
  67. next_page_id = None
  68. return {
  69. "has_next_page": has_next_page,
  70. "next_page": next_page_id,
  71. "data": staff_list,
  72. }