test_mysql_utils.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410
  1. """
  2. MySQL工具库测试用例
  3. 注意:运行测试前请确保:
  4. 1. 数据库连接配置正确
  5. 2. 有测试用的数据表(或者在测试中创建)
  6. 3. 有足够的数据库权限
  7. """
  8. import unittest
  9. import time
  10. import sys
  11. import os
  12. # 添加项目根目录到路径,支持直接运行测试
  13. current_dir = os.path.dirname(os.path.abspath(__file__))
  14. project_root = os.path.dirname(os.path.dirname(current_dir))
  15. sys.path.insert(0, project_root)
  16. try:
  17. # 优先尝试相对导入(从包内运行)
  18. from . import mysql_db
  19. except ImportError:
  20. # 备用绝对导入(直接运行文件)
  21. from utils.mysql import mysql_db
  22. class TestMySQLUtils(unittest.TestCase):
  23. """MySQL工具库测试类"""
  24. @classmethod
  25. def setUpClass(cls):
  26. """测试类初始化"""
  27. print("开始MySQL工具库测试...")
  28. # 创建测试表(如果不存在)
  29. try:
  30. mysql_db.execute_update("""
  31. CREATE TABLE IF NOT EXISTS test_users (
  32. id INT AUTO_INCREMENT PRIMARY KEY,
  33. name VARCHAR(100) NOT NULL,
  34. email VARCHAR(100) UNIQUE,
  35. age INT,
  36. created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  37. updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
  38. )
  39. """)
  40. print("测试表创建成功")
  41. except Exception as e:
  42. print(f"创建测试表失败: {e}")
  43. raise
  44. @classmethod
  45. def tearDownClass(cls):
  46. """测试类清理"""
  47. try:
  48. # 清理测试数据
  49. mysql_db.execute_update("DELETE FROM test_users WHERE name LIKE 'Test%'")
  50. print("测试数据清理完成")
  51. except Exception as e:
  52. print(f"清理测试数据失败: {e}")
  53. def test_01_basic_insert(self):
  54. """测试基础插入操作"""
  55. print("\n测试基础插入操作...")
  56. test_data = {
  57. 'name': 'Test User 1',
  58. 'email': 'test1@example.com',
  59. 'age': 25
  60. }
  61. user_id = mysql_db.insert('test_users', test_data)
  62. self.assertIsNotNone(user_id)
  63. self.assertGreater(user_id, 0)
  64. print(f"插入成功,ID: {user_id}")
  65. # 验证插入的数据
  66. user = mysql_db.select_one('test_users', where='id = %s', where_params=(user_id,))
  67. self.assertIsNotNone(user)
  68. self.assertEqual(user['name'], 'Test User 1')
  69. self.assertEqual(user['email'], 'test1@example.com')
  70. def test_02_basic_select(self):
  71. """测试基础查询操作"""
  72. print("\n测试基础查询操作...")
  73. # 查询所有测试用户
  74. users = mysql_db.select('test_users', where="name LIKE %s", where_params=('Test%',))
  75. self.assertGreaterEqual(len(users), 1)
  76. print(f"查询到 {len(users)} 个测试用户")
  77. # 查询单个用户
  78. user = mysql_db.select_one('test_users', where="name = %s", where_params=('Test User 1',))
  79. self.assertIsNotNone(user)
  80. self.assertEqual(user['name'], 'Test User 1')
  81. def test_03_basic_update(self):
  82. """测试基础更新操作"""
  83. print("\n测试基础更新操作...")
  84. # 更新用户年龄
  85. affected_rows = mysql_db.update(
  86. 'test_users',
  87. {'age': 26},
  88. 'name = %s',
  89. ('Test User 1',)
  90. )
  91. self.assertGreater(affected_rows, 0)
  92. print(f"更新了 {affected_rows} 条记录")
  93. # 验证更新结果
  94. user = mysql_db.select_one('test_users', where="name = %s", where_params=('Test User 1',))
  95. self.assertEqual(user['age'], 26)
  96. def test_04_count_and_exists(self):
  97. """测试计数和存在性检查"""
  98. print("\n测试计数和存在性检查...")
  99. # 测试计数
  100. count = mysql_db.count('test_users', where="name LIKE %s", where_params=('Test%',))
  101. self.assertGreaterEqual(count, 1)
  102. print(f"测试用户总数: {count}")
  103. # 测试存在性检查
  104. exists = mysql_db.exists('test_users', 'name = %s', ('Test User 1',))
  105. self.assertTrue(exists)
  106. not_exists = mysql_db.exists('test_users', 'name = %s', ('Non Existent User',))
  107. self.assertFalse(not_exists)
  108. def test_05_batch_insert(self):
  109. """测试批量插入"""
  110. print("\n测试批量插入...")
  111. users_data = [
  112. {'name': 'Test User 2', 'email': 'test2@example.com', 'age': 22},
  113. {'name': 'Test User 3', 'email': 'test3@example.com', 'age': 23},
  114. {'name': 'Test User 4', 'email': 'test4@example.com', 'age': 24}
  115. ]
  116. affected_rows = mysql_db.insert_many('test_users', users_data)
  117. self.assertEqual(affected_rows, 3)
  118. print(f"批量插入了 {affected_rows} 条记录")
  119. def test_06_pagination(self):
  120. """测试分页查询"""
  121. print("\n测试分页查询...")
  122. result = mysql_db.paginate(
  123. 'test_users',
  124. page=1,
  125. page_size=2,
  126. where="name LIKE %s",
  127. where_params=('Test%',),
  128. order_by='id ASC'
  129. )
  130. self.assertIn('data', result)
  131. self.assertIn('pagination', result)
  132. self.assertEqual(len(result['data']), 2)
  133. self.assertEqual(result['pagination']['current_page'], 1)
  134. self.assertEqual(result['pagination']['page_size'], 2)
  135. print(f"分页查询结果: 当前页 {result['pagination']['current_page']}, "
  136. f"总记录数 {result['pagination']['total_count']}")
  137. def test_07_sorting(self):
  138. """测试排序查询"""
  139. print("\n测试排序查询...")
  140. # 单字段排序
  141. users = mysql_db.select_with_sort(
  142. 'test_users',
  143. where="name LIKE %s",
  144. where_params=('Test%',),
  145. sort_field='age',
  146. sort_order='DESC',
  147. limit=3
  148. )
  149. self.assertGreaterEqual(len(users), 1)
  150. print(f"按年龄降序查询到 {len(users)} 个用户")
  151. # 验证排序结果
  152. if len(users) > 1:
  153. self.assertGreaterEqual(users[0]['age'], users[1]['age'])
  154. def test_08_aggregation(self):
  155. """测试聚合查询"""
  156. print("\n测试聚合查询...")
  157. agg_result = mysql_db.aggregate(
  158. 'test_users',
  159. {
  160. 'total_count': 'COUNT(*)',
  161. 'avg_age': 'AVG(age)',
  162. 'max_age': 'MAX(age)',
  163. 'min_age': 'MIN(age)'
  164. },
  165. where="name LIKE %s",
  166. where_params=('Test%',)
  167. )
  168. self.assertEqual(len(agg_result), 1)
  169. result = agg_result[0]
  170. self.assertGreater(result['total_count'], 0)
  171. self.assertIsNotNone(result['avg_age'])
  172. self.assertIsNotNone(result['max_age'])
  173. self.assertIsNotNone(result['min_age'])
  174. print(f"聚合查询结果: {result}")
  175. def test_09_search(self):
  176. """测试模糊搜索"""
  177. print("\n测试模糊搜索...")
  178. results = mysql_db.search(
  179. 'test_users',
  180. ['name', 'email'],
  181. 'Test',
  182. limit=10
  183. )
  184. self.assertGreaterEqual(len(results), 1)
  185. print(f"搜索到 {len(results)} 条记录")
  186. # 验证搜索结果
  187. for result in results:
  188. self.assertTrue(
  189. 'Test' in result['name'] or 'Test' in (result['email'] or '')
  190. )
  191. def test_10_transaction(self):
  192. """测试事务操作"""
  193. print("\n测试事务操作...")
  194. try:
  195. with mysql_db.transaction():
  196. # 在事务中插入数据
  197. user_id = mysql_db.insert('test_users', {
  198. 'name': 'Test Transaction User',
  199. 'email': 'trans@example.com',
  200. 'age': 30
  201. })
  202. # 更新刚插入的数据
  203. mysql_db.update('test_users', {'age': 31}, 'id = %s', (user_id,))
  204. print(f"事务中插入并更新用户,ID: {user_id}")
  205. # 验证事务提交后的数据
  206. user = mysql_db.select_one('test_users', where='id = %s', where_params=(user_id,))
  207. self.assertIsNotNone(user)
  208. self.assertEqual(user['age'], 31)
  209. except Exception as e:
  210. self.fail(f"事务测试失败: {e}")
  211. def test_11_transaction_rollback(self):
  212. """测试事务回滚"""
  213. print("\n测试事务回滚...")
  214. initial_count = mysql_db.count('test_users')
  215. print(f"事务前记录数: {initial_count}")
  216. try:
  217. with mysql_db.transaction() as conn:
  218. # 在事务中插入一个用户,传递连接参数
  219. user_id = mysql_db.insert('test_users', {
  220. 'name': 'Test Rollback User',
  221. 'email': 'rollback@example.com',
  222. 'age': 35
  223. }, connection=conn)
  224. print(f"事务中插入用户ID: {user_id}")
  225. # 人为抛出异常触发回滚
  226. raise ValueError("测试回滚")
  227. except ValueError:
  228. # 这是预期的异常
  229. print("捕获到预期的异常,事务应该已回滚")
  230. # 等待一下确保事务完全处理
  231. import time
  232. time.sleep(0.1)
  233. # 验证回滚后数据没有增加
  234. final_count = mysql_db.count('test_users')
  235. print(f"事务后记录数: {final_count}")
  236. if initial_count != final_count:
  237. # 如果计数不匹配,查看是否是我们插入的测试数据
  238. rollback_user = mysql_db.select_one('test_users', where="name = %s", where_params=('Test Rollback User',))
  239. if rollback_user:
  240. print("❌ 事务回滚失败,找到了应该被回滚的数据")
  241. # 手动清理这条数据
  242. mysql_db.delete('test_users', 'name = %s', ('Test Rollback User',))
  243. self.fail("事务回滚失败")
  244. else:
  245. print("✅ 虽然计数不同,但回滚用户确实不存在,可能是其他并发操作")
  246. print("事务回滚测试通过")
  247. def test_12_connection_pool(self):
  248. """测试连接池"""
  249. print("\n测试连接池...")
  250. # 获取连接池状态
  251. status = mysql_db.pool.get_pool_status()
  252. self.assertIn('active_connections', status)
  253. self.assertIn('pool_size', status)
  254. self.assertIn('max_connections', status)
  255. print(f"连接池状态: {status}")
  256. # 测试并发获取连接
  257. connections = []
  258. try:
  259. for i in range(3):
  260. conn = mysql_db.pool.get_connection()
  261. connections.append(conn)
  262. # 验证连接可用性
  263. for conn in connections:
  264. conn.ping()
  265. print("连接池并发测试通过")
  266. finally:
  267. # 归还连接
  268. for conn in connections:
  269. mysql_db.pool.return_connection(conn)
  270. def test_13_error_handling(self):
  271. """测试错误处理"""
  272. print("\n测试错误处理...")
  273. # 测试查询不存在的表
  274. with self.assertRaises(Exception): # 捕获任何异常,因为可能是pymysql原生异常
  275. mysql_db.select('non_existent_table')
  276. # 测试插入空数据
  277. with self.assertRaises(ValueError):
  278. mysql_db.insert('test_users', {})
  279. print("错误处理测试通过")
  280. def test_14_performance(self):
  281. """性能测试"""
  282. print("\n性能测试...")
  283. # 测试批量插入性能
  284. start_time = time.time()
  285. batch_data = []
  286. for i in range(100):
  287. batch_data.append({
  288. 'name': f'Perf Test User {i}',
  289. 'email': f'perf{i}@example.com',
  290. 'age': 20 + (i % 30)
  291. })
  292. mysql_db.insert_many('test_users', batch_data)
  293. end_time = time.time()
  294. execution_time = end_time - start_time
  295. print(f"批量插入100条记录耗时: {execution_time:.4f}秒")
  296. self.assertLess(execution_time, 5.0) # 应该在5秒内完成
  297. # 清理性能测试数据
  298. mysql_db.delete('test_users', 'name LIKE %s', ('Perf Test User%',))
  299. def test_15_cleanup(self):
  300. """清理测试数据"""
  301. print("\n清理额外的测试数据...")
  302. # 删除事务测试用户
  303. mysql_db.delete('test_users', 'name = %s', ('Test Transaction User',))
  304. print("清理完成")
  305. def run_tests():
  306. """运行测试套件"""
  307. # 创建测试套件
  308. test_suite = unittest.TestLoader().loadTestsFromTestCase(TestMySQLUtils)
  309. # 运行测试
  310. runner = unittest.TextTestRunner(verbosity=2)
  311. result = runner.run(test_suite)
  312. # 输出测试结果
  313. if result.wasSuccessful():
  314. print(f"\n✅ 所有测试通过! 运行了 {result.testsRun} 个测试")
  315. else:
  316. print(f"\n❌ 测试失败! {len(result.failures)} 个失败, {len(result.errors)} 个错误")
  317. return result.wasSuccessful()
  318. if __name__ == '__main__':
  319. try:
  320. success = run_tests()
  321. exit(0 if success else 1)
  322. except Exception as e:
  323. print(f"测试运行出错: {e}")
  324. exit(1)