luojunhui 1 month ago
parent
commit
25086dc26f

+ 2 - 0
pqai_agent/configs/dev.yaml

@@ -32,6 +32,8 @@ storage:
       password: wqsd@2025
       database: ai_agent
     table: agent_state
+  chat_history:
+    table: qywx_chat_history
 
 agent_behavior:
   message_aggregation_sec: 3

+ 2 - 0
pqai_agent/configs/prod.yaml

@@ -32,6 +32,8 @@ storage:
       password: wqsd@2025
       database: ai_agent
     table: agent_state
+  chat_history:
+    table: qywx_chat_history
 
 chat_api:
   coze:

+ 0 - 211
pqai_agent/user_manager.py

@@ -264,144 +264,6 @@ class MySQLUserManager(UserManager):
         data = self.db.select(sql, pymysql.cursors.DictCursor)
         return data
 
-    def get_staff_sessions(self, staff_id, page_id: int = 1, page_size: int = 10, session_type: str = 'default') -> List[Dict]:
-        """
-        :param page_size:
-        :param page_id:
-        :param session_type:
-        :param staff_id:
-        :return:
-        """
-        match session_type:
-            case 'active':
-                sql = f"""
-                    select staff_id, current_state, user_id
-                    from agent_state
-                    where staff_id = %s and update_timestamp >= DATE_SUB(NOW(), INTERVAL 2 HOUR)
-                    order by update_timestamp desc;
-                """
-            case 'human_intervention':
-                sql = f"""
-                    select staff_id, current_state, user_id
-                    from agent_state
-                    where staff_id = %s and current_state = 5 order by update_timestamp desc;
-                """
-            case _:
-                sql = f"""
-                    select t1.staff_id, t1.current_state, t1.user_id, t2.name, t2.iconurl
-                    from agent_state t1 join third_party_user t2 
-                        on t1.user_id = t2.third_party_user_id
-                    where t1.staff_id = %s
-                    order by 
-                        IF(t1.current_state = 5, 0, 1),
-                        t1.update_timestamp desc
-                    limit {page_size + 1} offset {page_size * (page_id - 1)};
-                """
-
-        staff_sessions = self.db.select(sql, cursor_type=pymysql.cursors.DictCursor, args=(staff_id, ))
-        return staff_sessions
-
-    def get_staff_sessions_summary_v1(self, staff_id, page_id: int, page_size: int, status: int) -> Dict:
-        """
-        :param status: staff status(0: unemployed, 1: employed)
-        :param staff_id: staff
-        :param page_id: page id
-        :param page_size: page size
-        :return:
-        :todo: 未使用 Mysql 连接池,每次查询均需要与 MySQL 建立连接,性能较低,需要优化
-        """
-        if not staff_id:
-            get_staff_query = f"""
-                select third_party_user_id, name from {self.staff_table} where status = %s
-                limit %s offset %s;
-            """
-            staff_id_list = self.db.select(
-                sql=get_staff_query,
-                cursor_type=pymysql.cursors.DictCursor,
-                args=(status, page_size + 1, (page_id - 1) * page_size)
-            )
-            if not staff_id_list:
-                return {}
-
-            if len(staff_id_list) > page_size:
-                has_next_page = True
-                next_page_id = page_id + 1
-                staff_id_list = staff_id_list[:page_size]
-            else:
-                has_next_page = False
-                next_page_id = None
-        else:
-            get_staff_query = f"""
-                select third_party_user_id, name from {self.staff_table} 
-                where status = %s and third_party_user_id = %s;
-            """
-            staff_id_list = self.db.select(
-                sql=get_staff_query,
-                cursor_type=pymysql.cursors.DictCursor,
-                args=(status, staff_id)
-            )
-            if not staff_id_list:
-                return {}
-            has_next_page = False
-            next_page_id = None
-        response_data = [
-            {
-                'staff_id': staff['third_party_user_id'],
-                'staff_name': staff['name'],
-                'active_sessions': len(self.get_staff_sessions(staff['third_party_user_id'], session_type='active')),
-                'human_intervention_sessions': len(self.get_staff_sessions(staff['third_party_user_id'], session_type='human_intervention'))
-            }
-            for staff in staff_id_list
-        ]
-        return {
-            'has_next_page': has_next_page,
-            'next_page_id': next_page_id,
-            'data': response_data
-        }
-
-    def get_staff_session_list_v1(self, staff_id, page_id: int, page_size: int) -> Dict:
-        """
-        :param page_size:
-        :param page_id:
-        :param staff_id:
-        :return:
-        """
-        session_list = self.get_staff_sessions(staff_id, page_id, page_size)
-        if len(session_list) > page_size:
-            has_next_page = True
-            next_page_id = page_id + 1
-            session_list = session_list[:page_size]
-        else:
-            has_next_page = False
-            next_page_id = None
-        response_data = []
-        for session in session_list:
-            temp_obj = {}
-            user_id = session['user_id']
-            room_id = ':'.join(['private', staff_id, user_id])
-            select_query = f"""select content, max(sendtime) as max_timestamp from qywx_chat_history where roomid = %s;"""
-            last_message = self.db.select(
-                sql=select_query,
-                cursor_type=pymysql.cursors.DictCursor,
-                args=(room_id,)
-            )
-            if not last_message:
-                temp_obj['message'] = ''
-                temp_obj['timestamp'] = 0
-            else:
-                temp_obj['message'] = last_message[0]['content']
-                temp_obj['timestamp'] = last_message[0]['max_timestamp']
-            temp_obj['customer_id'] = user_id
-            temp_obj['customer_name'] = session['name']
-            temp_obj['avatar'] = session['iconurl']
-            response_data.append(temp_obj)
-        return {
-            "staff_id": staff_id,
-            "has_next_page": has_next_page,
-            "next_page_id": next_page_id,
-            "data": response_data
-        }
-
     def get_staff_list(self, page_id: int, page_size: int) -> Dict:
         """
         :param page_size:
@@ -432,79 +294,6 @@ class MySQLUserManager(UserManager):
             "data": staff_list
         }
 
-    def get_conversation_list_v1(self, staff_id: str, customer_id: str, page: Optional[int]):
-        """
-        :param staff_id:
-        :param customer_id:
-        :param page: timestamp
-        :return:
-        """
-        room_id = ':'.join(['private', staff_id, customer_id])
-        page_size = 20
-        if not page:
-            fetch_query = f"""
-                select t1.sender, t2.name, t1.sendtime, t1.content, t2.iconurl
-                from qywx_chat_history t1
-                join third_party_user t2 on t1.sender = t2.third_party_user_id
-                where roomid = %s
-                order by sendtime desc
-                limit %s;
-            """
-            messages = self.db.select(
-                sql=fetch_query,
-                cursor_type=pymysql.cursors.DictCursor,
-                args=(room_id, page_size + 1)
-            )
-        else:
-            fetch_query = f"""
-                select t1.sender, t2.name, t1.sendtime, t1.content, t2.iconurl
-                from qywx_chat_history t1
-                join third_party_user t2 on t1.sender = t2.third_party_user_id
-                where t1.roomid = %s and t1.sendtime <= %s
-                order by sendtime desc
-                limit %s;
-            """
-            messages = self.db.select(
-                sql=fetch_query,
-                cursor_type=pymysql.cursors.DictCursor,
-                args=(room_id, page, page_size + 1)
-            )
-        if messages:
-            if len(messages) > page_size:
-                has_next_page = True
-                next_page = messages[-1]['sendtime']
-            else:
-                has_next_page = False
-                next_page = None
-            response_data = [
-                {
-                    "sender_id": message['sender'],
-                    "sender_name": message['name'],
-                    "avatar": message['iconurl'],
-                    "content": message['content'],
-                    "timestamp": message['sendtime'],
-                    "role": "customer" if message['sender'] == customer_id else "staff"
-                }
-                for message in messages
-            ]
-            return {
-                "staff_id": staff_id,
-                "customer_id": customer_id,
-                "has_next_page": has_next_page,
-                "next_page": next_page,
-                "data": response_data
-            }
-        else:
-            has_next_page = False
-            next_page = None
-            return {
-                "staff_id": staff_id,
-                "customer_id": customer_id,
-                "has_next_page": has_next_page,
-                "next_page": next_page,
-                "data": []
-            }
-
 
 class LocalUserRelationManager(UserRelationManager):
     def __init__(self):

+ 46 - 12
pqai_agent_server/api_server.py

@@ -12,7 +12,9 @@ from pqai_agent import configs
 from pqai_agent import logging_service, chat_service, prompt_templates
 from pqai_agent.history_dialogue_service import HistoryDialogueService
 from pqai_agent.user_manager import MySQLUserManager, MySQLUserRelationManager
-from pqai_agent_server.utils import wrap_response
+from pqai_agent_server.const import AgentApiConst
+from pqai_agent_server.models import MySQLSessionManager
+from pqai_agent_server.utils import wrap_response, quit_human_intervention_status
 from pqai_agent_server.utils import (
     run_extractor_prompt,
     run_chat_prompt,
@@ -21,7 +23,7 @@ from pqai_agent_server.utils import (
 
 app = Flask('agent_api_server')
 logger = logging_service.logger
-
+const = AgentApiConst()
 
 @app.route('/api/listStaffs', methods=['GET'])
 def list_staffs():
@@ -178,9 +180,9 @@ def health_check():
 @app.route("/api/getStaffSessionSummary", methods=["GET"])
 def get_staff_session_summary():
     staff_id = request.args.get("staff_id")
-    status = request.args.get("status", 1)
-    page_id = request.args.get("page_id", 1)
-    page_size = request.args.get("page_size", 10)
+    status = request.args.get("status", const.DEFAULT_STAFF_STATUS)
+    page_id = request.args.get("page_id", const.DEFAULT_PAGE_ID)
+    page_size = request.args.get("page_size", const.DEFAULT_PAGE_SIZE)
 
     # check params
     try:
@@ -190,7 +192,7 @@ def get_staff_session_summary():
     except Exception as e:
         return wrap_response(404, msg="Invalid parameter: {}".format(e))
 
-    staff_session_summary = app.user_manager.get_staff_sessions_summary_v1(
+    staff_session_summary = app.session_manager.get_staff_sessions_summary(
         staff_id, page_id, page_size, status
     )
 
@@ -206,9 +208,9 @@ def get_staff_session_list():
     if not staff_id:
         return wrap_response(404, msg="staff_id is required")
 
-    page_size = request.args.get("page_size", 10)
-    page_id = request.args.get("page_id", 1)
-    staff_session_list = app.user_manager.get_staff_session_list_v1(staff_id, page_id, page_size)
+    page_size = request.args.get("page_size", const.DEFAULT_PAGE_SIZE)
+    page_id = request.args.get("page_id", const.DEFAULT_PAGE_ID)
+    staff_session_list = app.session_manager.get_staff_session_list(staff_id, page_id, page_size)
     if not staff_session_list:
         return wrap_response(404, msg="staff not found")
 
@@ -217,8 +219,8 @@ def get_staff_session_list():
 
 @app.route("/api/getStaffList", methods=["GET"])
 def get_staff_list():
-    page_size = request.args.get("page_size", 10)
-    page_id = request.args.get("page_id", 1)
+    page_size = request.args.get("page_size", const.DEFAULT_PAGE_SIZE)
+    page_id = request.args.get("page_id", const.DEFAULT_PAGE_ID)
     staff_list = app.user_manager.get_staff_list(page_id, page_size)
     if not staff_list:
         return wrap_response(404, msg="staff not found")
@@ -237,7 +239,7 @@ def get_conversation_list():
         return wrap_response(404, msg="staff_id and customer_id are required")
 
     page = request.args.get("page")
-    response = app.user_manager.get_conversation_list_v1(staff_id, customer_id, page)
+    response = app.session_manager.get_conversation_list(staff_id, customer_id, page, const.DEFAULT_CONVERSATION_SIZE)
     return wrap_response(200, data=response)
 
 
@@ -246,6 +248,23 @@ def send_message():
     return wrap_response(200, msg="暂不实现功能")
 
 
+@app.route("/api/quitHumanInterventionStatus", methods=["GET"])
+def quit_human_interventions_status():
+    """
+    退出人工介入状态
+    :return:
+    """
+    staff_id = request.args.get("staff_id")
+    customer_id = request.args.get("customer_id")
+    # 测试环境: staff_id 强制等于1688854492669990
+    staff_id = 1688854492669990
+    if not customer_id or not staff_id:
+        return wrap_response(404, msg="user_id and staff_id are required")
+    response = quit_human_intervention_status(customer_id, staff_id)
+
+    return wrap_response(200, data=response)
+
+
 @app.errorhandler(werkzeug.exceptions.BadRequest)
 def handle_bad_request(e):
     logger.error(e)
@@ -264,11 +283,26 @@ if __name__ == '__main__':
     logging_level = logging.getLevelName(args.log_level)
     logging_service.setup_root_logger(level=logging_level, logfile_name='agent_api_server.log')
 
+    # set config
     user_db_config = config['storage']['user']
     staff_db_config = config['storage']['staff']
+    agent_state_db_config = config['storage']['agent_state']
+    chat_history_db_config = config['storage']['chat_history']
+
+    # init user manager
     user_manager = MySQLUserManager(user_db_config['mysql'], user_db_config['table'], staff_db_config['table'])
     app.user_manager = user_manager
 
+    # init session manager
+    session_manager = MySQLSessionManager(
+        db_config=user_db_config['mysql'],
+        staff_table=staff_db_config['table'],
+        user_table=user_db_config['table'],
+        agent_state_table=agent_state_db_config['table'],
+        chat_history_table=chat_history_db_config['table']
+    )
+    app.session_manager = session_manager
+
     wecom_db_config = config['storage']['user_relation']
     user_relation_manager = MySQLUserRelationManager(
         user_db_config['mysql'], wecom_db_config['mysql'],

+ 1 - 0
pqai_agent_server/models/__init__.py

@@ -0,0 +1 @@
+from .mysql_session_manager import MySQLSessionManager

+ 281 - 0
pqai_agent_server/models/mysql_session_manager.py

@@ -0,0 +1,281 @@
+import abc
+import pymysql.cursors
+
+from typing import Dict, List, Optional
+
+from pqai_agent.database import MySQLManager
+
+
+class SessionManager(abc.ABC):
+    @abc.abstractmethod
+    def get_staff_sessions(
+        self,
+        staff_id: str,
+        page_id: int = 1,
+        page_size: int = 10,
+        session_type: str = "default",
+    ) -> List[Dict]:
+        pass
+
+    @abc.abstractmethod
+    def get_staff_sessions_summary(
+        self,
+        staff_id: str,
+        page_id: int,
+        page_size: int,
+        status: int,
+    ) -> Dict:
+        pass
+
+    @abc.abstractmethod
+    def get_staff_session_list(
+        self, staff_id: str, page_id: int, page_size: int
+    ) -> Dict:
+        pass
+
+    @abc.abstractmethod
+    def get_conversation_list(
+        self, staff_id: str, customer_id: str, page: Optional[int], page_size: int
+    ) -> Dict:
+        pass
+
+
+class MySQLSessionManager(SessionManager):
+
+    def __init__(self, db_config, staff_table, user_table, agent_state_table, chat_history_table):
+        self.db = MySQLManager(db_config)
+        self.staff_table = staff_table
+        self.user_table = user_table
+        self.agent_state_table = agent_state_table
+        self.chat_history_table = chat_history_table
+
+    def get_staff_sessions(
+        self,
+        staff_id,
+        page_id: int = 1,
+        page_size: int = 10,
+        session_type: str = "default",
+    ) -> List[Dict]:
+        """
+        :param page_size:
+        :param page_id:
+        :param session_type:
+        :param staff_id:
+        :return:
+        """
+        match session_type:
+            case "active":
+                sql = f"""
+                    select staff_id, current_state, user_id
+                    from {self.agent_state_table}
+                    where staff_id = %s and update_timestamp >= DATE_SUB(NOW(), INTERVAL 2 HOUR)
+                    order by update_timestamp desc;
+                """
+            case "human_intervention":
+                sql = f"""
+                    select staff_id, current_state, user_id
+                    from {self.agent_state_table}
+                    where staff_id = %s and current_state = 5 order by update_timestamp desc;
+                """
+            case _:
+                sql = f"""
+                    select t1.staff_id, t1.current_state, t1.user_id, t2.name, t2.iconurl
+                    from {self.agent_state_table} t1 join {self.user_table} t2 
+                        on t1.user_id = t2.third_party_user_id
+                    where t1.staff_id = %s
+                    order by 
+                        IF(t1.current_state = 5, 0, 1),
+                        t1.update_timestamp desc
+                    limit {page_size + 1} offset {page_size * (page_id - 1)};
+                """
+
+        staff_sessions = self.db.select(
+            sql, cursor_type=pymysql.cursors.DictCursor, args=(staff_id,)
+        )
+        return staff_sessions
+
+    def get_staff_sessions_summary(
+        self, staff_id, page_id: int, page_size: int, status: int
+    ) -> Dict:
+        """
+        :param status: staff status(0: unemployed, 1: employed)
+        :param staff_id: staff
+        :param page_id: page id
+        :param page_size: page size
+        :return:
+        :todo: 未使用 Mysql 连接池,每次查询均需要与 MySQL 建立连接,性能较低,需要优化
+        """
+        if not staff_id:
+            get_staff_query = f"""
+                select third_party_user_id, name from {self.staff_table} where status = %s
+                limit %s offset %s;
+            """
+            staff_id_list = self.db.select(
+                sql=get_staff_query,
+                cursor_type=pymysql.cursors.DictCursor,
+                args=(status, page_size + 1, (page_id - 1) * page_size),
+            )
+            if not staff_id_list:
+                return {}
+
+            if len(staff_id_list) > page_size:
+                has_next_page = True
+                next_page_id = page_id + 1
+                staff_id_list = staff_id_list[:page_size]
+            else:
+                has_next_page = False
+                next_page_id = None
+        else:
+            get_staff_query = f"""
+                select third_party_user_id, name from {self.staff_table} 
+                where status = %s and third_party_user_id = %s;
+            """
+            staff_id_list = self.db.select(
+                sql=get_staff_query,
+                cursor_type=pymysql.cursors.DictCursor,
+                args=(status, staff_id),
+            )
+            if not staff_id_list:
+                return {}
+            has_next_page = False
+            next_page_id = None
+        response_data = [
+            {
+                "staff_id": staff["third_party_user_id"],
+                "staff_name": staff["name"],
+                "active_sessions": len(
+                    self.get_staff_sessions(
+                        staff["third_party_user_id"], session_type="active"
+                    )
+                ),
+                "human_intervention_sessions": len(
+                    self.get_staff_sessions(
+                        staff["third_party_user_id"], session_type="human_intervention"
+                    )
+                ),
+            }
+            for staff in staff_id_list
+        ]
+        return {
+            "has_next_page": has_next_page,
+            "next_page_id": next_page_id,
+            "data": response_data,
+        }
+
+    def get_staff_session_list(self, staff_id, page_id: int, page_size: int) -> Dict:
+        """
+        :param page_size:
+        :param page_id:
+        :param staff_id:
+        :return:
+        """
+        session_list = self.get_staff_sessions(staff_id, page_id, page_size)
+        if len(session_list) > page_size:
+            has_next_page = True
+            next_page_id = page_id + 1
+            session_list = session_list[:page_size]
+        else:
+            has_next_page = False
+            next_page_id = None
+        response_data = []
+        for session in session_list:
+            temp_obj = {}
+            user_id = session["user_id"]
+            room_id = ":".join(["private", staff_id, user_id])
+            select_query = f"""select content, max(sendtime) as max_timestamp from {self.chat_history_table} where roomid = %s;"""
+            last_message = self.db.select(
+                sql=select_query,
+                cursor_type=pymysql.cursors.DictCursor,
+                args=(room_id,),
+            )
+            if not last_message:
+                temp_obj["message"] = ""
+                temp_obj["timestamp"] = 0
+            else:
+                temp_obj["message"] = last_message[0]["content"]
+                temp_obj["timestamp"] = last_message[0]["max_timestamp"]
+            temp_obj["customer_id"] = user_id
+            temp_obj["customer_name"] = session["name"]
+            temp_obj["avatar"] = session["iconurl"]
+            response_data.append(temp_obj)
+        return {
+            "staff_id": staff_id,
+            "has_next_page": has_next_page,
+            "next_page_id": next_page_id,
+            "data": response_data,
+        }
+
+    def get_conversation_list(
+        self, staff_id: str, customer_id: str, page: Optional[int], page_size: int
+    ):
+        """
+        :param page_size:
+        :param staff_id:
+        :param customer_id:
+        :param page: timestamp
+        :return:
+        """
+        room_id = ":".join(["private", staff_id, customer_id])
+        if not page:
+            fetch_query = f"""
+                select t1.sender, t2.name, t1.sendtime, t1.content, t2.iconurl
+                from {self.chat_history_table} t1
+                join {self.user_table} t2 on t1.sender = t2.third_party_user_id
+                where roomid = %s
+                order by sendtime desc
+                limit %s;
+            """
+            messages = self.db.select(
+                sql=fetch_query,
+                cursor_type=pymysql.cursors.DictCursor,
+                args=(room_id, page_size + 1),
+            )
+        else:
+            fetch_query = f"""
+                select t1.sender, t2.name, t1.sendtime, t1.content, t2.iconurl
+                from {self.chat_history_table} t1
+                join {self.user_table} t2 on t1.sender = t2.third_party_user_id
+                where t1.roomid = %s and t1.sendtime <= %s
+                order by sendtime desc
+                limit %s;
+            """
+            messages = self.db.select(
+                sql=fetch_query,
+                cursor_type=pymysql.cursors.DictCursor,
+                args=(room_id, page, page_size + 1),
+            )
+        if messages:
+            if len(messages) > page_size:
+                has_next_page = True
+                next_page = messages[-1]["sendtime"]
+            else:
+                has_next_page = False
+                next_page = None
+            response_data = [
+                {
+                    "sender_id": message["sender"],
+                    "sender_name": message["name"],
+                    "avatar": message["iconurl"],
+                    "content": message["content"],
+                    "timestamp": message["sendtime"],
+                    "role": "customer" if message["sender"] == customer_id else "staff",
+                }
+                for message in messages
+            ]
+            return {
+                "staff_id": staff_id,
+                "customer_id": customer_id,
+                "has_next_page": has_next_page,
+                "next_page": next_page,
+                "data": response_data,
+            }
+        else:
+            has_next_page = False
+            next_page = None
+            return {
+                "staff_id": staff_id,
+                "customer_id": customer_id,
+                "has_next_page": has_next_page,
+                "next_page": next_page,
+                "data": [],
+            }