auto_reply_cards_monitor.py 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754
  1. import os
  2. import json
  3. import time
  4. import traceback
  5. import uuid
  6. import xml.etree.ElementTree as ET
  7. from tqdm import tqdm
  8. from datetime import datetime, timedelta
  9. from urllib.parse import unquote, parse_qs, urlparse
  10. import requests
  11. from requests.exceptions import RequestException
  12. from applications.utils import upload_to_oss
  13. from applications.utils import fetch_from_odps
  14. from applications.utils import AsyncHttpClient
  15. from applications.crawler.wechat import get_article_list_from_account
  16. from applications.crawler.wechat import get_article_detail
  17. class AutoReplyCardsMonitorConst:
  18. # fetch_status
  19. FETCH_INIT_STATUS = 0
  20. FETCH_PROCESSING_STATUS = 1
  21. FETCH_SUCCESS_STATUS = 2
  22. FETCH_FAIL_STATUS = 3
  23. # task_status
  24. INIT_STATUS = 0
  25. PROCESSING_STATUS = 1
  26. SUCCESS_STATUS = 2
  27. FAIL_STATUS = 99
  28. # account_status
  29. VALID_STATUS = 1
  30. INVALID_STATUS = 0
  31. class AutoReplyCardsMonitorUtils(AutoReplyCardsMonitorConst):
  32. @staticmethod
  33. def generate_task_id(task_name, gh_id):
  34. match task_name:
  35. case "follow":
  36. return f"{task_name}_{gh_id}"
  37. case _:
  38. return f"{task_name}_{uuid.uuid4()}"
  39. @staticmethod
  40. def parse_fields(root, fields, default=""):
  41. result = {}
  42. for key, path in fields.items():
  43. elem = root.find(path)
  44. result[key] = elem.text if elem is not None and elem.text else default
  45. return result
  46. def extract_reply_cards(self, msg_type, root):
  47. FIELDS = {
  48. "title": ".//title",
  49. "page_path": ".//pagepath",
  50. "mini_program": ".//sourcedisplayname",
  51. "file_id": "appmsg/appattach/cdnthumburl",
  52. "file_size": "appmsg/appattach/cdnthumblength",
  53. "aes_key": "appmsg/appattach/aeskey",
  54. }
  55. data = self.parse_fields(root, FIELDS)
  56. data["msg_type"] = msg_type
  57. return data
  58. def extract_reply_articles(self, msg_type, root):
  59. fields = {
  60. "title": "appmsg/title",
  61. "url": "appmsg/url",
  62. "cover_url": "appmsg/thumburl",
  63. "account_name": "appmsg/sourcedisplayname",
  64. "gh_id": "appmsg/sourceusername",
  65. "desc": "appmsg/des",
  66. }
  67. data = self.parse_fields(root, fields)
  68. data["msg_type"] = msg_type
  69. return data
  70. # 解析 xml
  71. def extract_callback_xml(self, xml_text):
  72. try:
  73. root = ET.fromstring(xml_text)
  74. msg_type = root.find("appmsg/type").text
  75. match msg_type:
  76. case "5":
  77. return self.extract_reply_articles(msg_type, root)
  78. case "33":
  79. return self.extract_reply_cards(msg_type, root)
  80. case "36":
  81. return self.extract_reply_cards(msg_type, root)
  82. case _:
  83. return {}
  84. except Exception as e:
  85. print(xml_text)
  86. print(e)
  87. print(traceback.format_exc())
  88. return {}
  89. # 解析 page_path
  90. @staticmethod
  91. def extract_page_path(page_path):
  92. # 解析外层 URL
  93. parsed_url = urlparse(page_path)
  94. outer_params = parse_qs(parsed_url.query)
  95. # 取出并解码 jumpPage
  96. jump_page = outer_params.get("jumpPage", [""])[0]
  97. if not jump_page:
  98. return None, None
  99. decoded_jump_page = unquote(jump_page)
  100. # 解析 jumpPage 内层参数
  101. inner_query = urlparse(decoded_jump_page).query
  102. inner_params = parse_qs(inner_query)
  103. video_id = inner_params.get("id", [None])[0]
  104. root_source_id = inner_params.get("rootSourceId", [None])[0]
  105. return video_id, root_source_id
  106. @staticmethod
  107. async def get_cover_url(aes_key, total_size, file_id):
  108. url = "http://api.geweapi.com/gewe/v2/api/message/downloadCdn"
  109. data = {
  110. "appId": "wx_anFlUnezoUynU3SKcqTWk",
  111. "aesKey": aes_key,
  112. "totalSize": total_size,
  113. "fileId": file_id,
  114. "type": "3",
  115. "suffix": "jpg",
  116. }
  117. headers = {
  118. "X-GEWE-TOKEN": "d3fb918f-0f36-4769-b095-410181614231",
  119. "Content-Type": "application/json",
  120. }
  121. async with AsyncHttpClient() as client:
  122. response = await client.post(url, headers=headers, data=json.dumps(data))
  123. return response
  124. @staticmethod
  125. async def get_sample_url(recent_articles):
  126. for article in recent_articles:
  127. link = article["ContentUrl"]
  128. response = await get_article_detail(article_link=link)
  129. if not response:
  130. continue
  131. code = response["code"]
  132. if code == 0 or code == 25006:
  133. return link
  134. return None
  135. # 获取检测的账号 list
  136. @staticmethod
  137. def get_monitor_account_list():
  138. yesterday = (datetime.today() - timedelta(days=1)).strftime("%Y%m%d")
  139. query = f"""
  140. SELECT 公众号名, ghid, count(DISTINCT mid) AS uv
  141. FROM loghubods.opengid_base_data
  142. WHERE dt = {yesterday}
  143. AND hotsencetype = 1074
  144. AND usersharedepth = 0
  145. AND channel = '公众号合作-即转-稳定'
  146. GROUP BY 公众号名, ghid
  147. HAVING uv >= 100
  148. ORDER BY uv DESC
  149. ;
  150. """
  151. result = fetch_from_odps(query)
  152. return result
  153. @staticmethod
  154. def download_and_upload_cover(task_id, index, cover_obj, timeout=10):
  155. try:
  156. cover_url = cover_obj["data"]["fileUrl"]
  157. except (KeyError, TypeError):
  158. print(f"[WARN] Invalid cover_obj structure: {cover_obj}")
  159. return None
  160. file_name = f"{task_id}_{index}.jpg"
  161. save_dir = os.path.join(os.getcwd(), "static")
  162. save_path = os.path.join(save_dir, file_name)
  163. os.makedirs(save_dir, exist_ok=True)
  164. try:
  165. response = requests.get(cover_url, timeout=timeout)
  166. response.raise_for_status()
  167. except RequestException as e:
  168. print(f"[ERROR] Download failed ({cover_url}): {e}")
  169. return None
  170. try:
  171. with open(save_path, "wb") as f:
  172. f.write(response.content)
  173. except OSError as e:
  174. print(f"[ERROR] Write file failed ({save_path}): {e}")
  175. return None
  176. oss_dir = "auto_rely_cards_cover"
  177. oss_key = None
  178. try:
  179. oss_key = upload_to_oss(save_path, f"{oss_dir}/{file_name}")
  180. except Exception as e:
  181. print(f"[ERROR] Upload to OSS failed: {e}")
  182. if oss_key:
  183. try:
  184. os.remove(save_path)
  185. except OSError as e:
  186. print(f"[WARN] Failed to remove temp file {save_path}: {e}")
  187. return oss_key
  188. class AutoReplyCardsMonitorMapper(AutoReplyCardsMonitorUtils):
  189. def __init__(self, pool, log_client):
  190. self.pool = pool
  191. self.log_client = log_client
  192. # 获取自动回复任务结果
  193. async def get_auto_reply_task_result(self, task_id):
  194. query = """
  195. SELECT task_result, task_status, err_msg, update_timestamp
  196. FROM gzh_msg_record
  197. WHERE task_id = %s;
  198. """
  199. return await self.pool.async_fetch(
  200. query=query, params=(task_id,), db_name="aigc"
  201. )
  202. # 查询账号
  203. async def fetch_account_status(self, account_name):
  204. query = """
  205. SELECT partner_name, partner_id, gh_id, status, follow_status
  206. FROM cooperate_accounts
  207. WHERE account_name = %s;
  208. """
  209. return await self.pool.async_fetch(query=query, params=(account_name,))
  210. # 更新账号状态为无效
  211. async def set_account_as_invalid(self, gh_id):
  212. query = """
  213. UPDATE cooperate_accounts SET status = %s WHERE gh_id = %s;
  214. """
  215. await self.pool.async_save(query=query, params=(self.INVALID_STATUS, gh_id))
  216. # 插入AIGC关注公众号任务
  217. async def insert_aigc_follow_account_task(self, task_id, link):
  218. timestamp = int(time.time() * 1000)
  219. query = """
  220. INSERT INTO gzh_msg_record (task_id, biz_type, task_params, create_timestamp, update_timestamp) VALUES (%s, %s, %s, %s, %s);
  221. """
  222. return await self.pool.async_save(
  223. query=query,
  224. params=(task_id, "follow", link, timestamp, timestamp),
  225. db_name="aigc",
  226. )
  227. # 插入AIGC自动回复任务
  228. async def insert_aigc_auto_reply_task(self, task_id, account_name):
  229. timestamp = int(time.time() * 1000)
  230. query = """
  231. INSERT INTO gzh_msg_record (task_id, task_params, create_timestamp, update_timestamp) VALUES (%s, %s, %s, %s);
  232. """
  233. return await self.pool.async_save(
  234. query=query,
  235. params=(task_id, account_name, timestamp, timestamp),
  236. db_name="aigc",
  237. )
  238. # 为账号设置 sample_url
  239. async def set_sample_url(self, gh_id, sample_url):
  240. query = """
  241. UPDATE cooperate_accounts SET sample_link = %s WHERE gh_id = %s;
  242. """
  243. return await self.pool.async_save(query=query, params=(sample_url, gh_id))
  244. # 修改账号的关注状态
  245. async def update_follow_status(self, gh_id, ori_status, new_status):
  246. query = """
  247. UPDATE cooperate_accounts SET follow_status = %s WHERE gh_id = %s and follow_status = %s;
  248. """
  249. return await self.pool.async_save(
  250. query=query, params=(new_status, gh_id, ori_status)
  251. )
  252. # 从 aigc 获取关注结果
  253. async def fetch_follow_account_status(self, gh_id):
  254. query = """
  255. SELECT task_status, err_msg
  256. FROM gzh_msg_record
  257. WHERE task_id = %s;
  258. """
  259. return await self.pool.async_fetch(
  260. query=query, params=(f"follow_{gh_id}",), db_name="aigc"
  261. )
  262. # 创建自动回复任务
  263. async def create_auto_reply_task(self, task_id, gh_id):
  264. query = """
  265. INSERT INTO cooperate_accounts_task (task_id, gh_id) VALUES (%s, %s);
  266. """
  267. return await self.pool.async_save(query=query, params=(task_id, gh_id))
  268. async def update_auto_reply_task_status(
  269. self, task_id, status_type, ori_status, new_status
  270. ):
  271. task_query = """
  272. UPDATE cooperate_accounts_task SET task_status = %s WHERE task_id = %s AND task_status = %s;
  273. """
  274. extract_query = """
  275. UPDATE cooperate_accounts_task SET extract_status = %s WHERE task_id = %s AND extract_status = %s;
  276. """
  277. match status_type:
  278. case "task":
  279. return await self.pool.async_save(
  280. query=task_query, params=(new_status, task_id, ori_status)
  281. )
  282. case "extract":
  283. return await self.pool.async_save(
  284. query=extract_query, params=(new_status, task_id, ori_status)
  285. )
  286. case _:
  287. print("status_type_error")
  288. return None
  289. # 获取正在自动回复卡片的任务 id
  290. async def fetch_auto_replying_tasks(self):
  291. query = """
  292. SELECT task_id FROM cooperate_accounts_task WHERE task_status = %s;
  293. """
  294. return await self.pool.async_fetch(
  295. query=query, params=(self.PROCESSING_STATUS,)
  296. )
  297. # 设置自动回复结果
  298. async def set_auto_reply_result(self, task_id, finish_timestamp, result):
  299. query = """
  300. UPDATE cooperate_accounts_task
  301. SET finish_timestamp = %s, result = %s, task_status = %s
  302. WHERE task_id = %s and task_status = %s;
  303. """
  304. return await self.pool.async_save(
  305. query=query,
  306. params=(
  307. finish_timestamp,
  308. result,
  309. self.SUCCESS_STATUS,
  310. task_id,
  311. self.PROCESSING_STATUS,
  312. ),
  313. )
  314. # 获取带解析的任务
  315. async def get_extract_tasks(self):
  316. query = """
  317. SELECT task_id, result FROM cooperate_accounts_task WHERE extract_status = %s AND task_status = %s;
  318. """
  319. return await self.pool.async_fetch(
  320. query=query, params=(self.INIT_STATUS, self.SUCCESS_STATUS)
  321. )
  322. # 存储解析结果
  323. async def store_extract_result(self, query, row_table):
  324. return await self.pool.async_save(query=query, params=row_table)
  325. class AutoReplyCardsMonitor(AutoReplyCardsMonitorMapper):
  326. def __init__(self, pool, log_client):
  327. super().__init__(pool, log_client)
  328. # 存储卡片信息
  329. async def store_card(self, task_id, index, msg_type, xml_obj):
  330. video_id, root_source_id = self.extract_page_path(xml_obj["page_path"])
  331. cover_obj = await self.get_cover_url(xml_obj["aes_key"], xml_obj["file_size"], xml_obj["file_id"])
  332. cover_oss = self.download_and_upload_cover(task_id, index, cover_obj)
  333. query = """
  334. INSERT INTO cooperate_auto_reply_detail
  335. (
  336. task_id, position, msg_type, card_title, card_cover,
  337. video_id, root_source_id, mini_program_name, task_result
  338. ) VALUES
  339. (
  340. %s, %s, %s, %s, %s,
  341. %s, %s, %s, %s
  342. );
  343. """
  344. insert_row = (
  345. task_id,
  346. index,
  347. msg_type,
  348. xml_obj["title"],
  349. cover_oss,
  350. video_id,
  351. root_source_id,
  352. xml_obj["mini_program"],
  353. json.dumps(xml_obj, ensure_ascii=False),
  354. )
  355. await self.store_extract_result(query, insert_row)
  356. # 存储文章信息
  357. async def store_article(self, task_id, index, msg_type, xml_obj):
  358. article_title = xml_obj.get("title")
  359. article_link = xml_obj.get("url")
  360. article_cover = xml_obj.get("cover_url")
  361. article_desc = xml_obj.get("desc")
  362. fetch_fail_status = False
  363. fetch_response = await get_article_detail(article_link=article_link, is_cache=False, is_count=True)
  364. if not fetch_response:
  365. fetch_fail_status = True
  366. if not fetch_fail_status:
  367. if fetch_response.get("code") != 0:
  368. fetch_fail_status = True
  369. if fetch_fail_status:
  370. query = """
  371. INSERT INTO cooperate_auto_reply_detail
  372. (task_id, position, msg_type, article_title, article_link, article_cover, article_desc, remark)
  373. VALUES
  374. (%s, %s, %s, %s, %s, %s, %s, %s);
  375. """
  376. remark = "获取文章详情失败"
  377. insert_row = (task_id, index, msg_type, article_title, article_link, article_cover, article_desc, remark)
  378. await self.store_extract_result(query, insert_row)
  379. else:
  380. print(article_link)
  381. article_detail = fetch_response["data"]["data"]
  382. article_text = article_detail["body_text"]
  383. article_images = article_detail["image_url_list"]
  384. read_cnt = article_detail["view_count"]
  385. like_cnt = article_detail["like_count"]
  386. publish_timestamp = int(article_detail["publish_timestamp"] / 1000)
  387. parsed = urlparse(article_detail["content_link"])
  388. params = parse_qs(parsed.query)
  389. wx_sn = params.get("sn", [None])[0]
  390. print(params)
  391. print(wx_sn)
  392. mini_info = article_detail.get("mini_program")
  393. if not mini_info:
  394. # video_id, root_source_id = None, None
  395. query = """
  396. INSERT INTO cooperate_auto_reply_detail
  397. (
  398. task_id, position, msg_type, article_title, article_link,
  399. article_cover, article_text, article_images, article_desc, read_cnt,
  400. like_cnt, publish_timestamp, task_result, wx_sn
  401. ) VALUES
  402. (
  403. %s, %s, %s, %s, %s,
  404. %s, %s, %s, %s, %s,
  405. %s, %s, %s, %s
  406. );
  407. """
  408. values = (
  409. task_id,
  410. index,
  411. msg_type,
  412. article_title,
  413. article_link,
  414. article_cover,
  415. article_text,
  416. json.dumps(article_images, ensure_ascii=False),
  417. article_desc,
  418. read_cnt,
  419. like_cnt,
  420. publish_timestamp,
  421. json.dumps(fetch_response, ensure_ascii=False),
  422. wx_sn,
  423. )
  424. await self.store_extract_result(query, values)
  425. else:
  426. for card_index, i in enumerate(mini_info, 1):
  427. try:
  428. video_id, root_source_id = self.extract_page_path(i["path"])
  429. card_title = i["title"]
  430. card_cover = i["image_url"]
  431. mini_name = i["nike_name"]
  432. query = """
  433. INSERT INTO cooperate_auto_reply_detail
  434. (
  435. task_id, position, msg_type, card_title, card_cover,
  436. video_id, root_source_id, mini_program_name, article_title, article_link,
  437. article_cover, article_text, article_images, article_desc, read_cnt,
  438. like_cnt, publish_timestamp, task_result, wx_sn, card_index
  439. ) VALUES
  440. (
  441. %s, %s, %s, %s, %s,
  442. %s, %s, %s, %s, %s,
  443. %s, %s, %s, %s, %s,
  444. %s, %s, %s, %s, %s
  445. );
  446. """
  447. values = (
  448. task_id,
  449. index,
  450. msg_type,
  451. card_title,
  452. card_cover,
  453. video_id,
  454. root_source_id,
  455. mini_name,
  456. article_title,
  457. article_link,
  458. article_cover,
  459. article_text,
  460. json.dumps(article_images, ensure_ascii=False),
  461. article_desc,
  462. read_cnt,
  463. like_cnt,
  464. publish_timestamp,
  465. json.dumps(fetch_response, ensure_ascii=False),
  466. wx_sn,
  467. card_index,
  468. )
  469. await self.store_extract_result(query, values)
  470. except Exception as e:
  471. print(traceback.format_exc())
  472. print(e)
  473. # 创建单个关注公众号任务
  474. async def create_follow_single_account_task(self, gh_id):
  475. response = await get_article_list_from_account(account_id=gh_id)
  476. code = response.get("code")
  477. match code:
  478. case 0:
  479. recent_articles = response["data"]["data"][0]["AppMsg"]["DetailInfo"]
  480. article_url = await self.get_sample_url(recent_articles)
  481. print(article_url)
  482. if article_url:
  483. await self.set_sample_url(gh_id, article_url)
  484. task_id = self.generate_task_id(task_name="follow", gh_id=gh_id)
  485. affected_rows = await self.insert_aigc_follow_account_task(
  486. task_id, article_url
  487. )
  488. if affected_rows:
  489. await self.update_follow_status(
  490. gh_id, self.INIT_STATUS, self.PROCESSING_STATUS
  491. )
  492. case 25013:
  493. await self.set_account_as_invalid(gh_id)
  494. case _:
  495. pass
  496. # 创建单个账号自动回复任务
  497. async def create_auto_reply_single_account_task(self, gh_id, account_name):
  498. print(account_name)
  499. task_id = self.generate_task_id(task_name="auto_reply", gh_id=gh_id)
  500. # 先插入 task, 再创建自动回复任务
  501. create_row = await self.create_auto_reply_task(task_id, gh_id)
  502. if create_row:
  503. affected_rows = await self.insert_aigc_auto_reply_task(task_id, gh_id)
  504. if not affected_rows:
  505. print("发布任务至 AIGC 失败")
  506. else:
  507. await self.update_auto_reply_task_status(
  508. task_id, "task", self.INIT_STATUS, self.PROCESSING_STATUS
  509. )
  510. else:
  511. print("创建任务至 DB 失败")
  512. async def follow_gzh_task(self):
  513. account_list = self.get_monitor_account_list()
  514. for account in account_list:
  515. try:
  516. fetch_response = await self.fetch_account_status(account.公众号名)
  517. if not fetch_response:
  518. print("账号不存在", account)
  519. # todo 没有 gh_id, 暂时无法存储账号
  520. # affected_rows =await self.insert_accounts_task(account.公众号名, account.ghid)
  521. # if affected_rows:
  522. # await self.create_follow_account_task(account.ghid)
  523. else:
  524. account_detail = fetch_response[0]
  525. status = account_detail["status"]
  526. if not status:
  527. print("账号已经迁移或者封禁")
  528. continue
  529. # 新逻辑,无需考虑账号是否关注
  530. await self.create_auto_reply_single_account_task(
  531. account_detail["gh_id"], account.公众号名
  532. )
  533. # # 旧逻辑,考虑账号是否关注
  534. # follow_status = account_detail["follow_status"]
  535. # match follow_status:
  536. # case self.INIT_STATUS:
  537. # await self.create_follow_single_account_task(
  538. # account_detail["gh_id"]
  539. # )
  540. #
  541. # case self.PROCESSING_STATUS:
  542. # fetch_response = await self.fetch_follow_account_status(
  543. # account_detail["gh_id"]
  544. # )
  545. # if not fetch_response:
  546. # await self.update_follow_status(
  547. # account_detail["gh_id"],
  548. # self.PROCESSING_STATUS,
  549. # self.INIT_STATUS,
  550. # )
  551. #
  552. # task_status = fetch_response[0]["task_status"]
  553. # match task_status:
  554. # case self.FETCH_INIT_STATUS:
  555. # continue
  556. # case self.FETCH_PROCESSING_STATUS:
  557. # continue
  558. # case self.FETCH_SUCCESS_STATUS:
  559. # await self.update_follow_status(
  560. # account_detail["gh_id"],
  561. # self.PROCESSING_STATUS,
  562. # self.SUCCESS_STATUS,
  563. # )
  564. # case self.FETCH_FAIL_STATUS:
  565. # await self.update_follow_status(
  566. # account_detail["gh_id"],
  567. # self.PROCESSING_STATUS,
  568. # self.FAIL_STATUS,
  569. # )
  570. #
  571. # case self.SUCCESS_STATUS:
  572. # # 账号已经关注,创建获取自动回复任务
  573. # await self.create_auto_reply_single_account_task(
  574. # account_detail["gh_id"], account.公众号名
  575. # )
  576. #
  577. # case _:
  578. # print(f"{account.公众号名}账号状态异常")
  579. except Exception as e:
  580. print(f"处理账号{account.公众号名}异常", e)
  581. # 异步获取关注结果
  582. async def get_auto_reply_response(self):
  583. task_list = await self.fetch_auto_replying_tasks()
  584. if not task_list:
  585. print("No processing task yet")
  586. return
  587. for task in tqdm(task_list):
  588. try:
  589. task_id = task["task_id"]
  590. response = await self.get_auto_reply_task_result(task_id)
  591. if not response:
  592. continue
  593. task_status = response[0]["task_status"]
  594. task_result = response[0]["task_result"]
  595. update_timestamp = response[0]["update_timestamp"]
  596. match task_status:
  597. case self.FETCH_FAIL_STATUS:
  598. await self.update_auto_reply_task_status(
  599. task_id, "task", self.PROCESSING_STATUS, self.FAIL_STATUS
  600. )
  601. case self.FETCH_SUCCESS_STATUS:
  602. await self.set_auto_reply_result(
  603. task_id, update_timestamp, task_result
  604. )
  605. case _:
  606. continue
  607. except Exception as e:
  608. print(e)
  609. # 解析单个xml
  610. async def extract_single_xml(self, task):
  611. task_id = task["task_id"]
  612. result = task["result"]
  613. # acquire lock
  614. acquire_lock = await self.update_auto_reply_task_status(
  615. task_id, "extract", self.INIT_STATUS, self.PROCESSING_STATUS
  616. )
  617. if not acquire_lock:
  618. return
  619. try:
  620. # parse xml
  621. xml_list = json.loads(result) if type(result) == str else result
  622. for index, item in enumerate(xml_list, 1):
  623. xml_obj = self.extract_callback_xml(item)
  624. if xml_obj:
  625. msg_type = xml_obj.get("msg_type", None)
  626. match msg_type:
  627. case "33":
  628. await self.store_card(task_id, index, msg_type, xml_obj)
  629. case "5":
  630. await self.store_article(task_id, index, msg_type, xml_obj)
  631. case _:
  632. continue
  633. await self.update_auto_reply_task_status(
  634. task_id, "extract", self.PROCESSING_STATUS, self.SUCCESS_STATUS
  635. )
  636. except Exception as e:
  637. print(e)
  638. print(traceback.format_exc())
  639. await self.update_auto_reply_task_status(
  640. task_id, "extract", self.PROCESSING_STATUS, self.FAIL_STATUS
  641. )
  642. # main function
  643. async def deal(self, task_name):
  644. match task_name:
  645. case "follow_gzh_task":
  646. await self.follow_gzh_task()
  647. case "get_auto_reply_task":
  648. await self.get_auto_reply_response()
  649. case "extract_task":
  650. task_list = await self.get_extract_tasks()
  651. if not task_list:
  652. print("No tasks to extract now")
  653. return
  654. for task in tqdm(task_list, desc="解析任务"):
  655. await self.extract_single_xml(task)
  656. case "re_extract_task":
  657. pass
  658. case _:
  659. print("task_error")