import time

from tqdm import tqdm

from applications.api import feishu_robot
from applications.crawler.wechat import get_article_list_from_account
from applications.pipeline import insert_article_into_recycle_pool


class Const:
    # service types that mark an account as a 订阅号 (subscription account)
    SUBSCRIBE_TYPE_SET = {0, 1}

    # look back 30 days for accounts that have never been crawled before
    NEW_ACCOUNT_CRAWL_PERIOD = 60 * 60 * 24 * 30


class RecycleDailyPublishArticlesTask(Const):
    def __init__(self, pool, log_client):
        self.pool = pool
        self.log_client = log_client

    async def get_publish_accounts(self):
        """get all publish accounts"""
        query = """
            select distinct
                t3.name,
                t3.gh_id,
                t3.follower_count,
                t3.create_timestamp as account_init_timestamp,
                t4.service_type_info as account_type,
                t4.verify_type_info as account_auth,
                t3.id as account_id,
                group_concat(distinct t5.remark) as account_remark
            from publish_plan t1
            join publish_plan_account t2 on t1.id = t2.plan_id
            join publish_account t3 on t2.account_id = t3.id
            left join publish_account_wx_type t4 on t3.id = t4.account_id
            left join publish_account_remark t5 on t3.id = t5.publish_account_id
            where t1.plan_status = 1 and t1.content_modal = 3 and t3.channel = 5
            group by t3.id;
        """
        account_list = await self.pool.async_fetch(query, db_name="aigc")
        # drop auto-reply (自动回复) accounts
        return [i for i in account_list if "自动回复" not in str(i["account_remark"])]

    async def get_account_status(self):
        """get account experiment status"""
        query = """
            select t1.account_id, t2.status
            from wx_statistics_group_source_account t1
            join wx_statistics_group_source t2
                on t1.group_source_name = t2.account_source_name;
        """
        account_status_list = await self.pool.async_fetch(query, db_name="aigc")
        return {account["account_id"]: account["status"] for account in account_status_list}

    async def recycle_single_account(self, account):
        """recycle articles of a single account"""
        query = """
            select max(publish_timestamp) as publish_timestamp
            from official_articles_v2
            where ghId = %s;
        """
        response = await self.pool.async_fetch(
            query, params=(account["gh_id"],), db_name="aigc"
        )
        # max() always returns one row, whose value is NULL for accounts that
        # have never been crawled, so check the value rather than the row
        if response and response[0]["publish_timestamp"]:
            max_publish_timestamp = response[0]["publish_timestamp"]
        else:
            max_publish_timestamp = int(time.time()) - self.NEW_ACCOUNT_CRAWL_PERIOD

        cursor = None
        while True:
            response = get_article_list_from_account(
                account_id=account["account_id"], cursor=cursor
            )
            response_code = response["code"]
            match response_code:
                case 25013:
                    await feishu_robot.bot(
                        title="发布账号封禁",
                        detail={
                            "账号名称": account["name"],
                            "账号id": account["account_id"],
                        },
                    )
                    return
                case 0:
                    msg_list = response.get("data", {}).get("data", [])
                    if not msg_list:
                        return

                    await insert_article_into_recycle_pool(
                        self.pool, self.log_client, msg_list, account
                    )

                    # stop paging once we reach articles no newer than the
                    # latest one already recycled for this account
                    last_article = msg_list[-1]
                    last_publish_timestamp = last_article["AppMsg"]["BaseInfo"]["UpdateTime"]
                    if last_publish_timestamp <= max_publish_timestamp:
                        return

                    cursor = response["data"].get("next_cursor")
                    if not cursor:
                        return
                case _:
                    return

    async def deal(self):
        """recycle articles of all publish accounts"""
        binding_accounts = await self.get_publish_accounts()
        account_status = await self.get_account_status()
        account_list = [
            {
                **item,
                "using_status": 0 if account_status.get(item["account_id"]) == "实验" else 1,
            }
            for item in binding_accounts
        ]

        # only recycle subscription accounts (订阅号)
        subscription_accounts = [
            i for i in account_list if i["account_type"] in self.SUBSCRIBE_TYPE_SET
        ]
        for account in tqdm(subscription_accounts, desc="recycle each account"):
            try:
                await self.recycle_single_account(account)
            except Exception as e:
                print(f"recycle account {account['account_id']} error: {e}")
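
# --- usage sketch (illustration only, not part of the task module) ---
# A minimal example of how this task might be driven from an entry point.
# `make_db_pool` and `make_log_client` are hypothetical factory names used
# as placeholders for however this project actually constructs its async
# database pool (with an `async_fetch` method) and its log client.
#
# import asyncio
#
# async def main():
#     pool = await make_db_pool()          # assumed helper, not defined here
#     log_client = make_log_client()       # assumed helper, not defined here
#     task = RecycleDailyPublishArticlesTask(pool, log_client)
#     await task.deal()
#
# if __name__ == "__main__":
#     asyncio.run(main())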