from typing import List from pqai_agent.data_models.dataset_model import DatasetModule from pqai_agent.data_models.internal_conversation_data import InternalConversationData from pqai_agent.data_models.qywx_chat_history import QywxChatHistory from pqai_agent.data_models.qywx_employee import QywxEmployee from pqai_agent_server.utils.odps_utils import ODPSUtils class DatasetServer: def __init__(self, session_maker): self.session_maker = session_maker odps_utils = ODPSUtils() self.odps_utils = odps_utils def get_user_profile_data(self, third_party_user_id: str, date_version: str): sql = f""" SELECT * FROM third_party_user_date_version WHERE dt between '20250612' and {date_version} -- 添加分区条件 and third_party_user_id = {third_party_user_id} and profile_data_v1 is not null order by dt desc limit 1 """ result_df = self.odps_utils.execute_sql(sql) if not result_df.empty: return result_df.iloc[0].to_dict() # 获取第一行 return None def get_dataset_list_by_module(self, module_id: int): with self.session_maker() as session: return session.query(DatasetModule).filter(DatasetModule.module_id == module_id).filter( DatasetModule.is_delete == 0).all() def get_conversation_data_list_by_dataset(self, dataset_id: int): with self.session_maker() as session: return session.query(InternalConversationData).filter( InternalConversationData.dataset_id == dataset_id).filter( DatasetModule.is_delete == 0).all() def get_conversation_data_by_id(self, conversation_data_id: int): with self.session_maker() as session: return session.query(InternalConversationData).filter( InternalConversationData.id == conversation_data_id).one() def get_staff_profile_data(self, third_party_user_id: str): with self.session_maker() as session: return session.query(QywxEmployee).filter( QywxEmployee.third_party_user_id == third_party_user_id).one() def get_conversation_list_by_ids(self, conversation_ids: List[int]): with self.session_maker() as session: return session.query(QywxChatHistory).filter(QywxChatHistory.id in conversation_ids).all()