import json
from cgitb import reset  # NOTE(review): `reset` looks unused and `cgitb` was removed from the stdlib in Python 3.13 — confirm and drop.
from typing import List

from sqlalchemy import func

from pqai_agent.data_models.dataset_model import DatasetModule
from pqai_agent.data_models.datasets import Datasets
from pqai_agent.data_models.internal_conversation_data import InternalConversationData
from pqai_agent.data_models.qywx_chat_history import QywxChatHistory
from pqai_agent.data_models.qywx_employee import QywxEmployee
from pqai_agent_server.const.type_enum import get_dataset_type_desc
from pqai_agent_server.utils.odps_utils import ODPSUtils


class DatasetService:
    """Read-side queries over datasets, conversation samples, staff and user profiles.

    Relational data is read through SQLAlchemy sessions created by
    ``session_maker``; user profile snapshots are read from ODPS through
    ``ODPSUtils.execute_sql``.
    """

    # Timestamp format used for every createTime/updateTime field in responses.
    _TIME_FORMAT = "%Y-%m-%d %H:%M:%S"

    def __init__(self, session_maker):
        """Store the session factory and create the ODPS client.

        Args:
            session_maker: callable returning a SQLAlchemy session usable as a
                context manager.
        """
        self.session_maker = session_maker
        self.odps_utils = ODPSUtils()

    @staticmethod
    def _quote_sql_string(value) -> str:
        """Render ``value`` as a single-quoted SQL string literal.

        Backslashes and single quotes are backslash-escaped so a caller-supplied
        value cannot terminate the literal and inject SQL.
        """
        text = str(value).replace("\\", "\\\\").replace("'", "\\'")
        return f"'{text}'"

    @staticmethod
    def _page_offset(page_num: int, page_size: int) -> int:
        """Return the row offset for a 1-based ``page_num``.

        Raises:
            ValueError: if ``page_num`` < 1 or ``page_size`` < 1.
        """
        if page_num < 1 or page_size < 1:
            raise ValueError(
                f"page_num and page_size must be >= 1 (got {page_num}, {page_size})"
            )
        return (page_num - 1) * page_size

    @staticmethod
    def _total_pages(total: int, page_size: int) -> int:
        """Return ceil(total / page_size), but never less than 1."""
        return max(1, -(-total // page_size))

    def get_user_profile_data(self, third_party_user_id: str, date_version: str,
                              min_date_version: str = "20250612"):
        """Return the newest profile row for a user, up to ``date_version``.

        Args:
            third_party_user_id: value matched against the ``third_party_user_id`` column.
            date_version: inclusive upper bound on the ``dt`` partition column
                (callers in this class pass a ``YYYYMMDD`` string).
            min_date_version: inclusive lower bound on ``dt``; defaults to the
                previously hard-coded ``"20250612"``.

        Returns:
            The most recent matching row with a non-null ``profile_data_v1``,
            as a dict, or ``None`` when no row matches.
        """
        # The dt range doubles as the partition filter. All interpolated values
        # are quoted/escaped: they used to be inserted raw, which broke
        # non-numeric ids and allowed SQL injection.
        lower = self._quote_sql_string(min_date_version)
        upper = self._quote_sql_string(date_version)
        user_id = self._quote_sql_string(third_party_user_id)
        sql = f"""
        SELECT *
        FROM third_party_user_date_version
        WHERE dt BETWEEN {lower} AND {upper}
          AND third_party_user_id = {user_id}
          AND profile_data_v1 IS NOT NULL
        ORDER BY dt DESC
        LIMIT 1
        """
        result_df = self.odps_utils.execute_sql(sql)
        if result_df.empty:
            return None
        return result_df.iloc[0].to_dict()  # first (newest) row

    def get_dataset_list_by_module(self, module_id: int):
        """Return all non-deleted ``DatasetModule`` rows for ``module_id``."""
        with self.session_maker() as session:
            return (session.query(DatasetModule)
                    .filter(DatasetModule.module_id == module_id)
                    .filter(DatasetModule.is_delete == 0)
                    .all())

    def get_conversation_data_list_by_dataset(self, dataset_id: int):
        """Return all non-deleted ``InternalConversationData`` rows of a dataset."""
        with self.session_maker() as session:
            return (session.query(InternalConversationData)
                    .filter(InternalConversationData.dataset_id == dataset_id)
                    .filter(InternalConversationData.is_delete == 0)
                    .all())

    def get_conversation_data_by_id(self, conversation_data_id: int):
        """Return the ``InternalConversationData`` row with the given id.

        Raises whatever SQLAlchemy's ``Query.one()`` raises when zero or
        multiple rows match.
        """
        with self.session_maker() as session:
            return (session.query(InternalConversationData)
                    .filter(InternalConversationData.id == conversation_data_id)
                    .one())

    def get_staff_profile_data(self, third_party_user_id: str):
        """Return the ``QywxEmployee`` row for ``third_party_user_id``.

        Raises whatever SQLAlchemy's ``Query.one()`` raises when zero or
        multiple rows match.
        """
        with self.session_maker() as session:
            return (session.query(QywxEmployee)
                    .filter(QywxEmployee.third_party_user_id == third_party_user_id)
                    .one())

    def get_conversation_list_by_ids(self, conversation_ids: List[int]):
        """Return chat-history messages with the given ids as plain dicts.

        ``sendtime`` is divided by 1000 (stored value presumably in
        milliseconds — confirm). An empty id list returns ``[]`` without
        touching the database.
        """
        if not conversation_ids:
            return []
        with self.session_maker() as session:
            conversations = (session.query(QywxChatHistory)
                             .filter(QywxChatHistory.id.in_(conversation_ids))
                             .all())
            return [
                {
                    "id": conversation.id,
                    "sender": conversation.sender,
                    "receiver": conversation.receiver,
                    "roomid": conversation.roomid,
                    "sendtime": conversation.sendtime / 1000,
                    "msg_type": conversation.msg_type,
                    "content": conversation.content,
                }
                for conversation in conversations
            ]

    def get_dataset_list(self, page_num: int, page_size: int):
        """Return one page of non-deleted datasets plus pagination metadata.

        Args:
            page_num: 1-based page index.
            page_size: rows per page (>= 1).

        Returns:
            Dict with ``currentPage``, ``pageSize``, ``totalSize`` (number of
            pages), ``total`` (number of rows) and ``list``.

        Raises:
            ValueError: if ``page_num`` or ``page_size`` is < 1.
        """
        offset = self._page_offset(page_num, page_size)
        with self.session_maker() as session:
            # ORDER BY keeps LIMIT/OFFSET pages stable between requests.
            result = (session.query(Datasets)
                      .filter(Datasets.is_delete == 0)
                      .order_by(Datasets.id)
                      .limit(page_size).offset(offset).all())
            total = (session.query(func.count(Datasets.id))
                     .filter(Datasets.is_delete == 0)
                     .scalar())
            response_data = [
                {
                    "id": dataset.id,
                    "name": dataset.name,
                    "type": get_dataset_type_desc(dataset.type),
                    "description": dataset.description,
                    "createTime": dataset.create_time.strftime(self._TIME_FORMAT),
                    "updateTime": dataset.update_time.strftime(self._TIME_FORMAT),
                }
                for dataset in result
            ]
        return {
            "currentPage": page_num,
            "pageSize": page_size,
            "totalSize": self._total_pages(total, page_size),
            "total": total,
            "list": response_data,
        }

    def get_conversation_data_list(self, dataset_id: int, page_num: int, page_size: int):
        """Return one page of a dataset's conversation samples, enriched with profiles.

        Each item carries the staff ``agent_profile``, the user's
        ``profile_data_v1`` (``None`` when ODPS has no matching row) and the
        referenced chat messages.

        Args:
            dataset_id: dataset whose samples are listed.
            page_num: 1-based page index.
            page_size: rows per page (>= 1).

        Returns:
            Dict with ``currentPage``, ``pageSize``, ``totalSize`` (number of
            pages), ``total`` (rows in this dataset) and ``list``.

        Raises:
            ValueError: if ``page_num`` or ``page_size`` is < 1.
        """
        offset = self._page_offset(page_num, page_size)
        with self.session_maker() as session:
            rows = (session.query(InternalConversationData)
                    .filter(InternalConversationData.dataset_id == dataset_id)
                    .filter(InternalConversationData.is_delete == 0)
                    .order_by(InternalConversationData.id)
                    .limit(page_size).offset(offset).all())
            # Count only this dataset's rows (previously the dataset filter was missing).
            total = (session.query(func.count(InternalConversationData.id))
                     .filter(InternalConversationData.dataset_id == dataset_id)
                     .filter(InternalConversationData.is_delete == 0)
                     .scalar())

        # Per-call memo: the same staff/user often appears on several rows, and
        # each lookup is a separate DB / ODPS round-trip.
        staff_profiles = {}
        user_profiles = {}
        response_data = []
        for conversation_data in rows:
            staff_id = conversation_data.staff_id
            if staff_id not in staff_profiles:
                staff_profiles[staff_id] = self.get_staff_profile_data(staff_id).agent_profile

            user_key = (conversation_data.user_id,
                        conversation_data.version_date.replace("-", ""))
            if user_key not in user_profiles:
                profile_row = self.get_user_profile_data(*user_key)
                user_profiles[user_key] = (
                    profile_row["profile_data_v1"] if profile_row is not None else None
                )

            raw_ids = conversation_data.conversation
            conversation_ids = json.loads(raw_ids) if raw_ids else []

            response_data.append({
                "id": conversation_data.id,
                "datasetId": conversation_data.dataset_id,
                "staff": staff_profiles[staff_id],
                "user": user_profiles[user_key],
                "conversation": self.get_conversation_list_by_ids(conversation_ids),
                "content": conversation_data.content,
                "sendTime": conversation_data.send_time,
                "sendType": conversation_data.send_type,
                "userActiveRate": conversation_data.user_active_rate,
                # These two were previously written as `data["x"]: ...`
                # (a bare annotation), so they were never actually set.
                "createTime": conversation_data.create_time.strftime(self._TIME_FORMAT),
                "updateTime": conversation_data.update_time.strftime(self._TIME_FORMAT),
            })

        return {
            "currentPage": page_num,
            "pageSize": page_size,
            "totalSize": self._total_pages(total, page_size),
            "total": total,
            "list": response_data,
        }