| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162 |
- import asyncio
- from datetime import date
- from tqdm import tqdm
- from app.core.database import DatabaseManager
- from app.core.observability import LogService
- from ._utils import RankLogMonitorUtils
- from ._mapper import RankLogMonitorMapper
# Upper bound on how many accounts RankLogMonitor.deal saves at the same time;
# it is used as the size of the asyncio.Semaphore created there.
CONCURRENCY: int = 10
class RankLogMonitor:
    """Snapshot every account's article rank logs under today's date.

    For each account returned by ``mapper.fetch_accounts`` the account's
    articles are fetched, each article's ``score_map`` is expanded with
    ``RankLogMonitorUtils.parse_score_map``, and the resulting records are
    persisted through ``mapper.save_records``.
    """

    def __init__(self, pool: DatabaseManager, log_service: LogService):
        self.pool = pool
        self.log_service = log_service
        self.mapper = RankLogMonitorMapper(self.pool, self.log_service)
        self.tool = RankLogMonitorUtils()

    def _build_record(self, article: dict) -> dict:
        """Flatten one fetched article row into a record for ``save_records``.

        The raw ``score_map`` is kept under ``source_log``; the parsed score
        fields returned by ``parse_score_map`` are merged in last, so any key
        they share with the fixed fields below overrides the fixed value.
        """
        score_detail = self.tool.parse_score_map(
            article.get("score_map"),
            content_pool_type=article.get("content_pool_type"),
        )
        return {
            "account_name": article.get("account_name"),
            "gh_id": article.get("gh_id"),
            "source_id": article.get("source_id"),
            "content_pool_type": article.get("content_pool_type"),
            "strategy": article.get("strategy"),
            "title": article.get("title"),
            "score": article.get("score"),
            "category": article.get("category"),
            "source_log": article.get("score_map"),
            **score_detail,
        }

    async def save_single_account(self, gh_id: str, rank_date: str) -> int:
        """Persist the rank-log records of a single account.

        Args:
            gh_id: Account identifier passed to ``mapper.fetch_articles``.
            rank_date: ISO-format date string the records are saved under.

        Returns:
            The value returned by ``mapper.save_records`` (presumably the number
            of rows written — confirm against the mapper), or 0 when the
            account has no articles.
        """
        # A missing result (None) is treated exactly like an empty one.
        articles = await self.mapper.fetch_articles(gh_id=gh_id) or []
        records = [self._build_record(article) for article in articles]
        if not records:
            return 0
        return await self.mapper.save_records(rank_date=rank_date, records=records)

    async def deal(self) -> int:
        """Save today's rank logs for every account, CONCURRENCY at a time.

        Returns:
            The sum of the per-account ``save_single_account`` results; 0 when
            ``mapper.fetch_accounts`` yields no accounts.

        Raises:
            The first exception raised by any account's save. Per
            ``asyncio.gather`` semantics, the other in-flight saves are not
            cancelled and keep running in the background.
        """
        rank_date = date.today().isoformat()
        accounts = await self.mapper.fetch_accounts()
        # Guards against both an empty list and a None result (len(None) fails).
        if not accounts:
            return 0

        semaphore = asyncio.Semaphore(CONCURRENCY)

        # The context manager closes the progress bar even if a worker raises;
        # previously close() was skipped whenever gather() propagated an error.
        with tqdm(total=len(accounts)) as pbar:

            async def _worker(gh_id: str) -> int:
                async with semaphore:
                    result = await self.save_single_account(gh_id, rank_date)
                    pbar.update(1)
                    return result

            results = await asyncio.gather(*(_worker(acc) for acc in accounts))

        return sum(results)
|