database.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. from sqlalchemy import create_engine, Column, Integer, String, DateTime
  2. from sqlalchemy.ext.declarative import declarative_base
  3. from sqlalchemy.orm import sessionmaker
  4. from sqlalchemy.exc import SQLAlchemyError
  5. from urllib.parse import quote_plus
  6. import configs
  7. # 配置日志
  8. from core.config import logger
  9. # 创建基础类
  10. Base = declarative_base()
  11. def create_sql_engine(config):
  12. user = config['user']
  13. passwd = quote_plus(config['password'])
  14. host = config['host']
  15. db_name = config['database']
  16. charset = config.get('charset', 'utf8mb4')
  17. engine = create_engine(f'mysql+mysqlconnector://{user}:{passwd}@{host}/{db_name}?charset={charset}')
  18. return engine
  19. def create_rag_db_engine():
  20. config = configs.get()['database']['rag']
  21. return create_sql_engine(config)
  22. # 创建数据库引擎
  23. engine = create_rag_db_engine()
  24. # 创建会话
  25. Session = sessionmaker(bind=engine)
  26. class DBHelper:
  27. def __init__(self):
  28. """初始化数据库连接"""
  29. self.session = Session()
  30. def add(self, entity):
  31. """插入实体对象"""
  32. try:
  33. self.session.add(entity)
  34. self.session.commit()
  35. logger.info(f"添加成功: {entity}")
  36. return entity
  37. except SQLAlchemyError as e:
  38. self.session.rollback()
  39. logger.error(f"添加失败: {e}")
  40. raise
  41. def get(self, model, **filters):
  42. """根据过滤条件获取实体对象"""
  43. try:
  44. entity = self.session.query(model).filter_by(**filters).first()
  45. logger.info(f"查询成功: {entity}")
  46. return entity
  47. except SQLAlchemyError as e:
  48. logger.error(f"查询失败: {e}")
  49. raise
  50. def update(self, model, filters, updates):
  51. """更新实体对象"""
  52. try:
  53. entity = self.session.query(model).filter_by(**filters).first()
  54. if entity:
  55. for key, value in updates.items():
  56. setattr(entity, key, value)
  57. self.session.commit()
  58. logger.info(f"更新成功: {entity}")
  59. return entity
  60. else:
  61. logger.warning(f"未找到符合条件的实体: {filters}")
  62. return None
  63. except SQLAlchemyError as e:
  64. self.session.rollback()
  65. logger.error(f"更新失败: {e}")
  66. raise
  67. def delete(self, model, **filters):
  68. """删除实体对象"""
  69. try:
  70. entity = self.session.query(model).filter_by(**filters).first()
  71. if entity:
  72. self.session.delete(entity)
  73. self.session.commit()
  74. logger.info(f"删除成功: {entity}")
  75. return entity
  76. else:
  77. logger.warning(f"未找到符合条件的实体: {filters}")
  78. return None
  79. except SQLAlchemyError as e:
  80. self.session.rollback()
  81. logger.error(f"删除失败: {e}")
  82. raise
  83. # def get_all(self, model, **filters):
  84. # """获取所有符合条件的实体对象"""
  85. # try:
  86. # entities = self.session.query(model).filter_by(**filters).all()
  87. # logger.info(f"查询成功: {entities}")
  88. # return entities
  89. # except SQLAlchemyError as e:
  90. # logger.error(f"查询失败: {e}")
  91. # raise
  92. def get_all(self, model, **filters):
  93. """获取所有符合条件的实体对象,支持更复杂的查询条件"""
  94. try:
  95. query = self.session.query(model)
  96. # 处理特殊条件如 __in
  97. actual_filters = {}
  98. for key, value in filters.items():
  99. if key.endswith('__in'):
  100. # 处理 IN 查询
  101. field_name = key[:-4]
  102. field = getattr(model, field_name)
  103. query = query.filter(field.in_(value))
  104. else:
  105. actual_filters[key] = value
  106. # 应用其他过滤条件
  107. if actual_filters:
  108. query = query.filter_by(**actual_filters)
  109. entities = query.all()
  110. logger.info(f"查询成功: {entities}")
  111. return entities
  112. except SQLAlchemyError as e:
  113. logger.error(f"查询失败: {e}")
  114. raise
  115. # 创建表
  116. Base.metadata.create_all(engine)