import json
import time

from applications.crawler.wechat import get_gzh_stat_daily
from applications.crawler.wechat import get_access_token


class ArticleDetailStatConst:
    """Status codes shared by the article-detail statistics task."""

    # Task Status
    INIT_STATUS = 0
    PROCESSING_STATUS = 1
    SUCCESS_STATUS = 2
    FAILED_STATUS = 99

    # Account Status
    ACCOUNT_VALID_STATUS = 1
    ACCOUNT_INVALID_STATUS = 0

    # Cookie Status
    COOKIE_VALID_STATUS = 1
    COOKIE_INVALID_STATUS = 0


class ArticleDetailStatMapper(ArticleDetailStatConst):
    """Database access layer for the account and access_token tables."""

    def __init__(self, pool, log_client):
        """Store the database pool and the log client for later use.

        Args:
            pool: Object exposing the awaitable ``async_fetch`` and
                ``async_save`` methods used by the queries below.
            log_client: Logging client; only stored here.
        """
        self.pool = pool
        self.log_client = log_client

    # Fetch account info
    async def fetch_monitor_accounts(self):
        """Fetch the accounts whose ``status`` equals ACCOUNT_VALID_STATUS.

        Returns:
            Whatever ``pool.async_fetch`` returns for the query, which selects
            ``gh_id``, ``account_name``, ``app_id`` and ``app_secret`` from
            ``gzh_account_info``.
        """
        query = (
            " SELECT gh_id, account_name, app_id, app_secret "
            "FROM gzh_account_info WHERE status = %s; "
        )
        return await self.pool.async_fetch(
            query=query, params=(self.ACCOUNT_VALID_STATUS,)
        )

    # Update access_token
    async def set_access_token_for_each_account(
        self, gh_id, access_token, expire_timestamp
    ):
        """Store a new access_token for one account and mark it valid.

        Args:
            gh_id: Account identifier used in the ``WHERE`` clause.
            access_token: Token value to store.
            expire_timestamp: Expiry value to store alongside the token.

        Returns:
            Whatever ``pool.async_save`` returns for the UPDATE.
        """
        query = (
            " UPDATE gzh_cookie_info "
            "SET access_token = %s, access_token_status = %s, expire_timestamp = %s "
            "WHERE gh_id = %s; "
        )
        return await self.pool.async_save(
            query=query,
            params=(access_token, self.COOKIE_VALID_STATUS, expire_timestamp, gh_id),
        )

    # Get access_token from the database
    async def get_access_token_from_database(self, gh_id):
        """Fetch the stored access_token and its expiry for one account.

        Args:
            gh_id: Account identifier used in the ``WHERE`` clause.

        Returns:
            Whatever ``pool.async_fetch`` returns for the query, which selects
            ``access_token`` and ``expire_timestamp`` from ``gzh_cookie_info``.
        """
        query = (
            " SELECT access_token, expire_timestamp "
            "FROM gzh_cookie_info where gh_id = %s; "
        )
        return await self.pool.async_fetch(query=query, params=(gh_id,))


class ArticleDetailStat(ArticleDetailStatMapper):
    """Task that checks, and refreshes when expired, account access_tokens."""

    def __init__(self, pool, log_client):
        super().__init__(pool, log_client)

    async def _persist_refreshed_token(self, gh_id, new_token_info):
        """Save a freshly fetched token when the response carries one.

        The response is only persisted when it is a dict holding a non-empty
        ``access_token`` and an integer ``expires_in``; any other shape (for
        example an error payload) is ignored so nothing bogus is written.
        NOTE(review): assumes ``get_access_token`` returns the WeChat token
        response unchanged -- confirm against that helper.
        """
        if not isinstance(new_token_info, dict):
            return

        access_token = new_token_info.get("access_token")
        expires_in = new_token_info.get("expires_in")
        if not access_token or not isinstance(expires_in, int):
            return

        # expires_in is a lifetime in seconds; the table stores an absolute
        # timestamp that is compared against time.time() on the next run.
        await self.set_access_token_for_each_account(
            gh_id=gh_id,
            access_token=access_token,
            expire_timestamp=int(time.time()) + expires_in,
        )

    # Process a single account
    async def process_single_account(self, account: dict):
        """Refresh the access_token of one account if the stored one expired.

        Does nothing when the account has no row in ``gzh_cookie_info`` or
        when the stored token has not expired yet. Otherwise a new token is
        requested, printed, and written back to the database so that the next
        run does not request it again.

        Args:
            account: Mapping with at least ``gh_id``, ``account_name``,
                ``app_id`` and ``app_secret``.
        """
        gh_id = account["gh_id"]
        token_info = await self.get_access_token_from_database(gh_id)
        if not token_info:
            return

        # A missing/NULL expiry is treated as 0, i.e. already expired.
        expire_timestamp = token_info[0]["expire_timestamp"] or 0
        if int(time.time()) < expire_timestamp:
            return

        print(f"{account['account_name']} access_token expired")
        new_token_info = await get_access_token(
            account["app_id"], account["app_secret"]
        )
        print(json.dumps(new_token_info, indent=4))

        await self._persist_refreshed_token(gh_id, new_token_info)

    # Entry point
    async def deal(self):
        """Fetch the monitored accounts and process them one by one."""
        accounts = await self.fetch_monitor_accounts()
        # NOTE(review): only the first account is processed; this looks like a
        # debugging limit -- confirm before relying on this task for all accounts.
        for account in accounts[:1]:
            await self.process_single_account(account)