| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283 |
- import json
- import time
- from applications.crawler.wechat import get_gzh_stat_daily
- from applications.crawler.wechat import get_access_token
class ArticleDetailStatConst:
    """Integer status codes shared by the article-detail statistics classes.

    Subclasses read these as plain class attributes (for example
    ``self.ACCOUNT_VALID_STATUS``), so they are kept as bare ints.
    """

    # Task status
    INIT_STATUS = 0
    PROCESSING_STATUS = 1
    SUCCESS_STATUS = 2
    FAILED_STATUS = 99

    # Account status
    ACCOUNT_VALID_STATUS = 1
    ACCOUNT_INVALID_STATUS = 0

    # Cookie status
    COOKIE_VALID_STATUS = 1
    COOKIE_INVALID_STATUS = 0
class ArticleDetailStatMapper(ArticleDetailStatConst):
    """Database access helpers for the article-detail statistics task.

    Each method only builds a parameterized SQL statement and hands it to
    ``pool``; the return value is whatever the pool call returns.
    """

    def __init__(self, pool, log_client):
        """Store the collaborators used by the query helpers.

        Args:
            pool: Database client. Must expose awaitable ``async_fetch`` and
                ``async_save`` methods accepting ``query`` and ``params``
                keyword arguments.
            log_client: Logging client. Stored on the instance; not used by
                the methods of this class.
        """
        self.pool = pool
        self.log_client = log_client

    async def fetch_monitor_accounts(self):
        """Fetch the accounts whose ``status`` equals ``ACCOUNT_VALID_STATUS``.

        Returns:
            The result of ``pool.async_fetch`` for a query selecting
            ``gh_id``, ``account_name``, ``app_id`` and ``app_secret`` from
            ``gzh_account_info``.
        """
        query = """
            SELECT gh_id, account_name, app_id, app_secret
            FROM gzh_account_info WHERE status = %s;
        """
        return await self.pool.async_fetch(
            query=query, params=(self.ACCOUNT_VALID_STATUS,)
        )

    async def set_access_token_for_each_account(
        self, gh_id, access_token, expire_timestamp
    ):
        """Store a new access_token for one account and mark it as valid.

        Args:
            gh_id: Value matched against ``gzh_cookie_info.gh_id``.
            access_token: Token value to store.
            expire_timestamp: Expiry value to store alongside the token.

        Returns:
            The result of ``pool.async_save``.
        """
        query = """
            UPDATE gzh_cookie_info
            SET access_token = %s, access_token_status = %s, expire_timestamp = %s
            WHERE gh_id = %s;
        """
        return await self.pool.async_save(
            query=query,
            params=(access_token, self.COOKIE_VALID_STATUS, expire_timestamp, gh_id),
        )

    async def get_access_token_from_database(self, gh_id):
        """Read the stored access_token and its expiry for one account.

        Args:
            gh_id: Value matched against ``gzh_cookie_info.gh_id``.

        Returns:
            The result of ``pool.async_fetch`` for a query selecting
            ``access_token`` and ``expire_timestamp``.
        """
        query = """
            SELECT access_token, expire_timestamp FROM gzh_cookie_info where gh_id = %s;
        """
        return await self.pool.async_fetch(query=query, params=(gh_id,))
class ArticleDetailStat(ArticleDetailStatMapper):
    """Task that checks stored access_tokens and requests new ones on expiry."""

    def __init__(self, pool, log_client):
        """Forward ``pool`` and ``log_client`` to the mapper base class."""
        super().__init__(pool, log_client)

    async def process_single_account(self, account: dict) -> None:
        """Check one account's stored token and request a new one if expired.

        Args:
            account: Mapping with the keys ``gh_id``, ``account_name``,
                ``app_id`` and ``app_secret``.

        Returns:
            None. Returns early when no token row is found for the account.
        """
        gh_id = account["gh_id"]
        token_info = await self.get_access_token_from_database(gh_id)
        if not token_info:
            # No stored token row for this account: nothing to compare against.
            return

        # A missing (falsy) expiry is treated as 0, i.e. already expired.
        expire_timestamp = token_info[0]["expire_timestamp"] or 0
        if int(time.time()) >= expire_timestamp:
            print(f"{account['account_name']} access_token expired")
            new_token_info = await get_access_token(
                account["app_id"], account["app_secret"]
            )
            # TODO(review): the new token is only printed, never stored, so the
            # next run will request it again. Persisting it via
            # set_access_token_for_each_account needs the return shape of
            # get_access_token to be confirmed first.
            # NOTE(review): this writes the token response to stdout in clear
            # text; consider masking it or routing it through log_client.
            print(json.dumps(new_token_info, indent=4))

    async def deal(self, limit=1) -> None:
        """Entry point: process the monitored accounts one after another.

        Args:
            limit: Maximum number of accounts to process (int or None).
                Defaults to 1, which matches the previously hard-coded
                ``accounts[:1]``; pass ``None`` to process every account.
                The value is used directly as a slice bound.

        Returns:
            None.
        """
        accounts = await self.fetch_monitor_accounts()
        if not accounts:
            # Covers both an empty result and a None result from the pool.
            return
        for account in accounts[:limit]:
            await self.process_single_account(account)
|