data_analysis.py 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354
  1. from tqdm.asyncio import tqdm
  2. from applications.utils import run_tasks_with_asyncio_task_group
  3. class DataAnalysis:
  4. def __init__(self, pool):
  5. self.pool = pool
  6. async def get_articles(self):
  7. query = """
  8. select id, title
  9. from publish_account_category_detail
  10. where category is null;
  11. """
  12. return await self.pool.async_fetch(query=query)
  13. async def get_title_category(self, title: str):
  14. query = f"""
  15. select category from article_category
  16. where title = %s and status = 2 and version = 2
  17. order by id desc limit 1;
  18. """
  19. return await self.pool.async_fetch(query=query, params=(title,))
  20. async def set_each_record(self, article):
  21. id_ = article['id']
  22. title = article['title']
  23. result = await self.get_title_category(title)
  24. if not result:
  25. category = "empty"
  26. else:
  27. category = result[0]['category']
  28. update_query = """
  29. update publish_account_category_detail
  30. set category = %s
  31. where id = %s;
  32. """
  33. await self.pool.async_save(query=update_query, params=(category, id_))
  34. async def deal(self):
  35. articles = await self.get_articles()
  36. return await run_tasks_with_asyncio_task_group(
  37. task_list=articles,
  38. handler=self.set_each_record,
  39. max_concurrency=2,
  40. fail_fast=False,
  41. description="更新文章分类",
  42. unit="id",
  43. )
  44. for article in tqdm(articles):
  45. await self.set_each_record(article)