123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290 |
- import abc
- import pymysql.cursors
- from typing import Dict, List, Optional
- from pqai_agent.database import MySQLManager
- class SessionManager(abc.ABC):
- @abc.abstractmethod
- def get_staff_sessions(
- self,
- staff_id: str,
- page_id: int = 1,
- page_size: int = 10,
- session_type: str = "default",
- ) -> List[Dict]:
- pass
- @abc.abstractmethod
- def get_staff_sessions_summary(
- self,
- staff_id: str,
- page_id: int,
- page_size: int,
- status: int,
- ) -> Dict:
- pass
- @abc.abstractmethod
- def get_staff_session_list(
- self, staff_id: str, page_id: int, page_size: int
- ) -> Dict:
- pass
- @abc.abstractmethod
- def get_conversation_list(
- self, staff_id: str, user_id: str, page: Optional[int], page_size: int
- ) -> Dict:
- pass
- class MySQLSessionManager(SessionManager):
- def __init__(self, db_config, staff_table, user_table, agent_state_table, chat_history_table):
- self.db = MySQLManager(db_config)
- self.staff_table = staff_table
- self.user_table = user_table
- self.agent_state_table = agent_state_table
- self.chat_history_table = chat_history_table
- def get_staff_sessions(
- self,
- staff_id,
- page_id: int = 1,
- page_size: int = 10,
- session_type: str = "default",
- ) -> List[Dict]:
- """
- :param page_size:
- :param page_id:
- :param session_type:
- :param staff_id:
- :return:
- """
- match session_type:
- case "active":
- sql = f"""
- select staff_id, current_state, user_id
- from {self.agent_state_table}
- where staff_id = %s and update_timestamp >= DATE_SUB(NOW(), INTERVAL 2 HOUR)
- order by update_timestamp desc;
- """
- case "human_intervention":
- sql = f"""
- select staff_id, current_state, user_id
- from {self.agent_state_table}
- where staff_id = %s and current_state = 5 order by update_timestamp desc;
- """
- case _:
- sql = f"""
- select t1.staff_id, t1.current_state, t1.user_id, t2.name, t2.iconurl
- from {self.agent_state_table} t1 join {self.user_table} t2
- on t1.user_id = t2.third_party_user_id
- where t1.staff_id = %s
- order by
- IF(t1.current_state = 5, 0, 1),
- t1.update_timestamp desc
- limit {page_size + 1} offset {page_size * (page_id - 1)};
- """
- staff_sessions = self.db.select(
- sql, cursor_type=pymysql.cursors.DictCursor, args=(staff_id,)
- )
- return staff_sessions
- def get_staff_sessions_summary(
- self, staff_id, page_id: int, page_size: int, status: int
- ) -> Dict:
- """
- :param status: staff status(0: unemployed, 1: employed)
- :param staff_id: staff
- :param page_id: page id
- :param page_size: page size
- :return:
- :todo: 未使用 Mysql 连接池,每次查询均需要与 MySQL 建立连接,性能较低,需要优化
- """
- if not staff_id:
- get_staff_query = f"""
- select third_party_user_id, name from {self.staff_table} where status = %s
- limit %s offset %s;
- """
- staff_id_list = self.db.select(
- sql=get_staff_query,
- cursor_type=pymysql.cursors.DictCursor,
- args=(status, page_size + 1, (page_id - 1) * page_size),
- )
- if not staff_id_list:
- return {}
- if len(staff_id_list) > page_size:
- has_next_page = True
- next_page_id = page_id + 1
- staff_id_list = staff_id_list[:page_size]
- else:
- has_next_page = False
- next_page_id = None
- else:
- get_staff_query = f"""
- select third_party_user_id, name from {self.staff_table}
- where status = %s and third_party_user_id = %s;
- """
- staff_id_list = self.db.select(
- sql=get_staff_query,
- cursor_type=pymysql.cursors.DictCursor,
- args=(status, staff_id),
- )
- if not staff_id_list:
- return {}
- has_next_page = False
- next_page_id = None
- response_data = [
- {
- "staff_id": staff["third_party_user_id"],
- "staff_name": staff["name"],
- "active_sessions": len(
- self.get_staff_sessions(
- staff["third_party_user_id"], session_type="active"
- )
- ),
- "human_intervention_sessions": len(
- self.get_staff_sessions(
- staff["third_party_user_id"], session_type="human_intervention"
- )
- ),
- }
- for staff in staff_id_list
- ]
- return {
- "has_next_page": has_next_page,
- "next_page_id": next_page_id,
- "data": response_data,
- }
- def get_staff_session_list(self, staff_id, page_id: int, page_size: int) -> Dict:
- """
- :param page_size:
- :param page_id:
- :param staff_id:
- :return:
- """
- session_list = self.get_staff_sessions(staff_id, page_id, page_size)
- if len(session_list) > page_size:
- has_next_page = True
- next_page_id = page_id + 1
- session_list = session_list[:page_size]
- else:
- has_next_page = False
- next_page_id = None
- response_data = []
- for session in session_list:
- temp_obj = {}
- user_id = session["user_id"]
- room_id = ":".join(["private", staff_id, user_id])
- select_query = f"""
- select content, sendtime as max_timestamp, msg_type
- from {self.chat_history_table}
- where roomid = %s
- order by sendtime desc limit %s;
- """
- last_message = self.db.select(
- sql=select_query,
- cursor_type=pymysql.cursors.DictCursor,
- args=(room_id, 1),
- )
- if not last_message:
- temp_obj["message"] = None
- temp_obj["timestamp"] = 0
- temp_obj["msg_type"] = None
- else:
- temp_obj["message"] = last_message[0]["content"]
- temp_obj["timestamp"] = last_message[0]["max_timestamp"]
- temp_obj["msg_type"] = last_message[0]["msg_type"]
- temp_obj["user_id"] = user_id
- temp_obj["user_name"] = session["name"]
- temp_obj["avatar"] = session["iconurl"]
- temp_obj["current_state"] = session["current_state"]
- response_data.append(temp_obj)
- return {
- "staff_id": staff_id,
- "has_next_page": has_next_page,
- "next_page_id": next_page_id,
- "data": response_data,
- }
- def get_conversation_list(
- self, staff_id: str, user_id: str, page: Optional[int], page_size: int
- ):
- """
- :param page_size:
- :param staff_id:
- :param user_id:
- :param page: timestamp
- :return:
- """
- room_id = ":".join(["private", staff_id, user_id])
- if not page:
- fetch_query = f"""
- select t1.sender, t2.name, t1.sendtime, t1.content, t2.iconurl, t1.msg_type
- from {self.chat_history_table} t1
- join {self.user_table} t2 on t1.sender = t2.third_party_user_id
- where roomid = %s
- order by sendtime desc
- limit %s;
- """
- messages = self.db.select(
- sql=fetch_query,
- cursor_type=pymysql.cursors.DictCursor,
- args=(room_id, page_size + 1),
- )
- else:
- fetch_query = f"""
- select t1.sender, t2.name, t1.sendtime, t1.content, t2.iconurl, t1.msg_type
- from {self.chat_history_table} t1
- join {self.user_table} t2 on t1.sender = t2.third_party_user_id
- where t1.roomid = %s and t1.sendtime <= %s
- order by sendtime desc
- limit %s;
- """
- messages = self.db.select(
- sql=fetch_query,
- cursor_type=pymysql.cursors.DictCursor,
- args=(room_id, page, page_size + 1),
- )
- if messages:
- if len(messages) > page_size:
- has_next_page = True
- next_page = messages[-1]["sendtime"]
- else:
- has_next_page = False
- next_page = None
- response_data = [
- {
- "sender_id": message["sender"],
- "sender_name": message["name"],
- "avatar": message["iconurl"],
- "content": message["content"],
- "timestamp": message["sendtime"],
- "msg_type": message["msg_type"],
- "role": "user" if message["sender"] == user_id else "staff",
- }
- for message in messages
- ]
- return {
- "staff_id": staff_id,
- "user_id": user_id,
- "has_next_page": has_next_page,
- "next_page": next_page,
- "data": response_data,
- }
- else:
- has_next_page = False
- next_page = None
- return {
- "staff_id": staff_id,
- "user_id": user_id,
- "has_next_page": has_next_page,
- "next_page": next_page,
- "data": [],
- }
|