# auto_reply_cards_monitor.py
  1. import asyncio
  2. import os
  3. import json
  4. import time
  5. import traceback
  6. import uuid
  7. import xml.etree.ElementTree as ET
  8. from tqdm import tqdm
  9. from datetime import datetime, timedelta
  10. from urllib.parse import unquote, parse_qs, urlparse
  11. import requests
  12. from requests.exceptions import RequestException
  13. from applications.utils import upload_to_oss
  14. from applications.utils import fetch_from_odps
  15. from applications.utils import AsyncHttpClient
  16. from applications.crawler.wechat import get_article_list_from_account
  17. from applications.crawler.wechat import get_article_detail
  18. class AutoReplyCardsMonitorConst:
  19. # fetch_status
  20. FETCH_INIT_STATUS = 0
  21. FETCH_PROCESSING_STATUS = 1
  22. FETCH_SUCCESS_STATUS = 2
  23. FETCH_FAIL_STATUS = 3
  24. # task_status
  25. INIT_STATUS = 0
  26. PROCESSING_STATUS = 1
  27. SUCCESS_STATUS = 2
  28. FAIL_STATUS = 99
  29. # account_status
  30. VALID_STATUS = 1
  31. INVALID_STATUS = 0
  32. class AutoReplyCardsMonitorUtils(AutoReplyCardsMonitorConst):
  33. @staticmethod
  34. def generate_task_id(task_name, gh_id):
  35. match task_name:
  36. case "follow":
  37. return f"{task_name}_{gh_id}"
  38. case _:
  39. return f"{task_name}_{uuid.uuid4()}"
  40. @staticmethod
  41. def parse_fields(root, fields, default=""):
  42. result = {}
  43. for key, path in fields.items():
  44. elem = root.find(path)
  45. result[key] = elem.text if elem is not None and elem.text else default
  46. return result
  47. def extract_reply_cards(self, msg_type, root):
  48. fields = {
  49. "title": ".//title",
  50. "page_path": ".//pagepath",
  51. "mini_program": ".//sourcedisplayname",
  52. "file_id": "appmsg/appattach/cdnthumburl",
  53. "file_size": "appmsg/appattach/cdnthumblength",
  54. "aes_key": "appmsg/appattach/aeskey",
  55. }
  56. data = self.parse_fields(root, fields)
  57. data["msg_type"] = msg_type
  58. return data
  59. def extract_reply_articles(self, msg_type, root):
  60. fields = {
  61. "title": "appmsg/title",
  62. "url": "appmsg/url",
  63. "cover_url": "appmsg/thumburl",
  64. "account_name": "appmsg/sourcedisplayname",
  65. "gh_id": "appmsg/sourceusername",
  66. "desc": "appmsg/des",
  67. }
  68. data = self.parse_fields(root, fields)
  69. data["msg_type"] = msg_type
  70. return data
  71. # 解析 xml
  72. def extract_callback_xml(self, xml_text):
  73. try:
  74. root = ET.fromstring(xml_text)
  75. msg_type = root.find("appmsg/type").text
  76. match msg_type:
  77. case "5":
  78. return self.extract_reply_articles(msg_type, root)
  79. case "33":
  80. return self.extract_reply_cards(msg_type, root)
  81. case "36":
  82. return self.extract_reply_cards(msg_type, root)
  83. case _:
  84. return {}
  85. except Exception as e:
  86. print(xml_text)
  87. print(e)
  88. print(traceback.format_exc())
  89. return {}
  90. # 解析 page_path
  91. @staticmethod
  92. def extract_page_path(page_path):
  93. # 解析外层 URL
  94. parsed_url = urlparse(page_path)
  95. outer_params = parse_qs(parsed_url.query)
  96. # 取出并解码 jumpPage
  97. jump_page = outer_params.get("jumpPage", [""])[0]
  98. if not jump_page:
  99. return None, None
  100. decoded_jump_page = unquote(jump_page)
  101. # 解析 jumpPage 内层参数
  102. inner_query = urlparse(decoded_jump_page).query
  103. inner_params = parse_qs(inner_query)
  104. video_id = inner_params.get("id", [None])[0]
  105. root_source_id = inner_params.get("rootSourceId", [None])[0]
  106. return video_id, root_source_id
  107. @staticmethod
  108. async def get_cover_url(aes_key, total_size, file_id):
  109. url = "http://api.geweapi.com/gewe/v2/api/message/downloadCdn"
  110. data = {
  111. "appId": "wx_anFlUnezoUynU3SKcqTWk",
  112. "aesKey": aes_key,
  113. "totalSize": total_size,
  114. "fileId": file_id,
  115. "type": "3",
  116. "suffix": "jpg",
  117. }
  118. headers = {
  119. "X-GEWE-TOKEN": "d3fb918f-0f36-4769-b095-410181614231",
  120. "Content-Type": "application/json",
  121. }
  122. async with AsyncHttpClient() as client:
  123. response = await client.post(url, headers=headers, data=json.dumps(data))
  124. return response
  125. @staticmethod
  126. async def get_sample_url(recent_articles):
  127. for article in recent_articles:
  128. link = article["ContentUrl"]
  129. response = await get_article_detail(article_link=link)
  130. if not response:
  131. continue
  132. code = response["code"]
  133. if code == 0 or code == 25006:
  134. return link
  135. return None
  136. # 获取检测的账号 list
  137. @staticmethod
  138. def get_monitor_account_list():
  139. yesterday = (datetime.today() - timedelta(days=1)).strftime("%Y%m%d")
  140. query = f"""
  141. SELECT 公众号名, ghid, count(DISTINCT mid) AS uv
  142. FROM loghubods.opengid_base_data
  143. WHERE dt = {yesterday}
  144. AND hotsencetype = 1074
  145. AND usersharedepth = 0
  146. AND channel = '公众号合作-即转-稳定'
  147. GROUP BY 公众号名, ghid
  148. HAVING uv >= 100
  149. ORDER BY uv DESC
  150. ;
  151. """
  152. result = fetch_from_odps(query)
  153. return result
  154. @staticmethod
  155. def download_and_upload_cover(task_id, index, cover_obj, timeout=10):
  156. try:
  157. cover_url = cover_obj["data"]["fileUrl"]
  158. except (KeyError, TypeError):
  159. print(f"[WARN] Invalid cover_obj structure: {cover_obj}")
  160. return None
  161. file_name = f"{task_id}_{index}.jpg"
  162. save_dir = os.path.join(os.getcwd(), "static")
  163. save_path = os.path.join(save_dir, file_name)
  164. os.makedirs(save_dir, exist_ok=True)
  165. try:
  166. response = requests.get(cover_url, timeout=timeout)
  167. response.raise_for_status()
  168. except RequestException as e:
  169. print(f"[ERROR] Download failed ({cover_url}): {e}")
  170. return None
  171. try:
  172. with open(save_path, "wb") as f:
  173. f.write(response.content)
  174. except OSError as e:
  175. print(f"[ERROR] Write file failed ({save_path}): {e}")
  176. return None
  177. oss_dir = "auto_rely_cards_cover"
  178. oss_key = None
  179. try:
  180. oss_key = upload_to_oss(save_path, f"{oss_dir}/{file_name}")
  181. except Exception as e:
  182. print(f"[ERROR] Upload to OSS failed: {e}")
  183. if oss_key:
  184. try:
  185. os.remove(save_path)
  186. except OSError as e:
  187. print(f"[WARN] Failed to remove temp file {save_path}: {e}")
  188. return oss_key
  189. class AutoReplyCardsMonitorMapper(AutoReplyCardsMonitorUtils):
  190. def __init__(self, pool, log_client):
  191. self.pool = pool
  192. self.log_client = log_client
  193. # 获取自动回复任务结果
  194. async def get_auto_reply_task_result(self, task_id):
  195. query = """
  196. SELECT task_result, task_status, err_msg, update_timestamp
  197. FROM gzh_msg_record
  198. WHERE task_id = %s;
  199. """
  200. return await self.pool.async_fetch(
  201. query=query, params=(task_id,), db_name="aigc"
  202. )
  203. # 查询账号
  204. async def fetch_account_status(self, account_name):
  205. query = """
  206. SELECT partner_name, partner_id, gh_id, status, follow_status
  207. FROM cooperate_accounts
  208. WHERE account_name = %s;
  209. """
  210. return await self.pool.async_fetch(query=query, params=(account_name,))
  211. # 更新账号状态为无效
  212. async def set_account_as_invalid(self, gh_id):
  213. query = """
  214. UPDATE cooperate_accounts SET status = %s WHERE gh_id = %s;
  215. """
  216. await self.pool.async_save(query=query, params=(self.INVALID_STATUS, gh_id))
  217. # 插入AIGC关注公众号任务
  218. async def insert_aigc_follow_account_task(self, task_id, link):
  219. timestamp = int(time.time() * 1000)
  220. query = """
  221. INSERT INTO gzh_msg_record (task_id, biz_type, task_params, create_timestamp, update_timestamp) VALUES (%s, %s, %s, %s, %s);
  222. """
  223. return await self.pool.async_save(
  224. query=query,
  225. params=(task_id, "follow", link, timestamp, timestamp),
  226. db_name="aigc",
  227. )
  228. # 插入AIGC自动回复任务
  229. async def insert_aigc_auto_reply_task(self, task_id, account_name):
  230. timestamp = int(time.time() * 1000)
  231. query = """
  232. INSERT INTO gzh_msg_record (task_id, task_params, create_timestamp, update_timestamp) VALUES (%s, %s, %s, %s);
  233. """
  234. return await self.pool.async_save(
  235. query=query,
  236. params=(task_id, account_name, timestamp, timestamp),
  237. db_name="aigc",
  238. )
  239. # 为账号设置 sample_url
  240. async def set_sample_url(self, gh_id, sample_url):
  241. query = """
  242. UPDATE cooperate_accounts SET sample_link = %s WHERE gh_id = %s;
  243. """
  244. return await self.pool.async_save(query=query, params=(sample_url, gh_id))
  245. # 修改账号的关注状态
  246. async def update_follow_status(self, gh_id, ori_status, new_status):
  247. query = """
  248. UPDATE cooperate_accounts SET follow_status = %s WHERE gh_id = %s and follow_status = %s;
  249. """
  250. return await self.pool.async_save(
  251. query=query, params=(new_status, gh_id, ori_status)
  252. )
  253. # 从 aigc 获取关注结果
  254. async def fetch_follow_account_status(self, gh_id):
  255. query = """
  256. SELECT task_status, err_msg
  257. FROM gzh_msg_record
  258. WHERE task_id = %s;
  259. """
  260. return await self.pool.async_fetch(
  261. query=query, params=(f"follow_{gh_id}",), db_name="aigc"
  262. )
  263. # 创建自动回复任务
  264. async def create_auto_reply_task(self, task_id, gh_id):
  265. query = """
  266. INSERT INTO cooperate_accounts_task (task_id, gh_id) VALUES (%s, %s);
  267. """
  268. return await self.pool.async_save(query=query, params=(task_id, gh_id))
  269. async def update_auto_reply_task_status(
  270. self, task_id, status_type, ori_status, new_status
  271. ):
  272. task_query = """
  273. UPDATE cooperate_accounts_task SET task_status = %s WHERE task_id = %s AND task_status = %s;
  274. """
  275. extract_query = """
  276. UPDATE cooperate_accounts_task SET extract_status = %s WHERE task_id = %s AND extract_status = %s;
  277. """
  278. match status_type:
  279. case "task":
  280. return await self.pool.async_save(
  281. query=task_query, params=(new_status, task_id, ori_status)
  282. )
  283. case "extract":
  284. return await self.pool.async_save(
  285. query=extract_query, params=(new_status, task_id, ori_status)
  286. )
  287. case _:
  288. print("status_type_error")
  289. return None
  290. # 获取正在自动回复卡片的任务 id
  291. async def fetch_auto_replying_tasks(self):
  292. query = """
  293. SELECT task_id FROM cooperate_accounts_task WHERE task_status = %s;
  294. """
  295. return await self.pool.async_fetch(
  296. query=query, params=(self.PROCESSING_STATUS,)
  297. )
  298. # 设置自动回复结果
  299. async def set_auto_reply_result(self, task_id, finish_timestamp, result):
  300. query = """
  301. UPDATE cooperate_accounts_task
  302. SET finish_timestamp = %s, result = %s, task_status = %s
  303. WHERE task_id = %s and task_status = %s;
  304. """
  305. return await self.pool.async_save(
  306. query=query,
  307. params=(
  308. finish_timestamp,
  309. result,
  310. self.SUCCESS_STATUS,
  311. task_id,
  312. self.PROCESSING_STATUS,
  313. ),
  314. )
  315. # 获取带解析的任务
  316. async def get_extract_tasks(self):
  317. query = """
  318. SELECT task_id, result FROM cooperate_accounts_task WHERE extract_status = %s AND task_status = %s;
  319. """
  320. return await self.pool.async_fetch(
  321. query=query, params=(self.INIT_STATUS, self.SUCCESS_STATUS)
  322. )
  323. # 存储解析结果
  324. async def store_extract_result(self, query, row_table):
  325. return await self.pool.async_save(query=query, params=row_table)
  326. class AutoReplyCardsMonitor(AutoReplyCardsMonitorMapper):
  327. def __init__(self, pool, log_client):
  328. super().__init__(pool, log_client)
  329. # 存储卡片信息
  330. async def store_card(self, task_id, index, msg_type, xml_obj):
  331. video_id, root_source_id = self.extract_page_path(xml_obj["page_path"])
  332. cover_obj = await self.get_cover_url(xml_obj["aes_key"], xml_obj["file_size"], xml_obj["file_id"])
  333. cover_oss = self.download_and_upload_cover(task_id, index, cover_obj)
  334. query = """
  335. INSERT INTO cooperate_auto_reply_detail
  336. (
  337. task_id, position, msg_type, card_title, card_cover,
  338. video_id, root_source_id, mini_program_name, task_result
  339. ) VALUES
  340. (
  341. %s, %s, %s, %s, %s,
  342. %s, %s, %s, %s
  343. );
  344. """
  345. insert_row = (
  346. task_id,
  347. index,
  348. msg_type,
  349. xml_obj["title"],
  350. cover_oss,
  351. video_id,
  352. root_source_id,
  353. xml_obj["mini_program"],
  354. json.dumps(xml_obj, ensure_ascii=False),
  355. )
  356. await self.store_extract_result(query, insert_row)
  357. # 存储文章信息
  358. async def store_article(self, task_id, index, msg_type, xml_obj):
  359. article_title = xml_obj.get("title")
  360. article_link = xml_obj.get("url")
  361. article_cover = xml_obj.get("cover_url")
  362. article_desc = xml_obj.get("desc")
  363. fetch_fail_status = False
  364. fetch_response = await get_article_detail(article_link=article_link, is_cache=False, is_count=True)
  365. if not fetch_response:
  366. fetch_fail_status = True
  367. if not fetch_fail_status:
  368. if fetch_response.get("code") != 0:
  369. fetch_fail_status = True
  370. if fetch_fail_status:
  371. query = """
  372. INSERT INTO cooperate_auto_reply_detail
  373. (task_id, position, msg_type, article_title, article_link, article_cover, article_desc, remark)
  374. VALUES
  375. (%s, %s, %s, %s, %s, %s, %s, %s);
  376. """
  377. remark = "获取文章详情失败"
  378. insert_row = (task_id, index, msg_type, article_title, article_link, article_cover, article_desc, remark)
  379. await self.store_extract_result(query, insert_row)
  380. else:
  381. print(article_link)
  382. article_detail = fetch_response["data"]["data"]
  383. article_text = article_detail["body_text"]
  384. article_images = article_detail["image_url_list"]
  385. read_cnt = article_detail["view_count"]
  386. like_cnt = article_detail["like_count"]
  387. publish_timestamp = int(article_detail["publish_timestamp"] / 1000)
  388. parsed = urlparse(article_detail["content_link"])
  389. params = parse_qs(parsed.query)
  390. wx_sn = params.get("sn", [None])[0]
  391. print(params)
  392. print(wx_sn)
  393. mini_info = article_detail.get("mini_program")
  394. if not mini_info:
  395. # video_id, root_source_id = None, None
  396. query = """
  397. INSERT INTO cooperate_auto_reply_detail
  398. (
  399. task_id, position, msg_type, article_title, article_link,
  400. article_cover, article_text, article_images, article_desc, read_cnt,
  401. like_cnt, publish_timestamp, task_result, wx_sn
  402. ) VALUES
  403. (
  404. %s, %s, %s, %s, %s,
  405. %s, %s, %s, %s, %s,
  406. %s, %s, %s, %s
  407. );
  408. """
  409. values = (
  410. task_id,
  411. index,
  412. msg_type,
  413. article_title,
  414. article_link,
  415. article_cover,
  416. article_text,
  417. json.dumps(article_images, ensure_ascii=False),
  418. article_desc,
  419. read_cnt,
  420. like_cnt,
  421. publish_timestamp,
  422. json.dumps(fetch_response, ensure_ascii=False),
  423. wx_sn,
  424. )
  425. await self.store_extract_result(query, values)
  426. else:
  427. for card_index, i in enumerate(mini_info, 1):
  428. try:
  429. video_id, root_source_id = self.extract_page_path(i["path"])
  430. card_title = i["title"]
  431. card_cover = i["image_url"]
  432. mini_name = i["nike_name"]
  433. query = """
  434. INSERT INTO cooperate_auto_reply_detail
  435. (
  436. task_id, position, msg_type, card_title, card_cover,
  437. video_id, root_source_id, mini_program_name, article_title, article_link,
  438. article_cover, article_text, article_images, article_desc, read_cnt,
  439. like_cnt, publish_timestamp, task_result, wx_sn, card_index
  440. ) VALUES
  441. (
  442. %s, %s, %s, %s, %s,
  443. %s, %s, %s, %s, %s,
  444. %s, %s, %s, %s, %s,
  445. %s, %s, %s, %s, %s
  446. );
  447. """
  448. values = (
  449. task_id,
  450. index,
  451. msg_type,
  452. card_title,
  453. card_cover,
  454. video_id,
  455. root_source_id,
  456. mini_name,
  457. article_title,
  458. article_link,
  459. article_cover,
  460. article_text,
  461. json.dumps(article_images, ensure_ascii=False),
  462. article_desc,
  463. read_cnt,
  464. like_cnt,
  465. publish_timestamp,
  466. json.dumps(fetch_response, ensure_ascii=False),
  467. wx_sn,
  468. card_index,
  469. )
  470. await self.store_extract_result(query, values)
  471. except Exception as e:
  472. print(traceback.format_exc())
  473. print(e)
  474. # 创建单个关注公众号任务
  475. async def create_follow_single_account_task(self, gh_id):
  476. response = await get_article_list_from_account(account_id=gh_id)
  477. code = response.get("code")
  478. match code:
  479. case 0:
  480. recent_articles = response["data"]["data"][0]["AppMsg"]["DetailInfo"]
  481. article_url = await self.get_sample_url(recent_articles)
  482. print(article_url)
  483. if article_url:
  484. await self.set_sample_url(gh_id, article_url)
  485. task_id = self.generate_task_id(task_name="follow", gh_id=gh_id)
  486. affected_rows = await self.insert_aigc_follow_account_task(
  487. task_id, article_url
  488. )
  489. if affected_rows:
  490. await self.update_follow_status(
  491. gh_id, self.INIT_STATUS, self.PROCESSING_STATUS
  492. )
  493. case 25013:
  494. await self.set_account_as_invalid(gh_id)
  495. case _:
  496. pass
  497. # 创建单个账号自动回复任务
  498. async def create_auto_reply_single_account_task(self, gh_id, account_name):
  499. print(account_name)
  500. task_id = self.generate_task_id(task_name="auto_reply", gh_id=gh_id)
  501. # 先插入 task, 再创建自动回复任务
  502. create_row = await self.create_auto_reply_task(task_id, gh_id)
  503. if create_row:
  504. affected_rows = await self.insert_aigc_auto_reply_task(task_id, gh_id)
  505. if not affected_rows:
  506. print("发布任务至 AIGC 失败")
  507. else:
  508. await self.update_auto_reply_task_status(
  509. task_id, "task", self.INIT_STATUS, self.PROCESSING_STATUS
  510. )
  511. else:
  512. print("创建任务至 DB 失败")
  513. async def follow_gzh_task(self):
  514. account_list = self.get_monitor_account_list()
  515. for account in account_list:
  516. try:
  517. fetch_response = await self.fetch_account_status(account.公众号名)
  518. if not fetch_response:
  519. print("账号不存在", account)
  520. # todo 没有 gh_id, 暂时无法存储账号
  521. # affected_rows =await self.insert_accounts_task(account.公众号名, account.ghid)
  522. # if affected_rows:
  523. # await self.create_follow_account_task(account.ghid)
  524. else:
  525. account_detail = fetch_response[0]
  526. status = account_detail["status"]
  527. if not status:
  528. print("账号已经迁移或者封禁")
  529. continue
  530. # 新逻辑,无需考虑账号是否关注
  531. await self.create_auto_reply_single_account_task(
  532. account_detail["gh_id"], account.公众号名
  533. )
  534. # # 旧逻辑,考虑账号是否关注
  535. # follow_status = account_detail["follow_status"]
  536. # match follow_status:
  537. # case self.INIT_STATUS:
  538. # await self.create_follow_single_account_task(
  539. # account_detail["gh_id"]
  540. # )
  541. #
  542. # case self.PROCESSING_STATUS:
  543. # fetch_response = await self.fetch_follow_account_status(
  544. # account_detail["gh_id"]
  545. # )
  546. # if not fetch_response:
  547. # await self.update_follow_status(
  548. # account_detail["gh_id"],
  549. # self.PROCESSING_STATUS,
  550. # self.INIT_STATUS,
  551. # )
  552. #
  553. # task_status = fetch_response[0]["task_status"]
  554. # match task_status:
  555. # case self.FETCH_INIT_STATUS:
  556. # continue
  557. # case self.FETCH_PROCESSING_STATUS:
  558. # continue
  559. # case self.FETCH_SUCCESS_STATUS:
  560. # await self.update_follow_status(
  561. # account_detail["gh_id"],
  562. # self.PROCESSING_STATUS,
  563. # self.SUCCESS_STATUS,
  564. # )
  565. # case self.FETCH_FAIL_STATUS:
  566. # await self.update_follow_status(
  567. # account_detail["gh_id"],
  568. # self.PROCESSING_STATUS,
  569. # self.FAIL_STATUS,
  570. # )
  571. #
  572. # case self.SUCCESS_STATUS:
  573. # # 账号已经关注,创建获取自动回复任务
  574. # await self.create_auto_reply_single_account_task(
  575. # account_detail["gh_id"], account.公众号名
  576. # )
  577. #
  578. # case _:
  579. # print(f"{account.公众号名}账号状态异常")
  580. except Exception as e:
  581. print(f"处理账号{account.公众号名}异常", e)
  582. # 异步获取关注结果
  583. async def get_auto_reply_response(self):
  584. task_list = await self.fetch_auto_replying_tasks()
  585. if not task_list:
  586. print("No processing task yet")
  587. return
  588. for task in tqdm(task_list):
  589. try:
  590. task_id = task["task_id"]
  591. response = await self.get_auto_reply_task_result(task_id)
  592. if not response:
  593. continue
  594. task_status = response[0]["task_status"]
  595. task_result = response[0]["task_result"]
  596. update_timestamp = response[0]["update_timestamp"]
  597. match task_status:
  598. case self.FETCH_FAIL_STATUS:
  599. await self.update_auto_reply_task_status(
  600. task_id, "task", self.PROCESSING_STATUS, self.FAIL_STATUS
  601. )
  602. case self.FETCH_SUCCESS_STATUS:
  603. await self.set_auto_reply_result(
  604. task_id, update_timestamp, task_result
  605. )
  606. case _:
  607. continue
  608. except Exception as e:
  609. print(e)
  610. # 解析单个xml
  611. async def extract_single_xml(self, task):
  612. task_id = task["task_id"]
  613. result = task["result"]
  614. # acquire lock
  615. acquire_lock = await self.update_auto_reply_task_status(
  616. task_id, "extract", self.INIT_STATUS, self.PROCESSING_STATUS
  617. )
  618. if not acquire_lock:
  619. return
  620. try:
  621. # parse xml
  622. xml_list = json.loads(result) if type(result) == str else result
  623. for index, item in enumerate(xml_list, 1):
  624. xml_obj = self.extract_callback_xml(item)
  625. if xml_obj:
  626. msg_type = xml_obj.get("msg_type", None)
  627. match msg_type:
  628. case "33":
  629. await self.store_card(task_id, index, msg_type, xml_obj)
  630. case "5":
  631. await self.store_article(task_id, index, msg_type, xml_obj)
  632. case _:
  633. continue
  634. await asyncio.sleep(5)
  635. await self.update_auto_reply_task_status(
  636. task_id, "extract", self.PROCESSING_STATUS, self.SUCCESS_STATUS
  637. )
  638. except Exception as e:
  639. print(e)
  640. print(traceback.format_exc())
  641. await self.update_auto_reply_task_status(
  642. task_id, "extract", self.PROCESSING_STATUS, self.FAIL_STATUS
  643. )
  644. # main function
  645. async def deal(self, task_name):
  646. match task_name:
  647. case "follow_gzh_task":
  648. await self.follow_gzh_task()
  649. case "get_auto_reply_task":
  650. await self.get_auto_reply_response()
  651. case "extract_task":
  652. task_list = await self.get_extract_tasks()
  653. if not task_list:
  654. print("No tasks to extract now")
  655. return
  656. for task in tqdm(task_list, desc="解析任务"):
  657. await self.extract_single_xml(task)
  658. await asyncio.sleep(10)
  659. case "re_extract_task":
  660. pass
  661. case _:
  662. print("task_error")