- import time
- from unittest import case
- from tqdm import tqdm
- from applications.api import feishu_robot
- from applications.crawler.wechat import get_article_list_from_account
- from applications.pipeline import insert_article_into_recycle_pool
class Const:
    """Shared constants for the article-recycle tasks."""

    # WeChat service-type values that count as subscription accounts (订阅号).
    SUBSCRIBE_TYPE_SET = {0, 1}

    # Look-back window, in seconds, used for accounts with no recycled
    # articles yet: 30 days.
    NEW_ACCOUNT_CRAWL_PERIOD = 30 * 24 * 60 * 60
- class RecycleDailyPublishArticlesTask(Const):
- def __init__(self, pool, log_client):
- self.pool = pool
- self.log_client = log_client
- async def get_publish_accounts(self):
- """
- get all publish accounts
- """
- query = f"""
- select distinct t3.name, t3.gh_id, t3.follower_count, t3.create_timestamp as account_init_timestamp,
- t4.service_type_info as account_type, t4.verify_type_info as account_auth, t3.id as account_id,
- group_concat(distinct t5.remark) as account_remark
- from
- publish_plan t1
- join publish_plan_account t2 on t1.id = t2.plan_id
- join publish_account t3 on t2.account_id = t3.id
- left join publish_account_wx_type t4 on t3.id = t4.account_id
- left join publish_account_remark t5 on t3.id = t5.publish_account_id
- where t1.plan_status = 1 and t1.content_modal = 3 and t3.channel = 5
- group by t3.id;
- """
- account_list = await self.pool.async_fetch(query, db_name='aigc')
- return [i for i in account_list if '自动回复' not in str(i['account_remark'])]
- async def get_account_status(self):
- """get account experiment status"""
- sql = f"""
- select t1.account_id, t2.status
- from wx_statistics_group_source_account t1
- join wx_statistics_group_source t2 on t1.group_source_name = t2.account_source_name;
- """
- account_status_list = await self.pool.async_fetch(sql, db_name='aigc')
- account_status_dict = {account['account_id']: account['status'] for account in account_status_list}
- return account_status_dict
- async def recycle_single_account(self, account):
- """recycle single account"""
- query = f"""
- select max(publish_timestamp) as publish_timestamp from official_articles_v2 where ghId = %s;
- """
- response = await self.pool.async_fetch(query, params=(account['gh_id'],), db_name='aigc')
- if response:
- max_publish_timestamp = response[0]['publish_timestamp']
- else:
- max_publish_timestamp = int(time.time()) - self.NEW_ACCOUNT_CRAWL_PERIOD
- cursor = None
- while True:
- response = get_article_list_from_account(account_id=account['account_id'], cursor=cursor)
- response_code = response['code']
- match response_code:
- case 25013:
- await feishu_robot.bot(
- title="发布账号封禁",
- detail={
- "账号名称": account['name'],
- "账号id": account['account_id']
- },
- )
- return
- case 0:
- msg_list = response.get("data", {}).get("data", [])
- if not msg_list:
- return
- await insert_article_into_recycle_pool(self.pool, self.log_client, msg_list, account)
- # check last article
- last_article = msg_list[-1]
- last_publish_timestamp = last_article['AppMsg']['BaseInfo']['UpdateTime']
- if last_publish_timestamp <= max_publish_timestamp:
- return
- cursor = response['data'].get('next_cursor')
- if not cursor:
- return
- case _:
- return
- async def deal(self):
- """recycle all publish accounts articles"""
- binding_accounts = await self.get_publish_accounts()
- account_status = await self.get_account_status()
- account_list = [
- {
- **item,
- 'using_status': 0 if account_status.get(item['account_id']) == '实验' else 1
- }
- for item in binding_accounts
- ]
- # 订阅号
- subscription_accounts = [i for i in account_list if i['account_type'] in self.SUBSCRIBE_TYPE_SET]
- for account in tqdm(subscription_accounts, desc="recycle each account"):
- try:
- await self.recycle_single_account(account)
- except Exception as e:
- print("recycle account error:", e)
|