# recycle_daily_publish_articles.py

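"""
Daily recycling of published WeChat articles.

Tasks in this module:
- RecycleDailyPublishArticlesTask: crawl each publishing account's article list
  into the recycle pool (incremental, cursor-based).
- CheckDailyPublishArticlesTask: verify every account published today's articles
  and alert Feishu about accounts that are still missing data.
- UpdateRootSourceIdAndUpdateTimeTask: backfill publish_timestamp and
  root_source_id_list for articles whose detail fetch previously failed.
- RecycleFwhDailyPublishArticlesTask: recycle group-send (服务号) article stats.
"""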
import asyncio
import json
import time
import datetime
import urllib.parse
import traceback

from tqdm.asyncio import tqdm

from app.infra.external import feishu_robot
from app.infra.crawler.wechat import get_article_list_from_account
from app.infra.crawler.wechat import get_article_detail
from app.core.pipeline import insert_article_into_recycle_pool
from app.infra.shared.tools import str_to_md5


class Const:
    # subscription accounts
    SUBSCRIBE_TYPE_SET = {0, 1}

    # accounts newer than this period (30 days) are crawled from scratch
    NEW_ACCOUNT_CRAWL_PERIOD = 60 * 60 * 24 * 30

    # forbidden (banned) accounts
    FORBIDDEN_GH_IDS = [
        "gh_4c058673c07e",
        "gh_de9f9ebc976b",
        "gh_7b4a5f86d68c",
        "gh_f902cea89e48",
        "gh_789a40fe7935",
        "gh_cd041ed721e6",
        "gh_62d7f423f382",
        "gh_043223059726",
        "gh_6cfd1132df94",
        "gh_7f5075624a50",
        "gh_d4dffc34ac39",
        "gh_c69776baf2cd",
        "gh_9877c8541764",
        "gh_ac43e43b253b",
        "gh_93e00e187787",
        "gh_080bb43aa0dc",
        "gh_b1c71a0e7a85",
        "gh_d5f935d0d1f2",
        "gh_6b7c2a257263",
        "gh_bfe5b705324a",
        "gh_7e5818b2dd83",
        "gh_a2901d34f75b",
        "gh_5ae65db96cb7",
        "gh_72bace6b3059",
        "gh_dd4c857bbb36",
        "gh_ff487cb5dab3",
        "gh_ac43eb24376d",
        "gh_b15de7c99912",
        "gh_56ca3dae948c",
        "gh_341e08f852f5",
        "gh_0c89e11f8bf3",
        "gh_9eef14ad6c16",
        "gh_c5cdf60d9ab4",
    ]

    # server accounts that are no longer used
    NOT_USED_SERVER_ACCOUNT = {"gh_84e744b16b3a", "gh_5855bed97938", "gh_61a72b720de3"}

    # article status
    # default status for a new record
    DEFAULT_STATUS = 0
    # request to the detail api failed
    REQUEST_FAIL_STATUS = -1
    # article has been deleted
    DELETE_STATUS = -2
    # no information returned for an unknown reason
    UNKNOWN_STATUS = -3
    # article violates platform rules
    ILLEGAL_STATUS = -4

    # spider response codes
    ARTICLE_ILLEGAL_CODE = 25012
    ARTICLE_DELETE_CODE = 25005
    ARTICLE_SUCCESS_CODE = 0
    ARTICLE_UNKNOWN_CODE = 10000
    ACCOUNT_FORBIDDEN_CODE = 25013
    CRAWL_CRASH_CODE = 20000

    # statistics window for group-send results (3 days)
    STAT_PERIOD = 3 * 24 * 3600

    # group-send recycle status
    INIT_STATUS = 0
    PROCESSING_STATUS = 1
    SUCCESS_STATUS = 2
    FAILED_STATUS = 99


class RecycleDailyPublishArticlesTask(Const):
    def __init__(self, pool, log_client, date_string):
        self.pool = pool
        self.log_client = log_client
        self.date_string = date_string

    async def get_publish_accounts(self):
        """
        get all publish accounts
        """
        query = """
            select distinct t3.name, t3.gh_id, t3.follower_count, t3.create_timestamp as account_init_timestamp,
                t4.service_type_info as account_type, t4.verify_type_info as account_auth, t3.id as account_id,
                group_concat(distinct t5.remark) as account_remark
            from
                publish_plan t1
                join publish_plan_account t2 on t1.id = t2.plan_id
                join publish_account t3 on t2.account_id = t3.id
                left join publish_account_wx_type t4 on t3.id = t4.account_id
                left join publish_account_remark t5 on t3.id = t5.publish_account_id
            where t1.plan_status = 1 and t1.content_modal = 3 and t3.channel = 5
            group by t3.id;
        """
        account_list = await self.pool.async_fetch(query, db_name="aigc")
        # drop accounts whose remark marks them as auto-reply ("自动回复") accounts
        return [i for i in account_list if "自动回复" not in str(i["account_remark"])]

    async def get_account_status(self):
        """get account experiment status"""
        sql = """
            select t1.account_id, t2.status
            from wx_statistics_group_source_account t1
            join wx_statistics_group_source t2 on t1.group_source_name = t2.account_source_name;
        """
        account_status_list = await self.pool.async_fetch(sql, db_name="aigc")
        account_status_dict = {
            account["account_id"]: account["status"] for account in account_status_list
        }
        return account_status_dict
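
    # recycle_single_account pages through the account's publish history with a
    # cursor and stops as soon as it reaches articles no newer than the latest
    # publish_timestamp already stored in official_articles_v2 (incremental crawl).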
    async def recycle_single_account(self, account):
        """recycle single account"""
        query = """
            select max(publish_timestamp) as publish_timestamp
            from official_articles_v2 where ghId = %s;
        """
        response = await self.pool.async_fetch(
            query, params=(account["gh_id"],), db_name="piaoquan_crawler"
        )
        if response and response[0]["publish_timestamp"]:
            max_publish_timestamp = response[0]["publish_timestamp"]
        else:
            # new account: crawl the most recent NEW_ACCOUNT_CRAWL_PERIOD seconds
            max_publish_timestamp = int(time.time()) - self.NEW_ACCOUNT_CRAWL_PERIOD

        cursor = None
        while True:
            response = await get_article_list_from_account(
                account_id=account["gh_id"], index=cursor, is_cache=False
            )
            if not response:
                return
            response_code = response["code"]
            match response_code:
                case self.ACCOUNT_FORBIDDEN_CODE:
                    await feishu_robot.bot(
                        title="发布账号封禁",
                        detail={
                            "账号名称": account["name"],
                            "账号id": account["gh_id"],
                        },
                    )
                    return
                case self.ARTICLE_SUCCESS_CODE:
                    msg_list = response.get("data", {}).get("data", [])
                    if not msg_list:
                        return
                    await insert_article_into_recycle_pool(
                        self.pool, self.log_client, msg_list, account
                    )
                    # stop once the last article on this page is no newer than
                    # the latest one already stored
                    last_article = msg_list[-1]
                    last_publish_timestamp = last_article["AppMsg"]["BaseInfo"]["UpdateTime"]
                    if last_publish_timestamp <= max_publish_timestamp:
                        return
                    cursor = response["data"].get("next_cursor")
                    if not cursor:
                        return
                case self.CRAWL_CRASH_CODE:
                    # spider crashed; log it and retry the same cursor
                    await self.log_client.log(
                        contents={
                            "task": "recycle_daily_publish_articles",
                            "data": {
                                "gh_id": account["gh_id"],
                            },
                            "message": "爬虫挂掉",
                            "status": "fail",
                        }
                    )
                case _:
                    return
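
    # get_task_list builds the working set: bound accounts minus forbidden gh_ids,
    # flagged with using_status and filtered down to subscription accounts.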
    async def get_task_list(self):
        """recycle all publish accounts articles"""
        binding_accounts = await self.get_publish_accounts()
        # filter out forbidden accounts
        binding_accounts = [
            i for i in binding_accounts if i["gh_id"] not in self.FORBIDDEN_GH_IDS
        ]
        account_status = await self.get_account_status()
        # previously, accounts in the "实验" (experiment) group were flagged as
        # using_status = 0; every account is now treated as in use:
        # account_list = [
        #     {
        #         **item,
        #         "using_status": (
        #             0 if account_status.get(item["account_id"]) == "实验" else 1
        #         ),
        #     }
        #     for item in binding_accounts
        # ]
        account_list = [{**item, "using_status": 1} for item in binding_accounts]
        # keep subscription accounts only
        subscription_accounts = [
            i for i in account_list if i["account_type"] in self.SUBSCRIBE_TYPE_SET
        ]
        return subscription_accounts

    async def deal(self):
        subscription_accounts = await self.get_task_list()
        for account in tqdm(subscription_accounts, desc="recycle each account"):
            try:
                await self.recycle_single_account(account)
            except Exception as e:
                print(
                    f"{account['name']}\t{account['gh_id']}: recycle account error:", e
                )
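

# CheckDailyPublishArticlesTask verifies that every account recycled at least one
# article for the given date; accounts that still have nothing after a second
# recycle pass are reported to Feishu (only after 20:00, to avoid false alarms
# earlier in the day).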
class CheckDailyPublishArticlesTask(RecycleDailyPublishArticlesTask):
    async def check_account(self, account: dict, date_string: str) -> bool:
        """check whether the account published anything after date_string"""
        query = """
            select count(1) as publish_count
            from official_articles_v2
            where ghId = %s and from_unixtime(publish_timestamp) > %s;
        """
        response = await self.pool.async_fetch(
            query=query,
            db_name="piaoquan_crawler",
            params=(account["gh_id"], date_string),
        )
        if response:
            today_publish_count = response[0]["publish_count"]
            return today_publish_count > 0
        else:
            return False

    async def deal(self):
        task_list = await self.get_task_list()
        for task in tqdm(task_list, desc="check each account step1: "):
            if await self.check_account(task, self.date_string):
                continue
            else:
                await self.recycle_single_account(task)

        # check again and collect the accounts that are still missing articles
        fail_list = []
        for second_task in tqdm(task_list, desc="check each account step2: "):
            if await self.check_account(second_task, self.date_string):
                continue
            else:
                second_task.pop("account_type", None)
                second_task.pop("account_auth", None)
                second_task.pop("account_id", None)
                second_task.pop("account_remark", None)
                fail_list.append(second_task)

        if fail_list:
            # only alert after 20:00; earlier runs may simply be too early
            now = datetime.datetime.now()
            if now.hour < 20:
                return
            columns = [
                feishu_robot.create_feishu_columns_sheet(
                    sheet_type="plain_text",
                    sheet_name="name",
                    display_name="公众号名称",
                ),
                feishu_robot.create_feishu_columns_sheet(
                    sheet_type="plain_text", sheet_name="gh_id", display_name="gh_id"
                ),
                feishu_robot.create_feishu_columns_sheet(
                    sheet_type="number",
                    sheet_name="follower_count",
                    display_name="粉丝数",
                ),
                feishu_robot.create_feishu_columns_sheet(
                    sheet_type="date",
                    sheet_name="account_init_timestamp",
                    display_name="账号接入系统时间",
                ),
                feishu_robot.create_feishu_columns_sheet(
                    sheet_type="plain_text",
                    sheet_name="using_status",
                    display_name="利用状态",
                ),
            ]
            await feishu_robot.bot(
                title=f"{self.date_string} 发布文章,存在未更新的账号",
                detail={"columns": columns, "rows": fail_list},
                table=True,
                mention=False,
            )
        else:
            await feishu_robot.bot(
                title=f"{self.date_string} 发布文章,所有文章更新成功",
                detail={
                    "date_string": self.date_string,
                    "finish_time": str(datetime.datetime.now()),
                },
                mention=False,
            )
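

# UpdateRootSourceIdAndUpdateTimeTask backfills articles whose publish_timestamp
# is still a sentinel value (0 = default, -1 = request failed, -3 = unknown):
# it re-fetches each article's detail page to recover the real publish time and
# the mini-program rootSourceId list, then falls back to appMsgId / updateTime
# heuristics for whatever is left.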
class UpdateRootSourceIdAndUpdateTimeTask(Const):
    """
    update publish_timestamp && root_source_id
    """

    def __init__(self, pool, log_client):
        self.pool = pool
        self.log_client = log_client

    async def get_article_list(self) -> list[dict]:
        query = """select ContentUrl, wx_sn from official_articles_v2 where publish_timestamp in %s;"""
        article_list = await self.pool.async_fetch(
            query=query,
            db_name="piaoquan_crawler",
            params=(
                (self.DEFAULT_STATUS, self.REQUEST_FAIL_STATUS, self.UNKNOWN_STATUS),
            ),
        )
        return article_list
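
    # check_each_article re-crawls one article's detail page, maps the spider's
    # response code to a status (or the real publish timestamp in seconds), and
    # extracts rootSourceId values from the mini-program card paths.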
    async def check_each_article(self, article: dict):
        url = article["ContentUrl"]
        wx_sn = article["wx_sn"].decode("utf-8")
        try:
            response = await get_article_detail(url)
            response_code = response["code"]
            if response_code == self.ARTICLE_DELETE_CODE:
                publish_timestamp_s = self.DELETE_STATUS
                root_source_id_list = []
            elif response_code == self.ARTICLE_ILLEGAL_CODE:
                publish_timestamp_s = self.ILLEGAL_STATUS
                root_source_id_list = []
            elif response_code == self.ARTICLE_SUCCESS_CODE:
                data = response["data"]["data"]
                # the spider returns milliseconds; the table stores seconds
                publish_timestamp_ms = data["publish_timestamp"]
                publish_timestamp_s = int(publish_timestamp_ms / 1000)
                mini_program = data.get("mini_program", [])
                if mini_program:
                    root_source_id_list = [
                        urllib.parse.parse_qs(urllib.parse.unquote(i["path"])).get(
                            "rootSourceId", [""]
                        )[0]
                        for i in mini_program
                    ]
                else:
                    root_source_id_list = []
            else:
                publish_timestamp_s = self.UNKNOWN_STATUS
                root_source_id_list = []
        except Exception as e:
            publish_timestamp_s = self.REQUEST_FAIL_STATUS
            root_source_id_list = None
            error_msg = traceback.format_exc()
            await self.log_client.log(
                contents={
                    "task": "get_official_article_detail",
                    "data": {
                        "url": url,
                        "wx_sn": wx_sn,
                        "error_msg": error_msg,
                        "error": str(e),
                    },
                    "function": "check_each_article",
                    "status": "fail",
                }
            )
        query = """
            update official_articles_v2
            set publish_timestamp = %s, root_source_id_list = %s
            where wx_sn = %s;
        """
        await self.pool.async_save(
            query=query,
            db_name="piaoquan_crawler",
            params=(
                publish_timestamp_s,
                json.dumps(root_source_id_list, ensure_ascii=False),
                wx_sn,
            ),
        )
        # hand failed articles back to the caller for a retry round
        if publish_timestamp_s == self.REQUEST_FAIL_STATUS:
            article["wx_sn"] = wx_sn
            return article
        else:
            return None
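
    # fallback_mechanism patches rows that still lack a positive publish_timestamp
    # in two passes: first copy the timestamp from a sibling article with the same
    # ghId + appMsgId, then fall back to the row's updateTime.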
    async def fallback_mechanism(self):
        # pass 1: fix publish_timestamp via a sibling row with the same appMsgId
        update_sql = """
            update official_articles_v2 oav
            join (
                select ghId, appMsgId, max(publish_timestamp) as publish_timestamp
                from official_articles_v2
                where publish_timestamp > %s
                group by ghId, appMsgId
            ) vv
                on oav.appMsgId = vv.appMsgId and oav.ghId = vv.ghId
            set oav.publish_timestamp = vv.publish_timestamp
            where oav.publish_timestamp <= %s;
        """
        affected_rows_1 = await self.pool.async_save(
            query=update_sql, params=(0, 0), db_name="piaoquan_crawler"
        )

        # pass 2: if publish_timestamp is still missing, use updateTime instead
        update_sql_2 = """
            update official_articles_v2
            set publish_timestamp = updateTime
            where publish_timestamp < %s;
        """
        affected_rows_2 = await self.pool.async_save(
            query=update_sql_2, params=(0,), db_name="piaoquan_crawler"
        )

        if affected_rows_1 or affected_rows_2:
            await feishu_robot.bot(
                title="执行兜底修改发布时间戳",
                detail={
                    "通过msgId修改": affected_rows_1,
                    "通过create_timestamp修改": affected_rows_2,
                },
                mention=False,
            )

    async def deal(self):
        task_list = await self.get_article_list()
        for task in tqdm(task_list, desc="get article detail step1: "):
            try:
                await self.check_each_article(task)
            except Exception as e:
                try:
                    await self.log_client.log(
                        contents={
                            "task": "get_official_article_detail_step1",
                            "data": {
                                "detail": {
                                    "url": task["ContentUrl"],
                                    "wx_sn": task["wx_sn"].decode("utf-8"),
                                },
                                "error_msg": traceback.format_exc(),
                                "error": str(e),
                            },
                            "function": "check_each_article",
                            "status": "fail",
                        }
                    )
                except Exception as log_error:
                    print(log_error)
                    print(traceback.format_exc())

        # retry the articles that are still unresolved
        fail_tasks = await self.get_article_list()
        fail_list = []
        for fail_task in tqdm(fail_tasks, desc="get article detail step2: "):
            try:
                res = await self.check_each_article(fail_task)
                if res:
                    fail_list.append(res)
            except Exception as e:
                await self.log_client.log(
                    contents={
                        "task": "get_official_article_detail_step2",
                        "data": {
                            "detail": {
                                "url": fail_task["ContentUrl"],
                                "wx_sn": fail_task["wx_sn"].decode("utf-8"),
                            },
                            "error_msg": traceback.format_exc(),
                            "error": str(e),
                        },
                        "function": "check_each_article",
                        "status": "fail",
                    }
                )

        if fail_list:
            await feishu_robot.bot(title="更新文章,获取detail失败", detail=fail_list)

        # run the fallback only on late runs, after the day's recycling is done
        current_hour = datetime.datetime.now().hour
        if current_hour >= 21:
            await self.fallback_mechanism()
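

# RecycleFwhDailyPublishArticlesTask recycles articles group-sent from 服务号
# (service accounts): it pulls recent send results, re-crawls each article for
# read counts and rootSourceIds, alerts on illegal articles, and upserts the
# rows into official_articles_v2.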
class RecycleFwhDailyPublishArticlesTask(Const):
    def __init__(self, pool, log_client):
        self.pool = pool
        self.log_client = log_client

    @staticmethod
    async def illegal_article_bot(
        account_name: str,
        gh_id: str,
        group_id: str,
        illegal_msg: str,
        publish_date: str,
        article_title: str,
    ):
        await feishu_robot.bot(
            title="服务号文章违规告警,请前往微信公众平台处理",
            detail={
                "account_name": account_name,
                "gh_id": gh_id,
                "group_id": group_id,
                "illegal_msg": illegal_msg,
                "publish_date": str(publish_date),
                "article_title": article_title,
            },
            env="server_account_publish_monitor",
        )

    async def save_data_to_database(self, article):
        """
        save one article row into official_articles_v2
        """
        insert_query = """
            insert into official_articles_v2
            (ghId, accountName, appMsgId, title, Type, createTime, updateTime, ItemIndex, ContentUrl, show_view_count,
             wx_sn, title_md5, article_group, channel_content_id, root_source_id_list, publish_timestamp)
            values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        return await self.pool.async_save(
            query=insert_query, db_name="piaoquan_crawler", params=article
        )

    async def update_article_read_cnt(self, wx_sn, new_read_cnt):
        if new_read_cnt <= 0:
            return 0
        update_query = """
            update official_articles_v2
            set show_view_count = %s
            where wx_sn = %s;
        """
        return await self.pool.async_save(
            query=update_query, db_name="piaoquan_crawler", params=(new_read_cnt, wx_sn)
        )

    async def get_group_server_accounts(self):
        fetch_query = "select gzh_id from article_gzh_developer;"
        fetch_response = await self.pool.async_fetch(
            query=fetch_query, db_name="piaoquan_crawler"
        )
        gh_id_list = [
            i["gzh_id"]
            for i in fetch_response
            if i["gzh_id"] not in self.NOT_USED_SERVER_ACCOUNT
        ]
        return gh_id_list
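
    # get_stat_published_articles pulls successfully group-sent articles from the
    # last STAT_PERIOD (3 days) so their stats can still be refreshed.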
    async def get_stat_published_articles(self, gh_id):
        earliest_timestamp = int(time.time()) - self.STAT_PERIOD
        fetch_query = """
            select publish_date, account_name, gh_id, user_group_id, url, publish_timestamp, content_id
            from long_articles_group_send_result
            where gh_id = %s and recycle_status = %s and create_time > %s;
        """
        earliest_time = datetime.datetime.fromtimestamp(earliest_timestamp).strftime(
            "%Y-%m-%d %H:%M:%S"
        )
        return await self.pool.async_fetch(
            query=fetch_query,
            params=(gh_id, self.SUCCESS_STATUS, earliest_time),
        )

    async def process_each_account_data(self, account_published_article_list):
        if not account_published_article_list:
            return
        for article in account_published_article_list:
            account_name = article["account_name"]
            gh_id = article["gh_id"]
            user_group_id = article["user_group_id"]
            url = article["url"]
            publish_date = article["publish_date"]
            content_id = article["content_id"]
            # get article detail info with spider
            try:
                article_detail_info = await get_article_detail(
                    url, is_count=True, is_cache=False
                )
                response_code = article_detail_info["code"]
                if response_code == self.ARTICLE_ILLEGAL_CODE:
                    query = """
                        select article_title from long_articles_text where content_id = %s;
                    """
                    article_title = await self.pool.async_fetch(
                        query=query,
                        params=(content_id,),
                    )
                    if article_title:
                        article_title = article_title[0]["article_title"]
                    else:
                        article_title = content_id
                    await self.illegal_article_bot(
                        account_name=account_name,
                        gh_id=gh_id,
                        group_id=user_group_id,
                        illegal_msg=article_detail_info["msg"],
                        publish_date=publish_date,
                        article_title=article_title,
                    )
                    # an illegal article carries no detail payload; skip it
                    continue

                await asyncio.sleep(3)
                content_url = article_detail_info["data"]["data"]["content_link"]
                app_msg_id = content_url.split("mid=")[-1].split("&")[0]
                wx_sn = content_url.split("sn=")[-1]
                publish_timestamp = int(
                    article_detail_info["data"]["data"]["publish_timestamp"] / 1000
                )
                create_time = publish_timestamp
                update_time = publish_timestamp
                item_index = article_detail_info["data"]["data"]["item_index"]
                show_view_count = article_detail_info["data"]["data"]["view_count"]
                title = article_detail_info["data"]["data"]["title"]
                title_md5 = str_to_md5(title)
                channel_content_id = article_detail_info["data"]["data"][
                    "channel_content_id"
                ]
                mini_program_info = article_detail_info["data"]["data"]["mini_program"]
                root_source_id_list = [
                    urllib.parse.parse_qs(urllib.parse.unquote(i["path"]))[
                        "rootSourceId"
                    ][0]
                    for i in mini_program_info
                ]
                root_source_id_list = json.dumps(root_source_id_list)
                try:
                    await self.save_data_to_database(
                        article=(
                            gh_id,
                            account_name,
                            app_msg_id,
                            title,
                            "9",
                            create_time,
                            update_time,
                            item_index,
                            url,
                            show_view_count,
                            wx_sn,
                            title_md5,
                            user_group_id,
                            channel_content_id,
                            root_source_id_list,
                            publish_timestamp,
                        )
                    )
                except Exception:
                    # row already exists; just refresh the read count
                    await self.update_article_read_cnt(wx_sn, show_view_count)
            except Exception as e:
                print(f"article {url} is not available, skip it")
                print(e)

    async def deal(self):
        account_id_list = await self.get_group_server_accounts()
        for account_id in account_id_list:
            publish_articles = tqdm(
                await self.get_stat_published_articles(account_id),
                desc=f"<crawling> {account_id}",
            )
            await self.process_each_account_data(publish_articles)
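

# A minimal wiring sketch (not part of the original file): how these tasks might
# be scheduled for one day. `create_pool()` and `LogClient()` are hypothetical
# placeholders -- the real pool / log-client construction lives elsewhere in
# app.infra.
#
#     async def main():
#         pool = await create_pool()        # hypothetical: async MySQL pool
#         log_client = LogClient()          # hypothetical: structured logger
#         date_string = datetime.date.today().strftime("%Y-%m-%d")
#         await RecycleDailyPublishArticlesTask(pool, log_client, date_string).deal()
#         await CheckDailyPublishArticlesTask(pool, log_client, date_string).deal()
#         await UpdateRootSourceIdAndUpdateTimeTask(pool, log_client).deal()
#         await RecycleFwhDailyPublishArticlesTask(pool, log_client).deal()
#
#     if __name__ == "__main__":
#         asyncio.run(main())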