dataset_server.py 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455
  1. from typing import List
  2. from pqai_agent.data_models.dataset_model import DatasetModule
  3. from pqai_agent.data_models.internal_conversation_data import InternalConversationData
  4. from pqai_agent.data_models.qywx_chat_history import QywxChatHistory
  5. from pqai_agent.data_models.qywx_employee import QywxEmployee
  6. from pqai_agent_server.utils.odps_utils import ODPSUtils
  7. class DatasetServer:
  8. def __init__(self, session_maker):
  9. self.session_maker = session_maker
  10. odps_utils = ODPSUtils()
  11. self.odps_utils = odps_utils
  12. def get_user_profile_data(self, third_party_user_id: str, date_version: str):
  13. sql = f"""
  14. SELECT * FROM third_party_user_date_version
  15. WHERE dt between '20250612' and {date_version} -- 添加分区条件
  16. and third_party_user_id = {third_party_user_id}
  17. and profile_data_v1 is not null
  18. order by dt desc
  19. limit 1
  20. """
  21. result_df = self.odps_utils.execute_sql(sql)
  22. if not result_df.empty:
  23. return result_df.iloc[0].to_dict() # 获取第一行
  24. return None
  25. def get_dataset_list_by_module(self, module_id: int):
  26. with self.session_maker() as session:
  27. return session.query(DatasetModule).filter(DatasetModule.module_id == module_id).filter(
  28. DatasetModule.is_delete == 0).all()
  29. def get_conversation_data_list_by_dataset(self, dataset_id: int):
  30. with self.session_maker() as session:
  31. return session.query(InternalConversationData).filter(
  32. InternalConversationData.dataset_id == dataset_id).filter(
  33. DatasetModule.is_delete == 0).all()
  34. def get_conversation_data_by_id(self, conversation_data_id: int):
  35. with self.session_maker() as session:
  36. return session.query(InternalConversationData).filter(
  37. InternalConversationData.id == conversation_data_id).one()
  38. def get_staff_profile_data(self, third_party_user_id: str):
  39. with self.session_maker() as session:
  40. return session.query(QywxEmployee).filter(
  41. QywxEmployee.third_party_user_id == third_party_user_id).one()
  42. def get_conversation_list_by_ids(self, conversation_ids: List[int]):
  43. with self.session_maker() as session:
  44. return session.query(QywxChatHistory).filter(QywxChatHistory.id in conversation_ids).all()