import asyncio
from datetime import date

from tqdm import tqdm

from app.core.database import DatabaseManager
from app.core.observability import LogService

from ._utils import RankLogMonitorUtils
from ._mapper import RankLogMonitorMapper

# Maximum number of accounts processed at the same time by ``deal``.
CONCURRENCY = 10


class RankLogMonitor:
    """Collect per-account article ranking records and persist them.

    For each account returned by the mapper, the articles are fetched, their
    ``score_map`` is parsed into extra fields, and the resulting records are
    saved under the current date.
    """

    def __init__(self, pool: DatabaseManager, log_service: LogService):
        """Store the collaborators and build the mapper and parsing helper.

        Args:
            pool: Database manager handed to the mapper.
            log_service: Log service handed to the mapper.
        """
        self.pool = pool
        self.log_service = log_service
        self.mapper = RankLogMonitorMapper(self.pool, self.log_service)
        self.tool = RankLogMonitorUtils()

    async def save_single_account(self, gh_id: str, rank_date: str) -> int:
        """Build and save the ranking records of one account.

        Args:
            gh_id: Identifier of the account whose articles are fetched.
            rank_date: Date string passed through to ``mapper.save_records``.

        Returns:
            Whatever ``mapper.save_records`` returns when at least one record
            was built (presumably a row count -- confirm against the mapper),
            otherwise ``0``.
        """
        articles = await self.mapper.fetch_articles(gh_id=gh_id)
        records = [
            {
                "account_name": article.get("account_name"),
                "gh_id": article.get("gh_id"),
                "source_id": article.get("source_id"),
                "content_pool_type": article.get("content_pool_type"),
                "strategy": article.get("strategy"),
                "title": article.get("title"),
                "score": article.get("score"),
                "category": article.get("category"),
                # The raw score map is kept alongside its parsed form.
                "source_log": article.get("score_map"),
                # Parsed fields come last, so they win on any key collision.
                **self.tool.parse_score_map(
                    article.get("score_map"),
                    content_pool_type=article.get("content_pool_type"),
                ),
            }
            for article in articles
        ]
        if not records:
            return 0
        return await self.mapper.save_records(rank_date=rank_date, records=records)

    async def deal(self) -> int:
        """Process every account concurrently and return the summed results.

        At most ``CONCURRENCY`` accounts are processed at once. A progress bar
        advances once per successfully processed account.

        Returns:
            The sum of the per-account results of ``save_single_account``.

        Raises:
            Any exception raised by a worker; ``asyncio.gather`` propagates
            the first one.
        """
        rank_date = date.today().isoformat()
        accounts = await self.mapper.fetch_accounts()
        semaphore = asyncio.Semaphore(CONCURRENCY)

        # The context manager closes the bar even when a worker raises;
        # previously close() was skipped on the error path.
        with tqdm(total=len(accounts)) as pbar:

            async def _worker(gh_id: str) -> int:
                async with semaphore:
                    result = await self.save_single_account(gh_id, rank_date)
                    pbar.update(1)
                    return result

            results = await asyncio.gather(*[_worker(acc) for acc in accounts])

        return sum(results)