Browse Source

Update history_dialogue_service: add HistoryDialogueDatabase

StrayWarrior 1 month ago
parent
commit
638efa968a
1 changed files with 50 additions and 14 deletions
  1. 50 14
      pqai_agent/history_dialogue_service.py

+ 50 - 14
pqai_agent/history_dialogue_service.py

@@ -1,8 +1,12 @@
 #! /usr/bin/env python
 # -*- coding: utf-8 -*-
 # vim:fenc=utf-8
+from typing import Dict, List
 
 import requests
+from pymysql.cursors import DictCursor
+
+from pqai_agent.database import MySQLManager
 from pqai_agent.logging_service import logger
 import time
 
@@ -14,19 +18,11 @@ class HistoryDialogueService:
     def __init__(self, base_url: str):
         self.base_url = base_url
 
-    def get_dialogue_history(self, staff_id: str, user_id: str, recent_minutes: int = 1440):
-        time_begin = int(time.time() * 1000) - recent_minutes * 60 * 1000
-        url = f"{self.base_url}?sender={staff_id}&receiver={user_id}&time={time_begin}"
-        response = requests.post(url, headers={
-            'Content-Type': 'application/json'
-        })
-        if response.status_code != 200:
-            raise Exception("Request error [{}]: {}".format(response.status_code, response.text))
-        data = response.json()
-        if not data.get('success', False):
-            raise Exception("Error in response: {}".format(data.get('message', 'no message returned')))
-        data = data.get('data', [])
+    @staticmethod
+    def convert_raw_records_to_base_messages(data: List[Dict], staff_id: str, user_id: str, reverse: bool = False) -> List[Dict]:
         ret = []
+        if reverse:
+            data = reversed(data)
         for record in data:
             sender = record.get('sender')
             if sender == user_id:
@@ -47,12 +43,52 @@ class HistoryDialogueService:
                 logger.warning(f"staff[{staff_id}], user[{user_id}]: skip unsupported message type {message['type']}")
                 continue
             ret.append(message)
+        return ret
+
+    def get_dialogue_history(self, staff_id: str, user_id: str, recent_minutes: int = 1440):
+        time_begin = int(time.time() * 1000) - recent_minutes * 60 * 1000
+        url = f"{self.base_url}?sender={staff_id}&receiver={user_id}&time={time_begin}"
+        response = requests.post(url, headers={
+            'Content-Type': 'application/json'
+        })
+        if response.status_code != 200:
+            raise Exception("Request error [{}]: {}".format(response.status_code, response.text))
+        data = response.json()
+        if not data.get('success', False):
+            raise Exception("Error in response: {}".format(data.get('message', 'no message returned')))
+        data = data.get('data', [])
+        ret = self.convert_raw_records_to_base_messages(data, staff_id, user_id)
         ret = sorted(ret, key=lambda x: x['timestamp'])
         return ret
 
 
+class HistoryDialogueDatabase:
+    PRIVATE_ROOM_ID_FORMAT = 'private:%s:%s'
+
+    def __init__(self, db_config, table_name: str = 'qywx_chat_history'):
+        self.db = MySQLManager(db_config)
+        self.table_name = table_name
+
+    def get_dialogue_history_backward(self, staff_id: str, user_id: str, end_timestamp_ms: int, limit: int = 100):
+        if staff_id < user_id:
+            room_id = self.PRIVATE_ROOM_ID_FORMAT % (staff_id, user_id)
+        else:
+            room_id = self.PRIVATE_ROOM_ID_FORMAT % (user_id, staff_id)
+        sql = f"SELECT sender, receiver, msg_type, content, sendtime as sendTime FROM {self.table_name} " \
+                "WHERE roomid = %s AND sendtime < %s ORDER BY sendtime DESC LIMIT %s"
+        data = self.db.select(sql, DictCursor, (room_id, end_timestamp_ms, limit))
+        if not data:
+            return []
+        ret = HistoryDialogueService.convert_raw_records_to_base_messages(data, staff_id, user_id, reverse=True)
+        return ret
+
+
+
 if __name__ == '__main__':
     api_url = configs.get()['storage']['history_dialogue']['api_base_url']
     service = HistoryDialogueService(api_url)
-    resp = service.get_dialogue_history(staff_id='1688854492669990', user_id='7881301263964433')
-    print(resp)
+    resp = service.get_dialogue_history(staff_id='1688857241615085', user_id='7881299616070168', recent_minutes=5*1440)
+    print(resp)
+    user_db_config = configs.get()['storage']['user']['mysql']
+    db = HistoryDialogueDatabase(user_db_config)
+    # print(db.get_dialogue_history_backward('1688854492669990', '7881301263964433', 1747397155000))