article_detail_stat.py 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. import json
  2. import time
  3. from applications.crawler.wechat import get_gzh_stat_daily
  4. from applications.crawler.wechat import get_access_token
  5. class ArticleDetailStatConst:
  6. # Task Status
  7. INIT_STATUS = 0
  8. PROCESSING_STATUS = 1
  9. SUCCESS_STATUS = 2
  10. FAILED_STATUS = 99
  11. # Account Status
  12. ACCOUNT_VALID_STATUS = 1
  13. ACCOUNT_INVALID_STATUS = 0
  14. # Cookie Status
  15. COOKIE_VALID_STATUS = 1
  16. COOKIE_INVALID_STATUS = 0
  17. class ArticleDetailStatMapper(ArticleDetailStatConst):
  18. def __init__(self, pool, log_client):
  19. self.pool = pool
  20. self.log_client = log_client
  21. # 获取账号信息
  22. async def fetch_monitor_accounts(self):
  23. query = """
  24. SELECT gh_id, account_name, app_id, app_secret
  25. FROM gzh_account_info WHERE status = %s;
  26. """
  27. return await self.pool.async_fetch(query=query, params=(self.ACCOUNT_VALID_STATUS, ))
  28. # 更新 access_token
  29. async def set_access_token_for_each_account(self, gh_id, access_token, expire_timestamp):
  30. query = """
  31. UPDATE gzh_cookie_info
  32. SET access_token = %s, access_token_status = %s, expire_timestamp = %s
  33. WHERE gh_id = %s;
  34. """
  35. return await self.pool.async_save(
  36. query=query, params=(access_token, self.COOKIE_VALID_STATUS, expire_timestamp, gh_id)
  37. )
  38. # 从数据库获取 access_token
  39. async def get_access_token_from_database(self, gh_id):
  40. query = """
  41. SELECT access_token, expire_timestamp FROM gzh_cookie_info where gh_id = %s;
  42. """
  43. return await self.pool.async_fetch(query=query, params=(gh_id, ))
  44. class ArticleDetailStat(ArticleDetailStatMapper):
  45. def __init__(self, pool, log_client):
  46. super().__init__(pool, log_client)
  47. # 处理单个账号
  48. async def process_single_account(self, account: dict):
  49. gh_id = account["gh_id"]
  50. token_info = await self.get_access_token_from_database(gh_id)
  51. if not token_info:
  52. return
  53. expire_timestamp = token_info[0]["expire_timestamp"] or 0
  54. if int(time.time()) >= expire_timestamp:
  55. print(f"{account['account_name']} access_token expired")
  56. new_token_info = await get_access_token(account["app_id"], account["app_secret"])
  57. print(json.dumps(new_token_info, indent=4))
  58. # 入口函数
  59. async def deal(self):
  60. accounts = await self.fetch_monitor_accounts()
  61. for account in accounts[:1]:
  62. await self.process_single_account(account)