#! /usr/bin/env python # -*- coding: utf-8 -*- # vim:fenc=utf-8 from typing import Dict, List import requests from pymysql.cursors import DictCursor from pqai_agent.database import MySQLManager from pqai_agent.logging_service import logger import time from pqai_agent import configs from pqai_agent.mq_message import MessageType class HistoryDialogueService: def __init__(self, base_url: str): self.base_url = base_url @staticmethod def convert_raw_records_to_base_messages(data: List[Dict], staff_id: str, user_id: str, reverse: bool = False) -> List[Dict]: ret = [] if reverse: data = reversed(data) for record in data: sender = record.get('sender') if sender == user_id: role = 'user' elif sender == staff_id: role = 'assistant' else: logger.warning("Unknown sender in dialogue history: {}".format(sender)) continue msg_type = record.get('type', MessageType.TEXT.value) message = { 'role': role, 'content': record.get('content', ''), 'timestamp': record.get('sendTime', 0), 'type': MessageType(msg_type) } if message['type'] in (MessageType.VOICE_VIDEO_CALL, ): logger.warning(f"staff[{staff_id}], user[{user_id}]: skip unsupported message type {message['type']}") continue ret.append(message) return ret def get_dialogue_history(self, staff_id: str, user_id: str, recent_minutes: int = 1440): time_begin = int(time.time() * 1000) - recent_minutes * 60 * 1000 url = f"{self.base_url}?sender={staff_id}&receiver={user_id}&time={time_begin}" response = requests.post(url, headers={ 'Content-Type': 'application/json' }) if response.status_code != 200: raise Exception("Request error [{}]: {}".format(response.status_code, response.text)) data = response.json() if not data.get('success', False): raise Exception("Error in response: {}".format(data.get('message', 'no message returned'))) data = data.get('data', []) ret = self.convert_raw_records_to_base_messages(data, staff_id, user_id) ret = sorted(ret, key=lambda x: x['timestamp']) return ret class HistoryDialogueDatabase: PRIVATE_ROOM_ID_FORMAT = 'private:%s:%s' def __init__(self, db_config, table_name: str = 'qywx_chat_history'): self.db = MySQLManager(db_config) self.table_name = table_name def get_dialogue_history_backward(self, staff_id: str, user_id: str, end_timestamp_ms: int, limit: int = 100): if staff_id < user_id: room_id = self.PRIVATE_ROOM_ID_FORMAT % (staff_id, user_id) else: room_id = self.PRIVATE_ROOM_ID_FORMAT % (user_id, staff_id) sql = f"SELECT sender, receiver, msg_type, content, sendtime as sendTime FROM {self.table_name} " \ "WHERE roomid = %s AND sendtime <= %s ORDER BY sendtime DESC LIMIT %s" data = self.db.select(sql, DictCursor, (room_id, end_timestamp_ms, limit)) if not data: return [] ret = HistoryDialogueService.convert_raw_records_to_base_messages(data, staff_id, user_id, reverse=True) return ret if __name__ == '__main__': api_url = configs.get()['storage']['history_dialogue']['api_base_url'] service = HistoryDialogueService(api_url) resp = service.get_dialogue_history(staff_id='1688857241615085', user_id='7881299616070168', recent_minutes=5*1440) print(resp) user_db_config = configs.get()['storage']['user']['mysql'] db = HistoryDialogueDatabase(user_db_config) # print(db.get_dialogue_history_backward('1688854492669990', '7881301263964433', 1747397155000))