# application/functions/async_mysql_service.py
import asyncio
import json
import os
from typing import Any, Dict, List, Optional

from application.base.async_mysql_client import AsyncMySQLClient
from utils.env_loader import load_env

# NOTE(review): load_env is imported but never called in this file; presumably
# the environment is populated elsewhere before AsyncMysqlService is
# constructed -- confirm.

# Port used when DB_PORT is unset or empty (the standard MySQL port).
_DEFAULT_DB_PORT = 3306


class AsyncMysqlService:
    """Business-level data access built on top of AsyncMySQLClient.

    - Wraps the SQL statements used by the project.
    - Reads its connection settings from environment variables.
    - Keeps database access separate from crawler / task-processing code.
    """

    def __init__(self):
        """Build the underlying client from the ``DB_*`` environment variables.

        Reads DB_HOST, DB_PORT, DB_USER, DB_PASSWORD, DB_NAME and DB_CHARSET.
        DB_PORT falls back to 3306 when it is unset or empty; every other
        value is passed through to AsyncMySQLClient as-is (``None`` if unset).

        Raises:
            ValueError: if DB_PORT is set to a non-numeric value.
        """
        db_config = {
            "host": os.getenv("DB_HOST"),
            # `or` covers both an unset variable (None) and an empty string,
            # either of which would otherwise make int() raise.
            "port": int(os.getenv("DB_PORT") or _DEFAULT_DB_PORT),
            "user": os.getenv("DB_USER"),
            "password": os.getenv("DB_PASSWORD"),
            "db": os.getenv("DB_NAME"),
            "charset": os.getenv("DB_CHARSET"),
        }
        self.client = AsyncMySQLClient(**db_config)

    async def init(self):
        """Initialize the client's connection pool.

        Intended to be awaited once at service start-up, before any query.
        """
        await self.client.init_pool()

    async def get_user_list(self, id: int) -> List[Dict[str, Any]]:
        """Return the crawler users that belong to a task.

        Args:
            id: Value matched against ``crawler_user_v3.task_id``. The name
                shadows the builtin but is kept so keyword callers still work.

        Returns:
            Whatever ``client.fetch_all`` yields for the query; each row
            carries the ``uid``, ``link`` and ``nick_name`` columns.
        """
        sql = "SELECT uid, link, nick_name from crawler_user_v3 where task_id = %s"
        return await self.client.fetch_all(sql, [id])

    async def get_rule_dict(self, rule_id: int) -> Optional[Dict[str, Any]]:
        """Load a task's rule and flatten it into a single dict.

        The ``rule`` column of ``crawler_task_v3`` is decoded as JSON and its
        elements (expected to be dicts) are merged in order, so later entries
        override earlier ones on duplicate keys.

        Args:
            rule_id: Primary key (``id``) of the row in ``crawler_task_v3``.

        Returns:
            The merged dict, or ``None`` when the row is missing, the ``rule``
            column is absent or NULL, or its content cannot be decoded.
        """
        sql = "SELECT rule FROM crawler_task_v3 WHERE id = %s"
        row = await self.client.fetch_one(sql, [rule_id])
        if not row or "rule" not in row:
            return None

        raw_rule = row["rule"]
        if raw_rule is None:
            # A SQL NULL passes the membership check above but is not
            # decodable; treat it the same as "no rule configured".
            return None

        try:
            rule_items = json.loads(raw_rule)
        except (json.JSONDecodeError, TypeError) as e:
            # TypeError covers values that are not str/bytes/bytearray.
            print(f"[get_rule_dict] JSON 解析失败: {e}")
            return None

        # Merge list[dict] into one dict.
        return {k: v for item in rule_items for k, v in item.items()}


async def main():
    """Manual smoke check: fetch users and rules for task 18 and print them."""
    mysql_service = AsyncMysqlService()
    await mysql_service.init()
    try:
        users = await mysql_service.get_user_list(18)
        rules = await mysql_service.get_rule_dict(18)
        print(users)
        print(rules)
    finally:
        # Release the client even when a query fails.
        await mysql_service.client.close()


if __name__ == '__main__':
    asyncio.run(main())