123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869 |
- # application/functions/async_mysql_service.py
- import asyncio
- import json
- import os
- from typing import List, Optional, Dict, Any
- from application.base.async_mysql_client import AsyncMySQLClient
- from utils.env_loader import load_env
class AsyncMysqlService:
    """
    Business-logic facade over AsyncMySQLClient for asynchronous DB access.

    Responsibilities:
    - wrap the business-specific SQL statements
    - build the connection configuration from environment variables
    - keep crawler / task-processing code decoupled from SQL details
    """

    # Fallback used when DB_PORT is unset or empty (the standard MySQL port).
    _DEFAULT_PORT = 3306

    def __init__(self):
        """
        Read the connection settings from the process environment
        (DB_HOST, DB_PORT, DB_USER, DB_PASSWORD, DB_NAME, DB_CHARSET) and
        construct the underlying pooled client.

        Raises:
            ValueError: if DB_PORT is set but is not a valid integer.
        """
        # NOTE(review): `load_env` is imported at module level but never
        # called in this file; presumably the caller is expected to have
        # populated the environment already -- confirm.
        db_config = {
            "host": os.getenv("DB_HOST"),
            "port": self._env_port(),
            "user": os.getenv("DB_USER"),
            "password": os.getenv("DB_PASSWORD"),
            "db": os.getenv("DB_NAME"),
            "charset": os.getenv("DB_CHARSET"),
        }
        self.client = AsyncMySQLClient(**db_config)

    @staticmethod
    def _env_port() -> int:
        """Return DB_PORT as an int, falling back to the default when unset."""
        raw_port = os.getenv("DB_PORT")
        # An unset or blank variable previously crashed inside int(); treat
        # both as "use the default" instead.
        if raw_port is None or not raw_port.strip():
            return AsyncMysqlService._DEFAULT_PORT
        try:
            return int(raw_port)
        except ValueError as err:
            raise ValueError(
                f"DB_PORT must be an integer, got {raw_port!r}"
            ) from err

    async def init(self):
        """Initialise the connection pool; call once at service start-up."""
        await self.client.init_pool()

    async def get_user_list(self, id: int) -> List[Dict[str, Any]]:
        """
        Fetch uid, link and nick_name of every crawler_user_v3 row whose
        task_id equals ``id``.

        The parameter keeps its original name ``id`` (even though it shadows
        the builtin) so that existing keyword callers keep working.
        """
        sql = "SELECT uid, link, nick_name from crawler_user_v3 where task_id = %s"
        return await self.client.fetch_all(sql, [id])

    async def get_rule_dict(self, rule_id: int) -> Optional[Dict[str, Any]]:
        """
        Load the ``rule`` column of crawler_task_v3 for ``rule_id`` and
        flatten it into a single dict.

        The stored value is expected to be a JSON list of dicts, which are
        merged left to right (later keys win). A top-level JSON object is
        returned as-is.

        Returns:
            The merged dict, or None when the row is missing, the column is
            NULL, the JSON cannot be parsed, or the decoded value is neither
            a dict nor a list of dicts.
        """
        sql = "SELECT rule FROM crawler_task_v3 WHERE id = %s"
        row = await self.client.fetch_one(sql, [rule_id])
        if not row or "rule" not in row:
            return None

        raw_rule = row["rule"]
        if raw_rule is None:
            # NULL column: nothing to parse (json.loads(None) would raise
            # TypeError, which the original handler did not catch).
            return None

        try:
            parsed = json.loads(raw_rule)
        except (json.JSONDecodeError, TypeError) as e:
            print(f"[get_rule_dict] JSON 解析失败: {e}")
            return None

        if isinstance(parsed, dict):
            return parsed
        if not isinstance(parsed, list) or not all(
            isinstance(item, dict) for item in parsed
        ):
            print(
                "[get_rule_dict] unexpected rule structure: "
                f"{type(parsed).__name__}"
            )
            return None

        # Merge list[dict] into one dict.
        merged: Dict[str, Any] = {}
        for item in parsed:
            merged.update(item)
        return merged
async def main():
    """
    Manual smoke test: initialise the service, fetch the users and the rule
    dict for task/rule id 18, print both, and close the pool.
    """
    mysql_service = AsyncMysqlService()
    await mysql_service.init()
    try:
        users = await mysql_service.get_user_list(18)
        rules = await mysql_service.get_rule_dict(18)
        print(users)
        # The original fetched `rules` but never used it; show it as well.
        print(rules)
    finally:
        # Always release the pool, even when one of the queries fails.
        await mysql_service.client.close()


if __name__ == '__main__':
    asyncio.run(main())
|