cooperate_account_analysis.py 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. from tqdm.asyncio import tqdm
  2. import json
  3. from applications.crawler.wechat import get_article_list_from_account
  4. class CooperateAccountAnalysisTask:
  5. def __init__(self, pool, log_client):
  6. self.pool = pool
  7. self.log_client = log_client
  8. async def insert_account(self, account: dict):
  9. insert_query = """
  10. INSERT IGNORE INTO cooperate_accounts_temp
  11. (partner_name, partner_id, gh_id, account_name)
  12. VALUES (%s, %s, %s, %s);
  13. """
  14. await self.pool.async_save(
  15. query=insert_query,
  16. params=(
  17. account.get('partner_name'),
  18. account.get('partner_id'),
  19. account.get('gh_id'),
  20. account.get('account_name')
  21. )
  22. )
  23. async def update_each_account(self, gh_id: str, response: dict):
  24. if response['code'] == 25013:
  25. status = 0
  26. result = response['msg']
  27. else:
  28. status = 1
  29. result = json.dumps(response['data'], ensure_ascii=False)
  30. update_query = """
  31. UPDATE cooperate_accounts_temp
  32. SET status = %s, recent_articles= %s
  33. WHERE gh_id = %s;
  34. """
  35. await self.pool.async_save(
  36. query=update_query,
  37. params=(
  38. status,
  39. json.dumps(result, ensure_ascii=False),
  40. gh_id
  41. )
  42. )
  43. async def get_account_list(self):
  44. select_query = """
  45. SELECT gh_id FROM cooperate_accounts_temp
  46. WHERE status = %s and id > 11;
  47. """
  48. return await self.pool.async_fetch(
  49. query=select_query,
  50. params=(1, )
  51. )
  52. async def init_account_list(self, account_list):
  53. for account in tqdm(account_list, desc="Dealing Accounts"):
  54. await self.insert_account(account)
  55. async def deal(self):
  56. account_list = await self.get_account_list()
  57. for account in tqdm(account_list, desc="Dealing Accounts"):
  58. gh_id = account['gh_id']
  59. response = await get_article_list_from_account(gh_id)
  60. await self.update_each_account(gh_id, response)