12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394 |
- #! /usr/bin/env python
- # -*- coding: utf-8 -*-
- # vim:fenc=utf-8
- from typing import Dict, List
- import requests
- from pymysql.cursors import DictCursor
- from pqai_agent.database import MySQLManager
- from pqai_agent.logging_service import logger
- import time
- from pqai_agent import configs
- from pqai_agent.mq_message import MessageType
- class HistoryDialogueService:
- def __init__(self, base_url: str):
- self.base_url = base_url
- @staticmethod
- def convert_raw_records_to_base_messages(data: List[Dict], staff_id: str, user_id: str, reverse: bool = False) -> List[Dict]:
- ret = []
- if reverse:
- data = reversed(data)
- for record in data:
- sender = record.get('sender')
- if sender == user_id:
- role = 'user'
- elif sender == staff_id:
- role = 'assistant'
- else:
- logger.warning("Unknown sender in dialogue history: {}".format(sender))
- continue
- msg_type = record.get('type', MessageType.TEXT.value)
- message = {
- 'role': role,
- 'content': record.get('content', ''),
- 'timestamp': record.get('sendTime', 0),
- 'type': MessageType(msg_type)
- }
- if message['type'] in (MessageType.VOICE_VIDEO_CALL, ):
- logger.warning(f"staff[{staff_id}], user[{user_id}]: skip unsupported message type {message['type']}")
- continue
- ret.append(message)
- return ret
- def get_dialogue_history(self, staff_id: str, user_id: str, recent_minutes: int = 1440):
- time_begin = int(time.time() * 1000) - recent_minutes * 60 * 1000
- url = f"{self.base_url}?sender={staff_id}&receiver={user_id}&time={time_begin}"
- response = requests.post(url, headers={
- 'Content-Type': 'application/json'
- })
- if response.status_code != 200:
- raise Exception("Request error [{}]: {}".format(response.status_code, response.text))
- data = response.json()
- if not data.get('success', False):
- raise Exception("Error in response: {}".format(data.get('message', 'no message returned')))
- data = data.get('data', [])
- ret = self.convert_raw_records_to_base_messages(data, staff_id, user_id)
- ret = sorted(ret, key=lambda x: x['timestamp'])
- return ret
- class HistoryDialogueDatabase:
- PRIVATE_ROOM_ID_FORMAT = 'private:%s:%s'
- def __init__(self, db_config, table_name: str = 'qywx_chat_history'):
- self.db = MySQLManager(db_config)
- self.table_name = table_name
- def get_dialogue_history_backward(self, staff_id: str, user_id: str, end_timestamp_ms: int, limit: int = 100):
- if staff_id < user_id:
- room_id = self.PRIVATE_ROOM_ID_FORMAT % (staff_id, user_id)
- else:
- room_id = self.PRIVATE_ROOM_ID_FORMAT % (user_id, staff_id)
- sql = f"SELECT sender, receiver, msg_type, content, sendtime as sendTime FROM {self.table_name} " \
- "WHERE roomid = %s AND sendtime <= %s ORDER BY sendtime DESC LIMIT %s"
- data = self.db.select(sql, DictCursor, (room_id, end_timestamp_ms, limit))
- if not data:
- return []
- ret = HistoryDialogueService.convert_raw_records_to_base_messages(data, staff_id, user_id, reverse=True)
- return ret
- if __name__ == '__main__':
- api_url = configs.get()['storage']['history_dialogue']['api_base_url']
- service = HistoryDialogueService(api_url)
- resp = service.get_dialogue_history(staff_id='1688857241615085', user_id='7881299616070168', recent_minutes=5*1440)
- print(resp)
- user_db_config = configs.get()['storage']['user']['mysql']
- db = HistoryDialogueDatabase(user_db_config)
- # print(db.get_dialogue_history_backward('1688854492669990', '7881301263964433', 1747397155000))
|