瀏覽代碼

develop-0520

luojunhui 1 月之前
父節點
當前提交
7b327448bd
共有 3 個文件被更改,包括 300 次插入7 次删除
  1. 242 0
      pqai_agent/user_manager.py
  2. 58 6
      pqai_agent_server/api_server.py
  3. 0 1
      requirements.txt

+ 242 - 0
pqai_agent/user_manager.py

@@ -264,6 +264,248 @@ class MySQLUserManager(UserManager):
         data = self.db.select(sql, pymysql.cursors.DictCursor)
         return data
 
+    def get_staff_sessions(self, staff_id, page_id: int = 1, page_size: int = 10, session_type: str = 'default') -> List[Dict]:
+        """
+        :param page_size:
+        :param page_id:
+        :param session_type:
+        :param staff_id:
+        :return:
+        """
+        match session_type:
+            case 'active':
+                sql = f"""
+                    select staff_id, current_state, user_id
+                    from agent_state
+                    where staff_id = %s and update_timestamp >= DATE_SUB(NOW(), INTERVAL 2 HOUR)
+                    order by update_timestamp desc;
+                """
+            case 'human_intervention':
+                sql = f"""
+                    select staff_id, current_state, user_id
+                    from agent_state
+                    where staff_id = %s and current_state = 5 order by update_timestamp desc;
+                """
+            case _:
+                sql = f"""
+                    select t1.staff_id, t1.current_state, t1.user_id, t2.name, t2.iconurl
+                    from agent_state t1 join third_party_user t2 
+                        on t1.user_id = t2.third_party_user_id
+                    where t1.staff_id = %s
+                    order by 
+                        IF(t1.current_state = 5, 0, 1),
+                        t1.update_timestamp desc
+                    limit {page_size + 1} offset {page_size * (page_id - 1)};
+                """
+
+        staff_sessions = self.db.select(sql, cursor_type=pymysql.cursors.DictCursor, args=(staff_id, ))
+        return staff_sessions
+
+    def get_staff_sessions_summary_v1(self, staff_id, page_id: int, page_size: int, status: int) -> Dict:
+        """
+        :param status: staff status(0: unemployed, 1: employed)
+        :param staff_id: staff
+        :param page_id: page id
+        :param page_size: page size
+        :return:
+        :todo: 未使用 Mysql 连接池,每次查询均需要与 MySQL 建立连接,性能较低,需要优化
+        """
+        if not staff_id:
+            get_staff_query = f"""
+                select third_party_user_id, name from {self.staff_table} where status = %s
+                limit %s offset %s;
+            """
+            staff_id_list = self.db.select(
+                sql=get_staff_query,
+                cursor_type=pymysql.cursors.DictCursor,
+                args=(status, page_size + 1, (page_id - 1) * page_size)
+            )
+            if not staff_id_list:
+                return {}
+
+            if len(staff_id_list) > page_size:
+                has_next_page = True
+                next_page_id = page_id + 1
+                staff_id_list = staff_id_list[:page_size]
+            else:
+                has_next_page = False
+                next_page_id = None
+        else:
+            get_staff_query = f"""
+                select third_party_user_id, name from {self.staff_table} 
+                where status = %s and third_party_user_id = %s;
+            """
+            staff_id_list = self.db.select(
+                sql=get_staff_query,
+                cursor_type=pymysql.cursors.DictCursor,
+                args=(status, staff_id)
+            )
+            if not staff_id_list:
+                return {}
+            has_next_page = False
+            next_page_id = None
+        response_data = [
+            {
+                'staff_id': staff['third_party_user_id'],
+                'staff_name': staff['name'],
+                'active_sessions': len(self.get_staff_sessions(staff['third_party_user_id'], session_type='active')),
+                'human_intervention_sessions': len(self.get_staff_sessions(staff['third_party_user_id'], session_type='human_intervention'))
+            }
+            for staff in staff_id_list
+        ]
+        return {
+            'has_next_page': has_next_page,
+            'next_page_id': next_page_id,
+            'data': response_data
+        }
+
+    def get_staff_session_list_v1(self, staff_id, page_id: int, page_size: int) -> Dict:
+        """
+        :param page_size:
+        :param page_id:
+        :param staff_id:
+        :return:
+        """
+        session_list = self.get_staff_sessions(staff_id, page_id, page_size)
+        if len(session_list) > page_size:
+            has_next_page = True
+            next_page_id = page_id + 1
+            session_list = session_list[:page_size]
+        else:
+            has_next_page = False
+            next_page_id = None
+        response_data = []
+        for session in session_list:
+            temp_obj = {}
+            user_id = session['user_id']
+            room_id = ':'.join(['private', staff_id, user_id])
+            select_query = f"""select content, max(sendtime) as max_timestamp from qywx_chat_history where roomid = %s;"""
+            last_message = self.db.select(
+                sql=select_query,
+                cursor_type=pymysql.cursors.DictCursor,
+                args=(room_id,)
+            )
+            if not last_message:
+                temp_obj['message'] = ''
+                temp_obj['timestamp'] = 0
+            else:
+                temp_obj['message'] = last_message[0]['content']
+                temp_obj['timestamp'] = last_message[0]['max_timestamp']
+            temp_obj['customer_id'] = user_id
+            temp_obj['customer_name'] = session['name']
+            temp_obj['avatar'] = session['iconurl']
+            response_data.append(temp_obj)
+        return {
+            "staff_id": staff_id,
+            "has_next_page": has_next_page,
+            "next_page_id": next_page_id,
+            "data": response_data
+        }
+
+    def get_staff_list(self, page_id: int, page_size: int) -> Dict:
+        """
+        :param page_size:
+        :param page_id:
+        :return:
+        """
+        sql = f"""
+            select t1.third_party_user_id as staff_id, t1.name as staff_name, t2.iconurl as avatar
+            from qywx_employee t1 left join third_party_user t2
+            on t1.third_party_user_id = t2.third_party_user_id
+            limit %s offset %s;
+        """
+        staff_list = self.db.select(
+            sql=sql,
+            cursor_type=pymysql.cursors.DictCursor,
+            args=(page_size + 1, page_size * (page_id - 1))
+        )
+        if len(staff_list) > page_size:
+            has_next_page = True
+            next_page_id = page_id + 1
+            staff_list = staff_list[:page_size]
+        else:
+            has_next_page = False
+            next_page_id = None
+        return {
+            "has_next_page": has_next_page,
+            "next_page": next_page_id,
+            "data": staff_list
+        }
+
+    def get_conversation_list_v1(self, staff_id: str, customer_id: str, page: Optional[int]):
+        """
+        :param staff_id:
+        :param customer_id:
+        :param page: timestamp
+        :return:
+        """
+        room_id = ':'.join(['private', staff_id, customer_id])
+        page_size = 20
+        if not page:
+            fetch_query = f"""
+                select t1.sender, t2.name, t1.sendtime, t1.content, t2.iconurl
+                from qywx_chat_history t1
+                join third_party_user t2 on t1.sender = t2.third_party_user_id
+                where roomid = %s
+                order by sendtime desc
+                limit %s;
+            """
+            messages = self.db.select(
+                sql=fetch_query,
+                cursor_type=pymysql.cursors.DictCursor,
+                args=(room_id, page_size + 1)
+            )
+        else:
+            fetch_query = f"""
+                select t1.sender, t2.name, t1.sendtime, t1.content, t2.iconurl
+                from qywx_chat_history t1
+                join third_party_user t2 on t1.sender = t2.third_party_user_id
+                where t1.roomid = %s and t1.sendtime <= %s
+                order by sendtime desc
+                limit %s;
+            """
+            messages = self.db.select(
+                sql=fetch_query,
+                cursor_type=pymysql.cursors.DictCursor,
+                args=(room_id, page, page_size + 1)
+            )
+        if messages:
+            if len(messages) > page_size:
+                has_next_page = True
+                next_page = messages[-1]['sendtime']
+            else:
+                has_next_page = False
+                next_page = None
+            response_data = [
+                {
+                    "sender_id": message['sender'],
+                    "sender_name": message['name'],
+                    "avatar": message['iconurl'],
+                    "content": message['content'],
+                    "timestamp": message['sendtime'],
+                    "role": "customer" if message['sender'] == customer_id else "staff"
+                }
+                for message in messages
+            ]
+            return {
+                "staff_id": staff_id,
+                "customer_id": customer_id,
+                "has_next_page": has_next_page,
+                "next_page": next_page,
+                "data": response_data
+            }
+        else:
+            has_next_page = False
+            next_page = None
+            return {
+                "staff_id": staff_id,
+                "customer_id": customer_id,
+                "has_next_page": has_next_page,
+                "next_page": next_page,
+                "data": []
+            }
+
+
 class LocalUserRelationManager(UserRelationManager):
     def __init__(self):
         pass

+ 58 - 6
pqai_agent_server/api_server.py

@@ -1,6 +1,7 @@
 #! /usr/bin/env python
 # -*- coding: utf-8 -*-
 # vim:fenc=utf-8
+import time
 import logging
 import werkzeug.exceptions
 from flask import Flask, request, jsonify
@@ -165,22 +166,73 @@ def health_check():
 
 @app.route("/api/getStaffSessionSummary", methods=["GET"])
 def get_staff_session_summary():
-    return wrap_response(200, msg="OK")
+    staff_id = request.args.get("staff_id")
+    status = request.args.get("status", 1)
+    page_id = request.args.get("page_id", 1)
+    page_size = request.args.get("page_size", 10)
+
+    # check params
+    try:
+        page_id = int(page_id)
+        page_size = int(page_size)
+        status = int(status)
+    except Exception as e:
+        return wrap_response(404, msg="Invalid parameter: {}".format(e))
+
+    staff_session_summary = app.user_manager.get_staff_sessions_summary_v1(
+        staff_id, page_id, page_size, status
+    )
+
+    if not staff_session_summary:
+        return wrap_response(404, msg="staff not found")
+    else:
+        return wrap_response(200, data=staff_session_summary)
 
 
 @app.route("/api/getStaffSessionList", methods=["GET"])
 def get_staff_session_list():
-    return wrap_response(200, msg="OK")
+    staff_id = request.args.get("staff_id")
+    if not staff_id:
+        return wrap_response(404, msg="staff_id is required")
+
+    page_size = request.args.get("page_size", 10)
+    page_id = request.args.get("page_id", 1)
+    staff_session_list = app.user_manager.get_staff_session_list_v1(staff_id, page_id, page_size)
+    if not staff_session_list:
+        return wrap_response(404, msg="staff not found")
+
+    return wrap_response(200, data=staff_session_list)
 
 
-@app.route("api/getConversationList", methods=["GET"])
+@app.route("/api/getStaffList", methods=["GET"])
+def get_staff_list():
+    page_size = request.args.get("page_size", 10)
+    page_id = request.args.get("page_id", 1)
+    staff_list = app.user_manager.get_staff_list(page_id, page_size)
+    if not staff_list:
+        return wrap_response(404, msg="staff not found")
+    return wrap_response(200, data=staff_list)
+
+
+@app.route("/api/getConversationList", methods=["GET"])
 def get_conversation_list():
-    return wrap_response(200, msg="OK")
+    """
+    获取staff && customer的 私聊对话列表
+    :return:
+    """
+    staff_id = request.args.get("staff_id")
+    customer_id = request.args.get("customer_id")
+    if not staff_id or not customer_id:
+        return wrap_response(404, msg="staff_id and customer_id are required")
 
+    page = request.args.get("page")
+    response = app.user_manager.get_conversation_list_v1(staff_id, customer_id, page)
+    return wrap_response(200, data=response)
 
-@app.route("api/sendMessage", methods=["POST"])
+
+@app.route("/api/sendMessage", methods=["POST"])
 def send_message():
-    return wrap_response(200, msg="OK")
+    return wrap_response(200, msg="暂不实现功能")
 
 
 @app.errorhandler(werkzeug.exceptions.BadRequest)

+ 0 - 1
requirements.txt

@@ -54,7 +54,6 @@ pyapollos~=0.1.5
 Werkzeug~=3.1.3
 Flask~=3.1.0
 jsonschema~=4.23.0
-pqai_agent~=0.1.0
 numpy~=2.2.5
 pillow~=11.2.1
 json5~=0.12.0