history_dialogue_service.py 3.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. #! /usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # vim:fenc=utf-8
  4. from typing import Dict, List
  5. import requests
  6. from pymysql.cursors import DictCursor
  7. from pqai_agent.database import MySQLManager
  8. from pqai_agent.logging_service import logger
  9. import time
  10. from pqai_agent import configs
  11. from pqai_agent.message import MessageType
  12. class HistoryDialogueService:
  13. def __init__(self, base_url: str):
  14. self.base_url = base_url
  15. @staticmethod
  16. def convert_raw_records_to_base_messages(data: List[Dict], staff_id: str, user_id: str, reverse: bool = False) -> List[Dict]:
  17. ret = []
  18. if reverse:
  19. data = reversed(data)
  20. for record in data:
  21. sender = record.get('sender')
  22. if sender == user_id:
  23. role = 'user'
  24. elif sender == staff_id:
  25. role = 'assistant'
  26. else:
  27. logger.warning("Unknown sender in dialogue history: {}".format(sender))
  28. continue
  29. msg_type = record.get('type', MessageType.TEXT.value)
  30. message = {
  31. 'role': role,
  32. 'content': record.get('content', ''),
  33. 'timestamp': record.get('sendTime', 0),
  34. 'type': MessageType(msg_type)
  35. }
  36. if message['type'] in (MessageType.VOICE_VIDEO_CALL, ):
  37. logger.warning(f"staff[{staff_id}], user[{user_id}]: skip unsupported message type {message['type']}")
  38. continue
  39. ret.append(message)
  40. return ret
  41. def get_dialogue_history(self, staff_id: str, user_id: str, recent_minutes: int = 1440):
  42. time_begin = int(time.time() * 1000) - recent_minutes * 60 * 1000
  43. url = f"{self.base_url}?sender={staff_id}&receiver={user_id}&time={time_begin}"
  44. response = requests.post(url, headers={
  45. 'Content-Type': 'application/json'
  46. })
  47. if response.status_code != 200:
  48. raise Exception("Request error [{}]: {}".format(response.status_code, response.text))
  49. data = response.json()
  50. if not data.get('success', False):
  51. raise Exception("Error in response: {}".format(data.get('message', 'no message returned')))
  52. data = data.get('data', [])
  53. ret = self.convert_raw_records_to_base_messages(data, staff_id, user_id)
  54. ret = sorted(ret, key=lambda x: x['timestamp'])
  55. return ret
  56. class HistoryDialogueDatabase:
  57. PRIVATE_ROOM_ID_FORMAT = 'private:%s:%s'
  58. def __init__(self, db_config, table_name: str = 'qywx_chat_history'):
  59. self.db = MySQLManager(db_config)
  60. self.table_name = table_name
  61. def get_dialogue_history_backward(self, staff_id: str, user_id: str, end_timestamp_ms: int, limit: int = 100):
  62. if staff_id < user_id:
  63. room_id = self.PRIVATE_ROOM_ID_FORMAT % (staff_id, user_id)
  64. else:
  65. room_id = self.PRIVATE_ROOM_ID_FORMAT % (user_id, staff_id)
  66. sql = f"SELECT sender, receiver, msg_type, content, sendtime as sendTime FROM {self.table_name} " \
  67. "WHERE roomid = %s AND sendtime < %s ORDER BY sendtime DESC LIMIT %s"
  68. data = self.db.select(sql, DictCursor, (room_id, end_timestamp_ms, limit))
  69. if not data:
  70. return []
  71. ret = HistoryDialogueService.convert_raw_records_to_base_messages(data, staff_id, user_id, reverse=True)
  72. return ret
  73. if __name__ == '__main__':
  74. api_url = configs.get()['storage']['history_dialogue']['api_base_url']
  75. service = HistoryDialogueService(api_url)
  76. resp = service.get_dialogue_history(staff_id='1688857241615085', user_id='7881299616070168', recent_minutes=5*1440)
  77. print(resp)
  78. user_db_config = configs.get()['storage']['user']['mysql']
  79. db = HistoryDialogueDatabase(user_db_config)
  80. # print(db.get_dialogue_history_backward('1688854492669990', '7881301263964433', 1747397155000))