12345678910111213141516171819202122232425262728293031323334353637383940414243 |
- from sqlalchemy import create_engine
- from sqlalchemy.orm import sessionmaker, declarative_base
- from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
- import os
- from dotenv import load_dotenv
- # 加载环境变量
- load_dotenv()
- # 数据库连接配置
- DATABASE_URL = os.getenv(
- "DATABASE_URL",
- "mysql+pymysql://wqsd:wqsd%402025@knowledge.rwlb.rds.aliyuncs.com:3306/ai_knowledge?charset=utf8&connect_timeout=60&read_timeout=300&write_timeout=300"
- )
- # 创建同步引擎和会话
- engine = create_engine(
- DATABASE_URL,
- pool_size=10,
- max_overflow=20,
- pool_timeout=60, # 增加池连接超时时间
- pool_recycle=1800, # 减少连接回收时间,单位为秒,防止连接长时间占用
- pool_pre_ping=True, # 添加连接前ping测试,确保连接有效
- echo=False, # 设为True可查看SQL日志
- connect_args={
- "connect_timeout": 60, # 连接超时时间
- "read_timeout": 300, # 读取超时时间
- "write_timeout": 300, # 写入超时时间
- }
- )
- SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
- # 声明基类
- Base = declarative_base()
- # 获取数据库会话的依赖函数
- def get_db():
- db = SessionLocal()
- try:
- yield db
- finally:
- db.close()
|