123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160 |
- import json
- from cgitb import reset
- from typing import List
- from sqlalchemy import func
- from pqai_agent.data_models.dataset_model import DatasetModule
- from pqai_agent.data_models.datasets import Datasets
- from pqai_agent.data_models.internal_conversation_data import InternalConversationData
- from pqai_agent.data_models.qywx_chat_history import QywxChatHistory
- from pqai_agent.data_models.qywx_employee import QywxEmployee
- from pqai_agent_server.const.type_enum import get_dataset_type_desc
- from pqai_agent_server.utils.odps_utils import ODPSUtils
class DatasetService:
    """Read-only queries over datasets, their conversation samples and profiles.

    Relational data is read through sessions produced by ``session_maker``;
    user profile snapshots are read from ODPS through ``ODPSUtils``.
    """

    # strftime pattern used for every timestamp placed in a response payload.
    _TIME_FORMAT = "%Y-%m-%d %H:%M:%S"

    def __init__(self, session_maker):
        """Store the session factory and create the ODPS helper.

        Args:
            session_maker: Zero-argument callable returning a session that can
                be used as a context manager (``with session_maker() as s``).
        """
        self.session_maker = session_maker
        self.odps_utils = ODPSUtils()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _sql_string_literal(value) -> str:
        """Return *value* as a single-quoted SQL string literal.

        Backslashes and single quotes are backslash-escaped so the value
        cannot terminate the literal early.
        """
        escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
        return f"'{escaped}'"

    @staticmethod
    def _total_pages(total: int, page_size: int) -> int:
        """Return the page count for *total* rows, never less than 1."""
        # -(-a // b) is integer ceiling division.
        return max(1, -(-total // page_size))

    @classmethod
    def _format_time(cls, value):
        """Format a datetime with ``_TIME_FORMAT``; pass ``None`` through."""
        if value is None:
            return None
        return value.strftime(cls._TIME_FORMAT)

    @classmethod
    def _page_response(cls, page_num: int, page_size: int, total: int, items: list) -> dict:
        """Assemble the paging envelope shared by the list endpoints.

        Note that the ``totalSize`` key carries the number of pages, while
        ``total`` carries the number of rows.
        """
        return {
            "currentPage": page_num,
            "pageSize": page_size,
            "totalSize": cls._total_pages(total, page_size),
            "total": total,
            "list": items,
        }

    def _conversation_data_to_dict(self, conversation_data) -> dict:
        """Convert one ``InternalConversationData`` row into a response dict.

        Looks up the staff profile, the user profile snapshot and the
        referenced chat messages for the row. ``user`` is ``None`` when no
        profile snapshot is found.
        """
        staff = self.get_staff_profile_data(conversation_data.staff_id)
        # version_date is stored with dashes; the ODPS partition value has none.
        user_profile = self.get_user_profile_data(
            conversation_data.user_id,
            conversation_data.version_date.replace("-", ""),
        )
        message_ids = json.loads(conversation_data.conversation)
        return {
            "id": conversation_data.id,
            "datasetId": conversation_data.dataset_id,
            "staff": staff.agent_profile,
            "user": user_profile['profile_data_v1'] if user_profile is not None else None,
            "conversation": self.get_conversation_list_by_ids(message_ids),
            "content": conversation_data.content,
            "sendTime": conversation_data.send_time,
            "sendType": conversation_data.send_type,
            "userActiveRate": conversation_data.user_active_rate,
            "createTime": self._format_time(conversation_data.create_time),
            "updateTime": self._format_time(conversation_data.update_time),
        }

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def get_user_profile_data(self, third_party_user_id: str, date_version: str):
        """Fetch the newest user profile snapshot strictly before a date.

        Args:
            third_party_user_id: User identifier to look up.
            date_version: Exclusive upper bound for the ``dt`` partition.

        Returns:
            The first result row as a dict, or ``None`` when nothing matches.
        """
        date_literal = self._sql_string_literal(date_version)
        user_literal = self._sql_string_literal(third_party_user_id)
        # Both values are emitted as quoted, escaped string literals so that
        # ``dt`` is compared string-to-string (as with the lower bound) and
        # non-numeric identifiers do not break the statement.
        sql = f"""
            SELECT * FROM third_party_user_date_version
            WHERE dt >= '20250612' and dt < {date_literal} -- 添加分区条件
            and third_party_user_id = {user_literal}
            and profile_data_v1 is not null
            order by dt desc
            limit 1
        """
        result_df = self.odps_utils.execute_sql(sql)
        if result_df is None or result_df.empty:
            return None
        return result_df.iloc[0].to_dict()  # first (newest) row

    def get_dataset_module_list_by_module(self, module_id: int):
        """Return all non-deleted ``DatasetModule`` rows for *module_id*."""
        with self.session_maker() as session:
            query = (
                session.query(DatasetModule)
                .filter(DatasetModule.module_id == module_id)
                .filter(DatasetModule.is_delete == 0)
            )
            return query.all()

    def get_conversation_data_list_by_dataset(self, dataset_id: int):
        """Return all non-deleted conversation rows of a dataset, by id ascending."""
        with self.session_maker() as session:
            query = (
                session.query(InternalConversationData)
                .filter(InternalConversationData.dataset_id == dataset_id)
                .filter(InternalConversationData.is_delete == 0)
                .order_by(InternalConversationData.id.asc())
            )
            return query.all()

    def get_conversation_data_by_id(self, conversation_data_id: int):
        """Return exactly one conversation row by primary key.

        Uses ``Query.one()``, so an exception is raised unless exactly one row
        matches. The ``is_delete`` flag is not checked here.
        """
        with self.session_maker() as session:
            return (
                session.query(InternalConversationData)
                .filter(InternalConversationData.id == conversation_data_id)
                .one()
            )

    def get_staff_profile_data(self, third_party_user_id: str):
        """Return exactly one ``QywxEmployee`` row for the given user id.

        Uses ``Query.one()``, so an exception is raised unless exactly one row
        matches.
        """
        with self.session_maker() as session:
            return (
                session.query(QywxEmployee)
                .filter(QywxEmployee.third_party_user_id == third_party_user_id)
                .one()
            )

    def get_conversation_list_by_ids(self, conversation_ids: List[int]):
        """Return chat-history messages for the given ids as plain dicts.

        Messages are ordered by id ascending. ``sendtime`` is divided by 1000
        (presumably milliseconds to seconds -- confirm against the table).
        """
        with self.session_maker() as session:
            messages = (
                session.query(QywxChatHistory)
                .filter(QywxChatHistory.id.in_(conversation_ids))
                .order_by(QywxChatHistory.id.asc())
                .all()
            )
            return [
                {
                    "id": message.id,
                    "sender": message.sender,
                    "receiver": message.receiver,
                    "roomid": message.roomid,
                    "sendtime": message.sendtime / 1000,
                    "msg_type": message.msg_type,
                    "content": message.content,
                }
                for message in messages
            ]

    def get_chat_conversation_list_by_ids(self, conversation_ids: List[int], staff_id):
        """Return the messages as chat turns with ``role``/``content``/``timestamp``.

        A message whose sender equals *staff_id* gets the ``assistant`` role;
        every other message gets the ``user`` role.
        """
        return [
            {
                "content": message['content'],
                "role": "assistant" if message['sender'] == staff_id else "user",
                "timestamp": message['sendtime'],
            }
            for message in self.get_conversation_list_by_ids(conversation_ids)
        ]

    def get_dataset_list(self, page_num: int, page_size: int):
        """Return one page of non-deleted datasets plus paging metadata.

        Args:
            page_num: 1-based page index.
            page_size: Number of rows per page.
        """
        with self.session_maker() as session:
            offset = (page_num - 1) * page_size
            # An explicit ordering keeps LIMIT/OFFSET pages stable between calls.
            datasets = (
                session.query(Datasets)
                .filter(Datasets.is_delete == 0)
                .order_by(Datasets.id.asc())
                .limit(page_size)
                .offset(offset)
                .all()
            )
            total = (
                session.query(func.count(Datasets.id))
                .filter(Datasets.is_delete == 0)
                .scalar()
            )
            items = [
                {
                    "id": dataset.id,
                    "name": dataset.name,
                    "type": get_dataset_type_desc(dataset.type),
                    "description": dataset.description,
                    "createTime": self._format_time(dataset.create_time),
                    "updateTime": self._format_time(dataset.update_time),
                }
                for dataset in datasets
            ]
            return self._page_response(page_num, page_size, total, items)

    def get_conversation_data_list(self, dataset_id: int, page_num: int, page_size: int):
        """Return one page of a dataset's conversation samples plus paging metadata.

        Each row triggers additional lookups (staff profile, ODPS user profile,
        chat messages), so the cost grows with ``page_size``.

        Args:
            dataset_id: Dataset whose samples are listed.
            page_num: 1-based page index.
            page_size: Number of rows per page.
        """
        with self.session_maker() as session:
            offset = (page_num - 1) * page_size
            # An explicit ordering keeps LIMIT/OFFSET pages stable between calls.
            rows = (
                session.query(InternalConversationData)
                .filter(InternalConversationData.dataset_id == dataset_id)
                .filter(InternalConversationData.is_delete == 0)
                .order_by(InternalConversationData.id.asc())
                .limit(page_size)
                .offset(offset)
                .all()
            )
            # The count must use the same filters as the page query; otherwise
            # the paging metadata describes rows from every dataset.
            total = (
                session.query(func.count(InternalConversationData.id))
                .filter(InternalConversationData.dataset_id == dataset_id)
                .filter(InternalConversationData.is_delete == 0)
                .scalar()
            )
            items = [self._conversation_data_to_dict(row) for row in rows]
            return self._page_response(page_num, page_size, total, items)
|