# recycle_daily_publish_articles.py (4.6 KB)

  1. import time
  2. from unittest import case
  3. from tqdm import tqdm
  4. from applications.api import feishu_robot
  5. from applications.crawler.wechat import get_article_list_from_account
  6. from applications.pipeline import insert_article_into_recycle_pool
  7. class Const:
  8. # 订阅号
  9. SUBSCRIBE_TYPE_SET = {0, 1}
  10. NEW_ACCOUNT_CRAWL_PERIOD = 60 * 60 * 24 * 30
  11. class RecycleDailyPublishArticlesTask(Const):
  12. def __init__(self, pool, log_client):
  13. self.pool = pool
  14. self.log_client = log_client
  15. async def get_publish_accounts(self):
  16. """
  17. get all publish accounts
  18. """
  19. query = f"""
  20. select distinct t3.name, t3.gh_id, t3.follower_count, t3.create_timestamp as account_init_timestamp,
  21. t4.service_type_info as account_type, t4.verify_type_info as account_auth, t3.id as account_id,
  22. group_concat(distinct t5.remark) as account_remark
  23. from
  24. publish_plan t1
  25. join publish_plan_account t2 on t1.id = t2.plan_id
  26. join publish_account t3 on t2.account_id = t3.id
  27. left join publish_account_wx_type t4 on t3.id = t4.account_id
  28. left join publish_account_remark t5 on t3.id = t5.publish_account_id
  29. where t1.plan_status = 1 and t1.content_modal = 3 and t3.channel = 5
  30. group by t3.id;
  31. """
  32. account_list = await self.pool.async_fetch(query, db_name='aigc')
  33. return [i for i in account_list if '自动回复' not in str(i['account_remark'])]
  34. async def get_account_status(self):
  35. """get account experiment status"""
  36. sql = f"""
  37. select t1.account_id, t2.status
  38. from wx_statistics_group_source_account t1
  39. join wx_statistics_group_source t2 on t1.group_source_name = t2.account_source_name;
  40. """
  41. account_status_list = await self.pool.async_fetch(sql, db_name='aigc')
  42. account_status_dict = {account['account_id']: account['status'] for account in account_status_list}
  43. return account_status_dict
  44. async def recycle_single_account(self, account):
  45. """recycle single account"""
  46. query = f"""
  47. select max(publish_timestamp) as publish_timestamp from official_articles_v2 where ghId = %s;
  48. """
  49. response = await self.pool.async_fetch(query, params=(account['gh_id'],), db_name='aigc')
  50. if response:
  51. max_publish_timestamp = response[0]['publish_timestamp']
  52. else:
  53. max_publish_timestamp = int(time.time()) - self.NEW_ACCOUNT_CRAWL_PERIOD
  54. cursor = None
  55. while True:
  56. response = get_article_list_from_account(account_id=account['account_id'], cursor=cursor)
  57. response_code = response['code']
  58. match response_code:
  59. case 25013:
  60. await feishu_robot.bot(
  61. title="发布账号封禁",
  62. detail={
  63. "账号名称": account['name'],
  64. "账号id": account['account_id']
  65. },
  66. )
  67. return
  68. case 0:
  69. msg_list = response.get("data", {}).get("data", [])
  70. if not msg_list:
  71. return
  72. await insert_article_into_recycle_pool(self.pool, self.log_client, msg_list, account)
  73. # check last article
  74. last_article = msg_list[-1]
  75. last_publish_timestamp = last_article['AppMsg']['BaseInfo']['UpdateTime']
  76. if last_publish_timestamp <= max_publish_timestamp:
  77. return
  78. cursor = response['data'].get('next_cursor')
  79. if not cursor:
  80. return
  81. case _:
  82. return
  83. async def deal(self):
  84. """recycle all publish accounts articles"""
  85. binding_accounts = await self.get_publish_accounts()
  86. account_status = await self.get_account_status()
  87. account_list = [
  88. {
  89. **item,
  90. 'using_status': 0 if account_status.get(item['account_id']) == '实验' else 1
  91. }
  92. for item in binding_accounts
  93. ]
  94. # 订阅号
  95. subscription_accounts = [i for i in account_list if i['account_type'] in self.SUBSCRIBE_TYPE_SET]
  96. for account in tqdm(subscription_accounts, desc="recycle each account"):
  97. try:
  98. await self.recycle_single_account(account)
  99. except Exception as e:
  100. print("recycle account error:", e)