entrance.py 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. import asyncio
  2. from datetime import date
  3. from tqdm import tqdm
  4. from app.core.database import DatabaseManager
  5. from app.core.observability import LogService
  6. from ._utils import RankLogMonitorUtils
  7. from ._mapper import RankLogMonitorMapper
  # Upper bound on how many accounts RankLogMonitor.deal processes at once
  # (used as the asyncio.Semaphore size).
  CONCURRENCY: int = 10
  class RankLogMonitor:
      """Save per-article ranking details for every account under today's date.

      For each account returned by ``mapper.fetch_accounts()``:
      - the account's articles are fetched;
      - each article's ``score_map`` is expanded via
        ``RankLogMonitorUtils.parse_score_map``;
      - the resulting records are persisted with ``mapper.save_records``.
      """

      def __init__(self, pool: DatabaseManager, log_service: LogService):
          self.pool = pool
          self.log_service = log_service
          self.mapper = RankLogMonitorMapper(self.pool, self.log_service)
          self.tool = RankLogMonitorUtils()

      def _build_record(self, article) -> dict:
          """Flatten one article plus its parsed score breakdown into a record.

          ``article`` is only read through ``.get``. It is presumably a DB row
          dict; confirm against ``RankLogMonitorMapper.fetch_articles``.

          Keys produced by ``parse_score_map`` are merged last, so they override
          any base field with the same name.
          """
          score_detail = self.tool.parse_score_map(
              article.get("score_map"),
              content_pool_type=article.get("content_pool_type"),
          )
          return {
              "account_name": article.get("account_name"),
              "gh_id": article.get("gh_id"),
              "source_id": article.get("source_id"),
              "content_pool_type": article.get("content_pool_type"),
              "strategy": article.get("strategy"),
              "title": article.get("title"),
              "score": article.get("score"),
              "category": article.get("category"),
              # Raw, unparsed score_map is stored alongside the parsed fields.
              "source_log": article.get("score_map"),
              **score_detail,
          }

      async def save_single_account(self, gh_id: str, rank_date: str) -> int:
          """Build and save ranking records for a single account.

          Args:
              gh_id: Account id passed to ``mapper.fetch_articles``.
              rank_date: ISO date string the records are saved under.

          Returns:
              The value returned by ``mapper.save_records``. This is presumably
              the number of rows written; confirm against the mapper. Returns 0
              without saving anything when the account has no articles.
          """
          articles = await self.mapper.fetch_articles(gh_id=gh_id)
          # Treat a None result the same as an empty result set.
          records = [self._build_record(article) for article in articles or ()]
          if not records:
              return 0
          return await self.mapper.save_records(rank_date=rank_date, records=records)

      async def deal(self) -> int:
          """Save today's ranking records for every account.

          Accounts are processed concurrently, at most ``CONCURRENCY`` at a time.

          Returns:
              Sum of the ``save_single_account`` results across all accounts.

          Raises:
              The first exception raised by any ``save_single_account`` call.
              ``asyncio.gather`` does not cancel the remaining workers in that
              case, so they keep running in the background.
          """
          # Local-time calendar date, shared by every account in this run.
          rank_date = date.today().isoformat()
          # Presumably a sized sequence of gh_id strings; verify against the mapper.
          accounts = await self.mapper.fetch_accounts()
          semaphore = asyncio.Semaphore(CONCURRENCY)

          # The context manager closes the bar even when gather propagates a
          # worker exception, not only on the success path.
          with tqdm(total=len(accounts)) as pbar:

              async def _worker(gh_id: str) -> int:
                  async with semaphore:
                      result = await self.save_single_account(gh_id, rank_date)
                      pbar.update(1)
                      return result

              results = await asyncio.gather(*(_worker(gh_id) for gh_id in accounts))

          return sum(results)