| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071 |
- from tqdm.asyncio import tqdm
- import json
- from applications.crawler.wechat import get_article_list_from_account
class CooperateAccountAnalysisTask:
    """Fill and refresh rows of the ``cooperate_accounts_temp`` table.

    The task inserts partner accounts, then for each selected account calls
    ``get_article_list_from_account`` and writes the outcome back to the
    account's row (``status`` and ``recent_articles`` columns).
    """

    def __init__(self, pool, log_client):
        """Store the collaborators used by the task.

        Args:
            pool: Object exposing awaitable ``async_save(query=, params=)``
                and ``async_fetch(query=, params=)`` methods.
            log_client: Kept on the instance; not used by any method
                visible in this class.
        """
        self.pool = pool
        self.log_client = log_client

    async def insert_account(self, account: dict) -> None:
        """Insert one account row via ``INSERT IGNORE``.

        Args:
            account: Mapping read with ``.get`` for the keys
                ``partner_name``, ``partner_id``, ``gh_id`` and
                ``account_name``; missing keys are sent as ``None``.
        """
        insert_query = """
            INSERT IGNORE INTO cooperate_accounts_temp
            (partner_name, partner_id, gh_id, account_name)
            VALUES (%s, %s, %s, %s);
        """
        await self.pool.async_save(
            query=insert_query,
            params=(
                account.get('partner_name'),
                account.get('partner_id'),
                account.get('gh_id'),
                account.get('account_name'),
            ),
        )

    async def update_each_account(self, gh_id: str, response: dict) -> None:
        """Persist the crawl outcome for the account identified by ``gh_id``.

        When ``response['code']`` is 25013 the row gets ``status = 0`` and
        ``response['msg']`` as payload; otherwise ``status = 1`` and
        ``response['data']`` as payload. The payload is JSON-encoded exactly
        once before being written to ``recent_articles``.

        Args:
            gh_id: Value matched against the ``gh_id`` column.
            response: Mapping with a ``code`` key and either ``msg``
                (code 25013) or ``data`` (any other code).

        Raises:
            KeyError: If ``response`` lacks ``code`` or the branch's key.
        """
        # NOTE(review): 25013 presumably signals an unavailable/invalid
        # account on the crawler side -- confirm against the crawler API.
        if response['code'] == 25013:
            status = 0
            result = response['msg']
        else:
            status = 1
            # Keep the raw object here; it is serialized once below.
            # Encoding it at this point as well would store a JSON string
            # wrapping escaped JSON instead of the JSON document itself.
            result = response['data']

        update_query = """
            UPDATE cooperate_accounts_temp
            SET status = %s, recent_articles= %s
            WHERE gh_id = %s;
        """
        await self.pool.async_save(
            query=update_query,
            params=(
                status,
                json.dumps(result, ensure_ascii=False),
                gh_id,
            ),
        )

    async def get_account_list(self, status: int = 1, min_id: int = 11):
        """Fetch the ``gh_id`` of accounts to process.

        Args:
            status: Value the ``status`` column must equal (default 1).
            min_id: Exclusive lower bound on the ``id`` column. The default
                of 11 reproduces the cutoff that was previously hard-coded
                in the query.
                NOTE(review): the reason for skipping ids <= 11 is not
                visible in this file -- confirm it is still wanted.

        Returns:
            Whatever ``self.pool.async_fetch`` returns for the query.
        """
        select_query = """
            SELECT gh_id FROM cooperate_accounts_temp
            WHERE status = %s and id > %s;
        """
        return await self.pool.async_fetch(
            query=select_query,
            params=(status, min_id),
        )

    async def init_account_list(self, account_list):
        """Insert every account of ``account_list``, one at a time."""
        for account in tqdm(account_list, desc="Dealing Accounts"):
            await self.insert_account(account)

    async def deal(self):
        """Fetch articles for each selected account and store the outcome.

        Accounts are processed sequentially; each row returned by
        ``get_account_list`` is indexed with the ``'gh_id'`` key.
        """
        account_list = await self.get_account_list()
        for account in tqdm(account_list, desc="Dealing Accounts"):
            gh_id = account['gh_id']
            response = await get_article_list_from_account(gh_id)
            await self.update_each_account(gh_id, response)
|