auto_reply_cards_monitor.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713
  1. import asyncio
  2. import os
  3. import json
  4. import time
  5. import traceback
  6. import uuid
  7. import xml.etree.ElementTree as ET
  8. from tqdm import tqdm
  9. from datetime import datetime, timedelta
  10. from urllib.parse import unquote, parse_qs, urlparse
  11. import requests
  12. from requests.exceptions import RequestException
  13. from applications.utils import upload_to_oss
  14. from applications.utils import fetch_from_odps
  15. from applications.utils import AsyncHttpClient
  16. from applications.crawler.wechat import get_article_list_from_account
  17. from applications.crawler.wechat import get_article_detail
  18. class AutoReplyCardsMonitorConst:
  19. # fetch_status
  20. FETCH_INIT_STATUS = 0
  21. FETCH_PROCESSING_STATUS = 1
  22. FETCH_SUCCESS_STATUS = 2
  23. FETCH_FAIL_STATUS = 3
  24. # task_status
  25. INIT_STATUS = 0
  26. PROCESSING_STATUS = 1
  27. SUCCESS_STATUS = 2
  28. FAIL_STATUS = 99
  29. # account_status
  30. VALID_STATUS = 1
  31. INVALID_STATUS = 0
  32. class AutoReplyCardsMonitorUtils(AutoReplyCardsMonitorConst):
  33. @staticmethod
  34. def generate_task_id(task_name, gh_id):
  35. match task_name:
  36. case "follow":
  37. return f"{task_name}_{gh_id}"
  38. case _:
  39. return f"{task_name}_{uuid.uuid4()}"
  40. @staticmethod
  41. def parse_fields(root, fields, default=""):
  42. result = {}
  43. for key, path in fields.items():
  44. elem = root.find(path)
  45. result[key] = elem.text if elem is not None and elem.text else default
  46. return result
  47. def extract_reply_cards(self, msg_type, root):
  48. fields = {
  49. "title": ".//title",
  50. "page_path": ".//pagepath",
  51. "mini_program": ".//sourcedisplayname",
  52. "file_id": "appmsg/appattach/cdnthumburl",
  53. "file_size": "appmsg/appattach/cdnthumblength",
  54. "aes_key": "appmsg/appattach/aeskey",
  55. }
  56. data = self.parse_fields(root, fields)
  57. data["msg_type"] = msg_type
  58. return data
  59. def extract_reply_articles(self, msg_type, root):
  60. fields = {
  61. "title": "appmsg/title",
  62. "url": "appmsg/url",
  63. "cover_url": "appmsg/thumburl",
  64. "account_name": "appmsg/sourcedisplayname",
  65. "gh_id": "appmsg/sourceusername",
  66. "desc": "appmsg/des",
  67. }
  68. data = self.parse_fields(root, fields)
  69. data["msg_type"] = msg_type
  70. return data
  71. # 解析 xml
  72. def extract_callback_xml(self, xml_text):
  73. try:
  74. root = ET.fromstring(xml_text)
  75. msg_type = root.find("appmsg/type").text
  76. match msg_type:
  77. case "5":
  78. return self.extract_reply_articles(msg_type, root)
  79. case "33":
  80. return self.extract_reply_cards(msg_type, root)
  81. case "36":
  82. return self.extract_reply_cards(msg_type, root)
  83. case _:
  84. return {}
  85. except Exception as e:
  86. print(xml_text)
  87. print(e)
  88. print(traceback.format_exc())
  89. return {}
  90. # 解析 page_path
  91. @staticmethod
  92. def extract_page_path(page_path):
  93. # 解析外层 URL
  94. parsed_url = urlparse(page_path)
  95. outer_params = parse_qs(parsed_url.query)
  96. # 取出并解码 jumpPage
  97. jump_page = outer_params.get("jumpPage", [""])[0]
  98. if not jump_page:
  99. return None, None
  100. decoded_jump_page = unquote(jump_page)
  101. # 解析 jumpPage 内层参数
  102. inner_query = urlparse(decoded_jump_page).query
  103. inner_params = parse_qs(inner_query)
  104. video_id = inner_params.get("id", [None])[0]
  105. root_source_id = inner_params.get("rootSourceId", [None])[0]
  106. return video_id, root_source_id
  107. @staticmethod
  108. async def get_cover_url(aes_key, total_size, file_id):
  109. url = "http://api.geweapi.com/gewe/v2/api/message/downloadCdn"
  110. data = {
  111. "appId": "wx_anFlUnezoUynU3SKcqTWk",
  112. "aesKey": aes_key,
  113. "totalSize": total_size,
  114. "fileId": file_id,
  115. "type": "3",
  116. "suffix": "jpg",
  117. }
  118. headers = {
  119. "X-GEWE-TOKEN": "d3fb918f-0f36-4769-b095-410181614231",
  120. "Content-Type": "application/json",
  121. }
  122. async with AsyncHttpClient() as client:
  123. response = await client.post(url, headers=headers, data=json.dumps(data))
  124. return response
  125. @staticmethod
  126. async def get_sample_url(recent_articles):
  127. for article in recent_articles:
  128. link = article["ContentUrl"]
  129. response = await get_article_detail(article_link=link)
  130. if not response:
  131. continue
  132. code = response["code"]
  133. if code == 0 or code == 25006:
  134. return link
  135. return None
  136. # 获取检测的账号 list
  137. @staticmethod
  138. def get_monitor_account_list():
  139. # dt = (datetime.today() - timedelta(days=1)).strftime("%Y%m%d")
  140. week_ago = (datetime.today() - timedelta(days=7)).strftime("%Y-%m-%d %H:%M:%S")
  141. query = f"""
  142. SELECT 公众号名, ghid, count(DISTINCT mid) AS uv
  143. FROM loghubods.opengid_base_data
  144. WHERE dt = MAX_PT('loghubods.opengid_base_data')
  145. AND hotsencetype = 1074
  146. AND usersharedepth = 0
  147. AND channel = '公众号合作-即转-稳定'
  148. AND 点击时间 >= '{week_ago}'
  149. GROUP BY 公众号名, ghid
  150. HAVING uv >= 100
  151. ORDER BY uv DESC
  152. ;
  153. """
  154. result = fetch_from_odps(query)
  155. return result
  156. @staticmethod
  157. def download_and_upload_cover(task_id, index, cover_obj, timeout=10):
  158. try:
  159. cover_url = cover_obj["data"]["fileUrl"]
  160. except (KeyError, TypeError):
  161. print(f"[WARN] Invalid cover_obj structure: {cover_obj}")
  162. return None
  163. file_name = f"{task_id}_{index}.jpg"
  164. save_dir = os.path.join(os.getcwd(), "static")
  165. save_path = os.path.join(save_dir, file_name)
  166. os.makedirs(save_dir, exist_ok=True)
  167. try:
  168. response = requests.get(cover_url, timeout=timeout)
  169. response.raise_for_status()
  170. except RequestException as e:
  171. print(f"[ERROR] Download failed ({cover_url}): {e}")
  172. return None
  173. try:
  174. with open(save_path, "wb") as f:
  175. f.write(response.content)
  176. except OSError as e:
  177. print(f"[ERROR] Write file failed ({save_path}): {e}")
  178. return None
  179. oss_dir = "auto_rely_cards_cover"
  180. oss_key = None
  181. try:
  182. oss_key = upload_to_oss(save_path, f"{oss_dir}/{file_name}")
  183. except Exception as e:
  184. print(f"[ERROR] Upload to OSS failed: {e}")
  185. if oss_key:
  186. try:
  187. os.remove(save_path)
  188. except OSError as e:
  189. print(f"[WARN] Failed to remove temp file {save_path}: {e}")
  190. return oss_key
  191. class AutoReplyCardsMonitorMapper(AutoReplyCardsMonitorUtils):
  192. def __init__(self, pool, log_client):
  193. self.pool = pool
  194. self.log_client = log_client
  195. # 获取自动回复任务结果
  196. async def get_auto_reply_task_result(self, task_id):
  197. query = """
  198. SELECT task_result, task_status, err_msg, update_timestamp
  199. FROM gzh_msg_record
  200. WHERE task_id = %s;
  201. """
  202. return await self.pool.async_fetch(
  203. query=query, params=(task_id,), db_name="aigc"
  204. )
  205. # 查询账号
  206. async def fetch_account_status(self, account_name):
  207. query = """
  208. SELECT partner_name, partner_id, gh_id, status, follow_status
  209. FROM cooperate_accounts
  210. WHERE account_name = %s;
  211. """
  212. return await self.pool.async_fetch(query=query, params=(account_name,))
  213. # 更新账号状态为无效
  214. async def set_account_as_invalid(self, gh_id):
  215. query = """
  216. UPDATE cooperate_accounts SET status = %s WHERE gh_id = %s;
  217. """
  218. await self.pool.async_save(query=query, params=(self.INVALID_STATUS, gh_id))
  219. # 插入AIGC关注公众号任务
  220. async def insert_aigc_follow_account_task(self, task_id, link):
  221. timestamp = int(time.time() * 1000)
  222. query = """
  223. INSERT INTO gzh_msg_record (task_id, biz_type, task_params, create_timestamp, update_timestamp) VALUES (%s, %s, %s, %s, %s);
  224. """
  225. return await self.pool.async_save(
  226. query=query,
  227. params=(task_id, "follow", link, timestamp, timestamp),
  228. db_name="aigc",
  229. )
  230. # 插入AIGC自动回复任务
  231. async def insert_aigc_auto_reply_task(self, task_id, account_name):
  232. timestamp = int(time.time() * 1000)
  233. query = """
  234. INSERT INTO gzh_msg_record (task_id, task_params, create_timestamp, update_timestamp) VALUES (%s, %s, %s, %s);
  235. """
  236. return await self.pool.async_save(
  237. query=query,
  238. params=(task_id, account_name, timestamp, timestamp),
  239. db_name="aigc",
  240. )
  241. # 为账号设置 sample_url
  242. async def set_sample_url(self, gh_id, sample_url):
  243. query = """
  244. UPDATE cooperate_accounts SET sample_link = %s WHERE gh_id = %s;
  245. """
  246. return await self.pool.async_save(query=query, params=(sample_url, gh_id))
  247. # 修改账号的关注状态
  248. async def update_follow_status(self, gh_id, ori_status, new_status):
  249. query = """
  250. UPDATE cooperate_accounts SET follow_status = %s WHERE gh_id = %s and follow_status = %s;
  251. """
  252. return await self.pool.async_save(
  253. query=query, params=(new_status, gh_id, ori_status)
  254. )
  255. # 从 aigc 获取关注结果
  256. async def fetch_follow_account_status(self, gh_id):
  257. query = """
  258. SELECT task_status, err_msg
  259. FROM gzh_msg_record
  260. WHERE task_id = %s;
  261. """
  262. return await self.pool.async_fetch(
  263. query=query, params=(f"follow_{gh_id}",), db_name="aigc"
  264. )
  265. # 创建自动回复任务
  266. async def create_auto_reply_task(self, task_id, gh_id):
  267. query = """
  268. INSERT INTO cooperate_accounts_task (task_id, gh_id) VALUES (%s, %s);
  269. """
  270. return await self.pool.async_save(query=query, params=(task_id, gh_id))
  271. async def update_auto_reply_task_status(
  272. self, task_id, status_type, ori_status, new_status
  273. ):
  274. task_query = """
  275. UPDATE cooperate_accounts_task SET task_status = %s WHERE task_id = %s AND task_status = %s;
  276. """
  277. extract_query = """
  278. UPDATE cooperate_accounts_task SET extract_status = %s WHERE task_id = %s AND extract_status = %s;
  279. """
  280. match status_type:
  281. case "task":
  282. return await self.pool.async_save(
  283. query=task_query, params=(new_status, task_id, ori_status)
  284. )
  285. case "extract":
  286. return await self.pool.async_save(
  287. query=extract_query, params=(new_status, task_id, ori_status)
  288. )
  289. case _:
  290. print("status_type_error")
  291. return None
  292. # 获取正在自动回复卡片的任务 id
  293. async def fetch_auto_replying_tasks(self):
  294. query = """
  295. SELECT task_id FROM cooperate_accounts_task WHERE task_status = %s;
  296. """
  297. return await self.pool.async_fetch(
  298. query=query, params=(self.PROCESSING_STATUS,)
  299. )
  300. # 设置自动回复结果
  301. async def set_auto_reply_result(self, task_id, finish_timestamp, result):
  302. query = """
  303. UPDATE cooperate_accounts_task
  304. SET finish_timestamp = %s, result = %s, task_status = %s
  305. WHERE task_id = %s and task_status = %s;
  306. """
  307. return await self.pool.async_save(
  308. query=query,
  309. params=(
  310. finish_timestamp,
  311. result,
  312. self.SUCCESS_STATUS,
  313. task_id,
  314. self.PROCESSING_STATUS,
  315. ),
  316. )
  317. # 获取带解析的任务
  318. async def get_extract_tasks(self):
  319. query = """
  320. SELECT task_id, result FROM cooperate_accounts_task WHERE extract_status = %s AND task_status = %s;
  321. """
  322. return await self.pool.async_fetch(
  323. query=query, params=(self.INIT_STATUS, self.SUCCESS_STATUS)
  324. )
  325. # 存储解析结果
  326. async def store_extract_result(self, query, row_table):
  327. return await self.pool.async_save(query=query, params=row_table)
  328. class AutoReplyCardsMonitor(AutoReplyCardsMonitorMapper):
  329. def __init__(self, pool, log_client):
  330. super().__init__(pool, log_client)
  331. # 存储卡片信息
  332. async def store_card(self, task_id, index, msg_type, xml_obj):
  333. video_id, root_source_id = self.extract_page_path(xml_obj["page_path"])
  334. cover_obj = await self.get_cover_url(xml_obj["aes_key"], xml_obj["file_size"], xml_obj["file_id"])
  335. cover_oss = self.download_and_upload_cover(task_id, index, cover_obj)
  336. query = """
  337. INSERT INTO cooperate_auto_reply_detail
  338. (
  339. task_id, position, msg_type, card_title, card_cover,
  340. video_id, root_source_id, mini_program_name, task_result
  341. ) VALUES
  342. (
  343. %s, %s, %s, %s, %s,
  344. %s, %s, %s, %s
  345. );
  346. """
  347. insert_row = (
  348. task_id,
  349. index,
  350. msg_type,
  351. xml_obj["title"],
  352. cover_oss,
  353. video_id,
  354. root_source_id,
  355. xml_obj["mini_program"],
  356. json.dumps(xml_obj, ensure_ascii=False),
  357. )
  358. await self.store_extract_result(query, insert_row)
  359. # 存储文章信息
  360. async def store_article(self, task_id, index, msg_type, xml_obj):
  361. article_title = xml_obj.get("title")
  362. article_link = xml_obj.get("url")
  363. article_cover = xml_obj.get("cover_url")
  364. article_desc = xml_obj.get("desc")
  365. fetch_fail_status = False
  366. fetch_response = await get_article_detail(article_link=article_link, is_cache=False, is_count=True)
  367. if not fetch_response:
  368. fetch_fail_status = True
  369. if not fetch_fail_status:
  370. if fetch_response.get("code") != 0:
  371. fetch_fail_status = True
  372. if fetch_fail_status:
  373. query = """
  374. INSERT INTO cooperate_auto_reply_detail
  375. (task_id, position, msg_type, article_title, article_link, article_cover, article_desc, remark)
  376. VALUES
  377. (%s, %s, %s, %s, %s, %s, %s, %s);
  378. """
  379. remark = "获取文章详情失败"
  380. insert_row = (task_id, index, msg_type, article_title, article_link, article_cover, article_desc, remark)
  381. await self.store_extract_result(query, insert_row)
  382. else:
  383. print(article_link)
  384. article_detail = fetch_response["data"]["data"]
  385. article_text = article_detail["body_text"]
  386. article_images = article_detail["image_url_list"]
  387. read_cnt = article_detail["view_count"]
  388. like_cnt = article_detail["like_count"]
  389. publish_timestamp = int(article_detail["publish_timestamp"] / 1000)
  390. parsed = urlparse(article_detail["content_link"])
  391. params = parse_qs(parsed.query)
  392. wx_sn = params.get("sn", [None])[0]
  393. print(params)
  394. print(wx_sn)
  395. mini_info = article_detail.get("mini_program")
  396. if not mini_info:
  397. # video_id, root_source_id = None, None
  398. query = """
  399. INSERT INTO cooperate_auto_reply_detail
  400. (
  401. task_id, position, msg_type, article_title, article_link,
  402. article_cover, article_text, article_images, article_desc, read_cnt,
  403. like_cnt, publish_timestamp, task_result, wx_sn
  404. ) VALUES
  405. (
  406. %s, %s, %s, %s, %s,
  407. %s, %s, %s, %s, %s,
  408. %s, %s, %s, %s
  409. );
  410. """
  411. values = (
  412. task_id,
  413. index,
  414. msg_type,
  415. article_title,
  416. article_link,
  417. article_cover,
  418. article_text,
  419. json.dumps(article_images, ensure_ascii=False),
  420. article_desc,
  421. read_cnt,
  422. like_cnt,
  423. publish_timestamp,
  424. json.dumps(fetch_response, ensure_ascii=False),
  425. wx_sn,
  426. )
  427. await self.store_extract_result(query, values)
  428. else:
  429. for card_index, i in enumerate(mini_info, 1):
  430. try:
  431. video_id, root_source_id = self.extract_page_path(i["path"])
  432. card_title = i["title"]
  433. card_cover = i["image_url"]
  434. mini_name = i["nike_name"]
  435. query = """
  436. INSERT INTO cooperate_auto_reply_detail
  437. (
  438. task_id, position, msg_type, card_title, card_cover,
  439. video_id, root_source_id, mini_program_name, article_title, article_link,
  440. article_cover, article_text, article_images, article_desc, read_cnt,
  441. like_cnt, publish_timestamp, task_result, wx_sn, card_index
  442. ) VALUES
  443. (
  444. %s, %s, %s, %s, %s,
  445. %s, %s, %s, %s, %s,
  446. %s, %s, %s, %s, %s,
  447. %s, %s, %s, %s, %s
  448. );
  449. """
  450. values = (
  451. task_id,
  452. index,
  453. msg_type,
  454. card_title,
  455. card_cover,
  456. video_id,
  457. root_source_id,
  458. mini_name,
  459. article_title,
  460. article_link,
  461. article_cover,
  462. article_text,
  463. json.dumps(article_images, ensure_ascii=False),
  464. article_desc,
  465. read_cnt,
  466. like_cnt,
  467. publish_timestamp,
  468. json.dumps(fetch_response, ensure_ascii=False),
  469. wx_sn,
  470. card_index,
  471. )
  472. await self.store_extract_result(query, values)
  473. except Exception as e:
  474. print(traceback.format_exc())
  475. print(e)
  476. # 创建单个关注公众号任务
  477. async def create_follow_single_account_task(self, gh_id):
  478. response = await get_article_list_from_account(account_id=gh_id)
  479. code = response.get("code")
  480. match code:
  481. case 0:
  482. recent_articles = response["data"]["data"][0]["AppMsg"]["DetailInfo"]
  483. article_url = await self.get_sample_url(recent_articles)
  484. print(article_url)
  485. if article_url:
  486. await self.set_sample_url(gh_id, article_url)
  487. task_id = self.generate_task_id(task_name="follow", gh_id=gh_id)
  488. affected_rows = await self.insert_aigc_follow_account_task(
  489. task_id, article_url
  490. )
  491. if affected_rows:
  492. await self.update_follow_status(
  493. gh_id, self.INIT_STATUS, self.PROCESSING_STATUS
  494. )
  495. case 25013:
  496. await self.set_account_as_invalid(gh_id)
  497. case _:
  498. pass
  499. # 创建单个账号自动回复任务
  500. async def create_auto_reply_single_account_task(self, gh_id, account_name):
  501. print(account_name)
  502. task_id = self.generate_task_id(task_name="auto_reply", gh_id=gh_id)
  503. # 先插入 task, 再创建自动回复任务
  504. create_row = await self.create_auto_reply_task(task_id, gh_id)
  505. if create_row:
  506. affected_rows = await self.insert_aigc_auto_reply_task(task_id, gh_id)
  507. if not affected_rows:
  508. print("发布任务至 AIGC 失败")
  509. else:
  510. await self.update_auto_reply_task_status(
  511. task_id, "task", self.INIT_STATUS, self.PROCESSING_STATUS
  512. )
  513. else:
  514. print("创建任务至 DB 失败")
  515. async def follow_gzh_task(self):
  516. account_list = self.get_monitor_account_list()
  517. for account in account_list:
  518. try:
  519. fetch_response = await self.fetch_account_status(account.公众号名)
  520. if not fetch_response:
  521. print("账号不存在", account)
  522. # todo 没有 gh_id, 暂时无法存储账号
  523. # affected_rows =await self.insert_accounts_task(account.公众号名, account.ghid)
  524. # if affected_rows:
  525. # await self.create_follow_account_task(account.ghid)
  526. else:
  527. account_detail = fetch_response[0]
  528. status = account_detail["status"]
  529. if not status:
  530. print("账号已经迁移或者封禁")
  531. continue
  532. # 新逻辑,无需考虑账号是否关注
  533. await self.create_auto_reply_single_account_task(
  534. account_detail["gh_id"], account.公众号名
  535. )
  536. except Exception as e:
  537. print(f"处理账号{account.公众号名}异常", e)
  538. # 异步获取关注结果
  539. async def get_auto_reply_response(self):
  540. task_list = await self.fetch_auto_replying_tasks()
  541. if not task_list:
  542. print("No processing task yet")
  543. return
  544. for task in tqdm(task_list):
  545. try:
  546. task_id = task["task_id"]
  547. response = await self.get_auto_reply_task_result(task_id)
  548. if not response:
  549. continue
  550. task_status = response[0]["task_status"]
  551. task_result = response[0]["task_result"]
  552. update_timestamp = response[0]["update_timestamp"]
  553. match task_status:
  554. case self.FETCH_FAIL_STATUS:
  555. await self.update_auto_reply_task_status(
  556. task_id, "task", self.PROCESSING_STATUS, self.FAIL_STATUS
  557. )
  558. case self.FETCH_SUCCESS_STATUS:
  559. await self.set_auto_reply_result(
  560. task_id, update_timestamp, task_result
  561. )
  562. case _:
  563. continue
  564. except Exception as e:
  565. print(e)
  566. # 解析单个xml
  567. async def extract_single_xml(self, task):
  568. task_id = task["task_id"]
  569. result = task["result"]
  570. # acquire lock
  571. acquire_lock = await self.update_auto_reply_task_status(
  572. task_id, "extract", self.INIT_STATUS, self.PROCESSING_STATUS
  573. )
  574. if not acquire_lock:
  575. return
  576. try:
  577. # parse xml
  578. xml_list = json.loads(result) if type(result) == str else result
  579. for index, item in enumerate(xml_list, 1):
  580. xml_obj = self.extract_callback_xml(item)
  581. if xml_obj:
  582. msg_type = xml_obj.get("msg_type", None)
  583. match msg_type:
  584. case "33":
  585. await self.store_card(task_id, index, msg_type, xml_obj)
  586. case "5":
  587. await self.store_article(task_id, index, msg_type, xml_obj)
  588. case _:
  589. continue
  590. await asyncio.sleep(5)
  591. await self.update_auto_reply_task_status(
  592. task_id, "extract", self.PROCESSING_STATUS, self.SUCCESS_STATUS
  593. )
  594. except Exception as e:
  595. print(e)
  596. print(traceback.format_exc())
  597. await self.update_auto_reply_task_status(
  598. task_id, "extract", self.PROCESSING_STATUS, self.FAIL_STATUS
  599. )
  600. # main function
  601. async def deal(self, task_name):
  602. match task_name:
  603. case "follow_gzh_task":
  604. await self.follow_gzh_task()
  605. case "get_auto_reply_task":
  606. await self.get_auto_reply_response()
  607. case "extract_task":
  608. task_list = await self.get_extract_tasks()
  609. if not task_list:
  610. print("No tasks to extract now")
  611. return
  612. for task in tqdm(task_list, desc="解析任务"):
  613. await self.extract_single_xml(task)
  614. await asyncio.sleep(10)
  615. case "re_extract_task":
  616. pass
  617. case _:
  618. print("task_error")