async_mysql_service.py 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. # application/functions/async_mysql_service.py
  2. import asyncio
  3. import json
  4. import os
  5. from typing import List, Optional, Dict, Any
  6. from application.base.async_mysql_client import AsyncMySQLClient
  7. from utils.env_loader import load_env
  8. class AsyncMysqlService:
  9. """
  10. 项目业务逻辑封装类,基于 AsyncMySQLClient 实现异步数据库访问
  11. 功能:
  12. - 封装与业务相关的 SQL 操作
  13. - 自动读取环境变量进行配置初始化
  14. - 与爬虫、任务处理逻辑解耦
  15. """
  16. def __init__(self):
  17. """
  18. 初始化时自动从环境变量读取配置并构造底层连接池客户端
  19. """
  20. db_config = {
  21. "host": os.getenv("DB_HOST"),
  22. "port": int(os.getenv("DB_PORT")),
  23. "user": os.getenv("DB_USER"),
  24. "password": os.getenv("DB_PASSWORD"),
  25. "db": os.getenv("DB_NAME"),
  26. "charset": os.getenv("DB_CHARSET")
  27. }
  28. self.client = AsyncMySQLClient(**db_config)
  29. async def init(self):
  30. """连接池初始化,在服务启动时调用一次"""
  31. await self.client.init_pool()
  32. async def get_user_list(self,id) -> List[Dict[str, Any]]:
  33. sql = "SELECT uid, link, nick_name from crawler_user_v3 where task_id = %s"
  34. return await self.client.fetch_all(sql, [id])
  35. async def get_rule_dict(self, rule_id: int) -> Optional[Dict[str, Any]]:
  36. sql = "SELECT rule FROM crawler_task_v3 WHERE id = %s"
  37. row = await self.client.fetch_one(sql, [rule_id])
  38. if not row or "rule" not in row:
  39. return None
  40. try:
  41. # 合并 list[dict] 为一个 dict
  42. return {k: v for item in json.loads(row["rule"]) for k, v in item.items()}
  43. except json.JSONDecodeError as e:
  44. print(f"[get_rule_dict] JSON 解析失败: {e}")
  45. return None
  46. async def main():
  47. mysql_service = AsyncMysqlService()
  48. await mysql_service.init()
  49. users = await mysql_service.get_user_list(18)
  50. rules = await mysql_service.get_rule_dict(18)
  51. print(users)
  52. await mysql_service.client.close()
  53. if __name__ == '__main__':
  54. asyncio.run(main())