auto_reply_cards_monitor.py 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783
  1. import asyncio
  2. import os
  3. import json
  4. import time
  5. import traceback
  6. import uuid
  7. from typing import List, Dict
  8. import xml.etree.ElementTree as ET
  9. from tqdm import tqdm
  10. from datetime import datetime, timedelta
  11. from urllib.parse import unquote, parse_qs, urlparse
  12. import requests
  13. from requests.exceptions import RequestException
  14. from app.infra.shared.tools import upload_to_oss
  15. from app.infra.shared.tools import fetch_from_odps
  16. from app.infra.shared import AsyncHttpClient
  17. from app.infra.crawler.wechat import get_article_list_from_account
  18. from app.infra.crawler.wechat import get_article_detail
  19. class AutoReplyCardsMonitorConst:
  20. # fetch_status
  21. FETCH_INIT_STATUS = 0
  22. FETCH_PROCESSING_STATUS = 1
  23. FETCH_SUCCESS_STATUS = 2
  24. FETCH_FAIL_STATUS = 3
  25. # task_status
  26. INIT_STATUS = 0
  27. PROCESSING_STATUS = 1
  28. SUCCESS_STATUS = 2
  29. FAIL_STATUS = 99
  30. # account_status
  31. VALID_STATUS = 1
  32. INVALID_STATUS = 0
  33. class AutoReplyCardsMonitorUtils(AutoReplyCardsMonitorConst):
  34. @staticmethod
  35. def generate_task_id(task_name, gh_id):
  36. match task_name:
  37. case "follow":
  38. return f"{task_name}_{gh_id}"
  39. case _:
  40. return f"{task_name}_{uuid.uuid4()}"
  41. @staticmethod
  42. def parse_fields(root, fields, default=""):
  43. result = {}
  44. for key, path in fields.items():
  45. elem = root.find(path)
  46. result[key] = elem.text if elem is not None and elem.text else default
  47. return result
  48. def extract_reply_cards(self, msg_type: str, root) -> List[Dict]:
  49. fields = {
  50. "title": ".//title",
  51. "page_path": ".//pagepath",
  52. "mini_program": ".//sourcedisplayname",
  53. "file_id": "appmsg/appattach/cdnthumburl",
  54. "file_size": "appmsg/appattach/cdnthumblength",
  55. "aes_key": "appmsg/appattach/aeskey",
  56. }
  57. data = self.parse_fields(root, fields)
  58. data["msg_type"] = msg_type
  59. results = [data]
  60. return results
  61. def extract_reply_articles(self, msg_type, root) -> Dict:
  62. fields = {
  63. "title": "appmsg/title",
  64. "url": "appmsg/url",
  65. "cover_url": "appmsg/thumburl",
  66. "account_name": "appmsg/sourcedisplayname",
  67. "gh_id": "appmsg/sourceusername",
  68. "desc": "appmsg/des",
  69. }
  70. data = self.parse_fields(root, fields)
  71. data["msg_type"] = msg_type
  72. return data
  73. @staticmethod
  74. def extract_group_reply_articles(msg_type, root) -> List[Dict]:
  75. items = []
  76. for item in root.findall(".//item"):
  77. data = {
  78. "title": item.findtext("title"),
  79. "url": item.findtext("url"),
  80. "cover_url": item.findtext("cover"),
  81. "account_name": item.findtext("sources/source/name"),
  82. "gh_id": "",
  83. "desc": "",
  84. "msg_type": msg_type
  85. }
  86. items.append(data)
  87. return items
  88. # 解析 xml
  89. def extract_callback_xml(self, xml_text):
  90. try:
  91. root = ET.fromstring(xml_text)
  92. msg_type = root.find("appmsg/type").text
  93. match msg_type:
  94. case "5":
  95. # return self.extract_reply_articles(msg_type, root)
  96. return self.extract_group_reply_articles(msg_type, root)
  97. case "33":
  98. return self.extract_reply_cards(msg_type, root)
  99. case "36":
  100. return self.extract_reply_cards(msg_type, root)
  101. case _:
  102. return []
  103. except Exception as e:
  104. print(xml_text)
  105. print(e)
  106. print(traceback.format_exc())
  107. return []
  108. # 解析 page_path
  109. @staticmethod
  110. def extract_page_path(page_path):
  111. # 解析外层 URL
  112. parsed_url = urlparse(page_path)
  113. outer_params = parse_qs(parsed_url.query)
  114. # 取出并解码 jumpPage
  115. jump_page = outer_params.get("jumpPage", [""])[0]
  116. if not jump_page:
  117. return None, None
  118. decoded_jump_page = unquote(jump_page)
  119. # 解析 jumpPage 内层参数
  120. inner_query = urlparse(decoded_jump_page).query
  121. inner_params = parse_qs(inner_query)
  122. video_id = inner_params.get("id", [None])[0]
  123. root_source_id = inner_params.get("rootSourceId", [None])[0]
  124. return video_id, root_source_id
  125. @staticmethod
  126. async def get_cover_url(aes_key, total_size, file_id):
  127. url = "http://api.geweapi.com/gewe/v2/api/message/downloadCdn"
  128. data = {
  129. "appId": "wx_anFlUnezoUynU3SKcqTWk",
  130. "aesKey": aes_key,
  131. "totalSize": total_size,
  132. "fileId": file_id,
  133. "type": "3",
  134. "suffix": "jpg",
  135. }
  136. headers = {
  137. "X-GEWE-TOKEN": "d3fb918f-0f36-4769-b095-410181614231",
  138. "Content-Type": "application/json",
  139. }
  140. async with AsyncHttpClient() as client:
  141. response = await client.post(url, headers=headers, data=json.dumps(data))
  142. return response
  143. @staticmethod
  144. async def get_sample_url(recent_articles):
  145. for article in recent_articles:
  146. link = article["ContentUrl"]
  147. response = await get_article_detail(article_link=link)
  148. if not response:
  149. continue
  150. code = response["code"]
  151. if code == 0 or code == 25006:
  152. return link
  153. return None
  154. # 获取检测的账号 list
  155. @staticmethod
  156. def get_monitor_account_list():
  157. # dt = (datetime.today() - timedelta(days=1)).strftime("%Y%m%d")
  158. week_ago = (datetime.today() - timedelta(days=7)).strftime("%Y-%m-%d %H:%M:%S")
  159. query = f"""
  160. SELECT 公众号名, ghid, count(DISTINCT mid) AS uv
  161. FROM loghubods.opengid_base_data
  162. WHERE dt = MAX_PT('loghubods.opengid_base_data')
  163. AND hotsencetype = 1074
  164. AND usersharedepth = 0
  165. AND channel = '公众号合作-即转-稳定'
  166. AND 点击时间 >= '{week_ago}'
  167. GROUP BY 公众号名, ghid
  168. HAVING uv >= 100
  169. ORDER BY uv DESC
  170. ;
  171. """
  172. result = fetch_from_odps(query)
  173. return result
  174. @staticmethod
  175. def download_and_upload_cover(task_id, index, cover_obj, timeout=10):
  176. try:
  177. cover_url = cover_obj["data"]["fileUrl"]
  178. except (KeyError, TypeError):
  179. print(f"[WARN] Invalid cover_obj structure: {cover_obj}")
  180. return None
  181. file_name = f"{task_id}_{index}.jpg"
  182. save_dir = os.path.join(os.getcwd(), "static")
  183. save_path = os.path.join(save_dir, file_name)
  184. os.makedirs(save_dir, exist_ok=True)
  185. try:
  186. response = requests.get(cover_url, timeout=timeout)
  187. response.raise_for_status()
  188. except RequestException as e:
  189. print(f"[ERROR] Download failed ({cover_url}): {e}")
  190. return None
  191. try:
  192. with open(save_path, "wb") as f:
  193. f.write(response.content)
  194. except OSError as e:
  195. print(f"[ERROR] Write file failed ({save_path}): {e}")
  196. return None
  197. oss_dir = "auto_rely_cards_cover"
  198. oss_key = None
  199. try:
  200. oss_key = upload_to_oss(save_path, f"{oss_dir}/{file_name}")
  201. except Exception as e:
  202. print(f"[ERROR] Upload to OSS failed: {e}")
  203. if oss_key:
  204. try:
  205. os.remove(save_path)
  206. except OSError as e:
  207. print(f"[WARN] Failed to remove temp file {save_path}: {e}")
  208. return oss_key
  209. class AutoReplyCardsMonitorMapper(AutoReplyCardsMonitorUtils):
  210. def __init__(self, pool, log_client):
  211. self.pool = pool
  212. self.log_client = log_client
  213. # 获取自动回复任务结果
  214. async def get_auto_reply_task_result(self, task_id):
  215. query = """
  216. SELECT task_result, task_status, err_msg, update_timestamp
  217. FROM gzh_msg_record
  218. WHERE task_id = %s;
  219. """
  220. return await self.pool.async_fetch(
  221. query=query, params=(task_id,), db_name="aigc"
  222. )
  223. # 查询账号
  224. async def fetch_account_status(self, account_name):
  225. query = """
  226. SELECT partner_name, partner_id, gh_id, status, follow_status
  227. FROM cooperate_accounts
  228. WHERE account_name = %s;
  229. """
  230. return await self.pool.async_fetch(query=query, params=(account_name,))
  231. # 更新账号状态为无效
  232. async def set_account_as_invalid(self, gh_id):
  233. query = """
  234. UPDATE cooperate_accounts SET status = %s WHERE gh_id = %s;
  235. """
  236. await self.pool.async_save(query=query, params=(self.INVALID_STATUS, gh_id))
  237. # 插入AIGC关注公众号任务
  238. async def insert_aigc_follow_account_task(self, task_id, link):
  239. timestamp = int(time.time() * 1000)
  240. query = """
  241. INSERT INTO gzh_msg_record (task_id, biz_type, task_params, create_timestamp, update_timestamp) VALUES (%s, %s, %s, %s, %s);
  242. """
  243. return await self.pool.async_save(
  244. query=query,
  245. params=(task_id, "follow", link, timestamp, timestamp),
  246. db_name="aigc",
  247. )
  248. # 插入AIGC自动回复任务
  249. async def insert_aigc_auto_reply_task(self, task_id, account_name):
  250. timestamp = int(time.time() * 1000)
  251. query = """
  252. INSERT INTO gzh_msg_record (task_id, task_params, create_timestamp, update_timestamp) VALUES (%s, %s, %s, %s);
  253. """
  254. return await self.pool.async_save(
  255. query=query,
  256. params=(task_id, account_name, timestamp, timestamp),
  257. db_name="aigc",
  258. )
  259. # 为账号设置 sample_url
  260. async def set_sample_url(self, gh_id, sample_url):
  261. query = """
  262. UPDATE cooperate_accounts SET sample_link = %s WHERE gh_id = %s;
  263. """
  264. return await self.pool.async_save(query=query, params=(sample_url, gh_id))
  265. # 修改账号的关注状态
  266. async def update_follow_status(self, gh_id, ori_status, new_status):
  267. query = """
  268. UPDATE cooperate_accounts SET follow_status = %s WHERE gh_id = %s and follow_status = %s;
  269. """
  270. return await self.pool.async_save(
  271. query=query, params=(new_status, gh_id, ori_status)
  272. )
  273. # 从 aigc 获取关注结果
  274. async def fetch_follow_account_status(self, gh_id):
  275. query = """
  276. SELECT task_status, err_msg
  277. FROM gzh_msg_record
  278. WHERE task_id = %s;
  279. """
  280. return await self.pool.async_fetch(
  281. query=query, params=(f"follow_{gh_id}",), db_name="aigc"
  282. )
  283. # 创建自动回复任务
  284. async def create_auto_reply_task(self, task_id, gh_id):
  285. query = """
  286. INSERT INTO cooperate_accounts_task (task_id, gh_id) VALUES (%s, %s);
  287. """
  288. return await self.pool.async_save(query=query, params=(task_id, gh_id))
  289. async def update_auto_reply_task_status(
  290. self, task_id, status_type, ori_status, new_status
  291. ):
  292. task_query = """
  293. UPDATE cooperate_accounts_task SET task_status = %s WHERE task_id = %s AND task_status = %s;
  294. """
  295. extract_query = """
  296. UPDATE cooperate_accounts_task SET extract_status = %s WHERE task_id = %s AND extract_status = %s;
  297. """
  298. match status_type:
  299. case "task":
  300. return await self.pool.async_save(
  301. query=task_query, params=(new_status, task_id, ori_status)
  302. )
  303. case "extract":
  304. return await self.pool.async_save(
  305. query=extract_query, params=(new_status, task_id, ori_status)
  306. )
  307. case _:
  308. print("status_type_error")
  309. return None
  310. # 获取正在自动回复卡片的任务 id
  311. async def fetch_auto_replying_tasks(self):
  312. query = """
  313. SELECT task_id FROM cooperate_accounts_task WHERE task_status = %s;
  314. """
  315. return await self.pool.async_fetch(
  316. query=query, params=(self.PROCESSING_STATUS,)
  317. )
  318. # 设置自动回复结果
  319. async def set_auto_reply_result(self, task_id, finish_timestamp, result):
  320. query = """
  321. UPDATE cooperate_accounts_task
  322. SET finish_timestamp = %s, result = %s, task_status = %s
  323. WHERE task_id = %s and task_status = %s;
  324. """
  325. return await self.pool.async_save(
  326. query=query,
  327. params=(
  328. finish_timestamp,
  329. result,
  330. self.SUCCESS_STATUS,
  331. task_id,
  332. self.PROCESSING_STATUS,
  333. ),
  334. )
  335. # 获取带解析的任务
  336. async def get_extract_tasks(self):
  337. query = """
  338. SELECT task_id, result FROM cooperate_accounts_task WHERE extract_status = %s AND task_status = %s;
  339. """
  340. return await self.pool.async_fetch(
  341. query=query, params=(self.INIT_STATUS, self.SUCCESS_STATUS)
  342. )
  343. # 存储解析结果
  344. async def store_extract_result(self, query, row_table):
  345. return await self.pool.async_save(query=query, params=row_table)
  346. # 从 growth 数据库获取账号信息,并且存储在 cooperate_accounts 表中
  347. async def fetch_cooperate_accounts(self, account_name):
  348. fetch_query = """
  349. SELECT t2.name AS partner_name, t2.channel AS partner_id,
  350. t1.name AS account_name, t1.gh_id
  351. FROM content_platform_gzh_account t1 JOIN content_platform_account t2
  352. ON t1.create_account_id = t2.id
  353. WHERE t1.name = %s;
  354. """
  355. fetch_response = await self.pool.async_fetch(
  356. query=fetch_query, db_name="growth", params=(account_name,)
  357. )
  358. if not fetch_response:
  359. return 0
  360. account_detail = fetch_response[0]
  361. save_query = """
  362. INSERT INTO cooperate_accounts (partner_name, partner_id, account_name, gh_id)
  363. VALUES (%s, %s, %s, %s);
  364. """
  365. return await self.pool.async_save(
  366. query=save_query,
  367. params=(
  368. account_detail["partner_name"],
  369. account_detail["partner_id"],
  370. account_detail["account_name"],
  371. account_detail["gh_id"],
  372. ),
  373. )
  374. class AutoReplyCardsMonitor(AutoReplyCardsMonitorMapper):
  375. def __init__(self, pool, log_client):
  376. super().__init__(pool, log_client)
  377. # 存储卡片信息
  378. async def store_card(self, task_id, index, msg_type, xml_obj):
  379. video_id, root_source_id = self.extract_page_path(xml_obj["page_path"])
  380. cover_obj = await self.get_cover_url(
  381. xml_obj["aes_key"], xml_obj["file_size"], xml_obj["file_id"]
  382. )
  383. cover_oss = self.download_and_upload_cover(task_id, index, cover_obj)
  384. query = """
  385. INSERT INTO cooperate_auto_reply_detail
  386. (
  387. task_id, position, msg_type, card_title, card_cover,
  388. video_id, root_source_id, mini_program_name, task_result
  389. ) VALUES
  390. (
  391. %s, %s, %s, %s, %s,
  392. %s, %s, %s, %s
  393. );
  394. """
  395. insert_row = (
  396. task_id,
  397. index,
  398. msg_type,
  399. xml_obj["title"],
  400. cover_oss,
  401. video_id,
  402. root_source_id,
  403. xml_obj["mini_program"],
  404. json.dumps(xml_obj, ensure_ascii=False),
  405. )
  406. await self.store_extract_result(query, insert_row)
  407. # 存储文章信息
  408. async def store_article(self, task_id, index, msg_type, xml_obj):
  409. article_title = xml_obj.get("title")
  410. article_link = xml_obj.get("url")
  411. article_cover = xml_obj.get("cover_url")
  412. article_desc = xml_obj.get("desc")
  413. fetch_fail_status = False
  414. fetch_response = await get_article_detail(
  415. article_link=article_link, is_cache=False, is_count=True
  416. )
  417. if not fetch_response:
  418. fetch_fail_status = True
  419. if not fetch_fail_status:
  420. if fetch_response.get("code") != 0:
  421. fetch_fail_status = True
  422. if fetch_fail_status:
  423. query = """
  424. INSERT INTO cooperate_auto_reply_detail
  425. (task_id, position, msg_type, article_title, article_link, article_cover, article_desc, remark)
  426. VALUES
  427. (%s, %s, %s, %s, %s, %s, %s, %s);
  428. """
  429. remark = "获取文章详情失败"
  430. insert_row = (
  431. task_id,
  432. index,
  433. msg_type,
  434. article_title,
  435. article_link,
  436. article_cover,
  437. article_desc,
  438. remark,
  439. )
  440. await self.store_extract_result(query, insert_row)
  441. else:
  442. article_detail = fetch_response["data"]["data"]
  443. article_text = article_detail["body_text"]
  444. article_images = article_detail["image_url_list"]
  445. read_cnt = article_detail["view_count"]
  446. like_cnt = article_detail["like_count"]
  447. publish_timestamp = int(article_detail["publish_timestamp"] / 1000)
  448. parsed = urlparse(article_detail["content_link"])
  449. params = parse_qs(parsed.query)
  450. wx_sn = params.get("sn", [None])[0]
  451. print(params)
  452. print(wx_sn)
  453. mini_info = article_detail.get("mini_program")
  454. if not mini_info:
  455. # video_id, root_source_id = None, None
  456. query = """
  457. INSERT INTO cooperate_auto_reply_detail
  458. (
  459. task_id, position, msg_type, article_title, article_link,
  460. article_cover, article_text, article_images, article_desc, read_cnt,
  461. like_cnt, publish_timestamp, task_result, wx_sn
  462. ) VALUES
  463. (
  464. %s, %s, %s, %s, %s,
  465. %s, %s, %s, %s, %s,
  466. %s, %s, %s, %s
  467. );
  468. """
  469. values = (
  470. task_id,
  471. index,
  472. msg_type,
  473. article_title,
  474. article_link,
  475. article_cover,
  476. article_text,
  477. json.dumps(article_images, ensure_ascii=False),
  478. article_desc,
  479. read_cnt,
  480. like_cnt,
  481. publish_timestamp,
  482. json.dumps(fetch_response, ensure_ascii=False),
  483. wx_sn,
  484. )
  485. await self.store_extract_result(query, values)
  486. else:
  487. for card_index, i in enumerate(mini_info, 1):
  488. try:
  489. video_id, root_source_id = self.extract_page_path(i["path"])
  490. card_title = i["title"]
  491. card_cover = i["image_url"]
  492. mini_name = i["nike_name"]
  493. query = """
  494. INSERT INTO cooperate_auto_reply_detail
  495. (
  496. task_id, position, msg_type, card_title, card_cover,
  497. video_id, root_source_id, mini_program_name, article_title, article_link,
  498. article_cover, article_text, article_images, article_desc, read_cnt,
  499. like_cnt, publish_timestamp, task_result, wx_sn, card_index
  500. ) VALUES
  501. (
  502. %s, %s, %s, %s, %s,
  503. %s, %s, %s, %s, %s,
  504. %s, %s, %s, %s, %s,
  505. %s, %s, %s, %s, %s
  506. );
  507. """
  508. values = (
  509. task_id,
  510. index,
  511. msg_type,
  512. card_title,
  513. card_cover,
  514. video_id,
  515. root_source_id,
  516. mini_name,
  517. article_title,
  518. article_link,
  519. article_cover,
  520. article_text,
  521. json.dumps(article_images, ensure_ascii=False),
  522. article_desc,
  523. read_cnt,
  524. like_cnt,
  525. publish_timestamp,
  526. json.dumps(fetch_response, ensure_ascii=False),
  527. wx_sn,
  528. card_index,
  529. )
  530. await self.store_extract_result(query, values)
  531. except Exception as e:
  532. print(traceback.format_exc())
  533. print(e)
  534. # 创建单个关注公众号任务
  535. async def create_follow_single_account_task(self, gh_id):
  536. response = await get_article_list_from_account(account_id=gh_id)
  537. code = response.get("code")
  538. match code:
  539. case 0:
  540. recent_articles = response["data"]["data"][0]["AppMsg"]["DetailInfo"]
  541. article_url = await self.get_sample_url(recent_articles)
  542. print(article_url)
  543. if article_url:
  544. await self.set_sample_url(gh_id, article_url)
  545. task_id = self.generate_task_id(task_name="follow", gh_id=gh_id)
  546. affected_rows = await self.insert_aigc_follow_account_task(
  547. task_id, article_url
  548. )
  549. if affected_rows:
  550. await self.update_follow_status(
  551. gh_id, self.INIT_STATUS, self.PROCESSING_STATUS
  552. )
  553. case 25013:
  554. await self.set_account_as_invalid(gh_id)
  555. case _:
  556. pass
  557. # 创建单个账号自动回复任务
  558. async def create_auto_reply_single_account_task(self, gh_id, account_name):
  559. print(account_name)
  560. task_id = self.generate_task_id(task_name="auto_reply", gh_id=gh_id)
  561. # 先插入 task, 再创建自动回复任务
  562. create_row = await self.create_auto_reply_task(task_id, gh_id)
  563. if create_row:
  564. affected_rows = await self.insert_aigc_auto_reply_task(task_id, gh_id)
  565. if not affected_rows:
  566. print("发布任务至 AIGC 失败")
  567. else:
  568. await self.update_auto_reply_task_status(
  569. task_id, "task", self.INIT_STATUS, self.PROCESSING_STATUS
  570. )
  571. else:
  572. print("创建任务至 DB 失败")
  573. async def follow_gzh_task(self):
  574. account_list = self.get_monitor_account_list()
  575. for account in account_list:
  576. try:
  577. fetch_response = await self.fetch_account_status(account.公众号名)
  578. if not fetch_response:
  579. affected_rows = await self.fetch_cooperate_accounts(
  580. account.公众号名
  581. )
  582. if affected_rows:
  583. fetch_response = await self.fetch_account_status(
  584. account.公众号名
  585. )
  586. else:
  587. print(f"系统中无账号,跳过: {account.公众号名}")
  588. continue
  589. account_detail = fetch_response[0]
  590. status = account_detail["status"]
  591. if not status:
  592. print("账号已经迁移或者封禁")
  593. continue
  594. # 新逻辑,无需考虑账号是否关注
  595. await self.create_auto_reply_single_account_task(
  596. account_detail["gh_id"], account.公众号名
  597. )
  598. except Exception as e:
  599. print(f"处理账号{account.公众号名}异常", e)
  600. # 异步获取关注结果
  601. async def get_auto_reply_response(self):
  602. task_list = await self.fetch_auto_replying_tasks()
  603. if not task_list:
  604. print("No processing task yet")
  605. return
  606. for task in tqdm(task_list):
  607. try:
  608. task_id = task["task_id"]
  609. response = await self.get_auto_reply_task_result(task_id)
  610. if not response:
  611. continue
  612. task_status = response[0]["task_status"]
  613. task_result = response[0]["task_result"]
  614. update_timestamp = response[0]["update_timestamp"]
  615. match task_status:
  616. case self.FETCH_FAIL_STATUS:
  617. await self.update_auto_reply_task_status(
  618. task_id, "task", self.PROCESSING_STATUS, self.FAIL_STATUS
  619. )
  620. case self.FETCH_SUCCESS_STATUS:
  621. await self.set_auto_reply_result(
  622. task_id, update_timestamp, task_result
  623. )
  624. case _:
  625. continue
  626. except Exception as e:
  627. print(e)
  628. # 解析单个xml
  629. async def extract_single_xml(self, task):
  630. task_id = task["task_id"]
  631. result = task["result"]
  632. # acquire lock
  633. acquire_lock = await self.update_auto_reply_task_status(
  634. task_id, "extract", self.INIT_STATUS, self.PROCESSING_STATUS
  635. )
  636. if not acquire_lock:
  637. return
  638. try:
  639. # parse xml
  640. xml_list = json.loads(result) if type(result) == str else result
  641. index = 0
  642. for item in xml_list:
  643. xml_obj_list = self.extract_callback_xml(item)
  644. if xml_obj_list:
  645. for xml_obj in xml_obj_list:
  646. index += 1
  647. msg_type = xml_obj.get("msg_type", None)
  648. match msg_type:
  649. case "33":
  650. await self.store_card(task_id, index, msg_type, xml_obj)
  651. case "5":
  652. await self.store_article(task_id, index, msg_type, xml_obj)
  653. case _:
  654. continue
  655. await asyncio.sleep(5)
  656. await self.update_auto_reply_task_status(
  657. task_id, "extract", self.PROCESSING_STATUS, self.SUCCESS_STATUS
  658. )
  659. except Exception as e:
  660. print(e)
  661. print(traceback.format_exc())
  662. await self.update_auto_reply_task_status(
  663. task_id, "extract", self.PROCESSING_STATUS, self.FAIL_STATUS
  664. )
  665. # main function
  666. async def deal(self, task_name):
  667. match task_name:
  668. case "follow_gzh_task":
  669. await self.follow_gzh_task()
  670. case "get_auto_reply_task":
  671. await self.get_auto_reply_response()
  672. case "extract_task":
  673. task_list = await self.get_extract_tasks()
  674. if not task_list:
  675. print("No tasks to extract now")
  676. return
  677. for task in tqdm(task_list, desc="解析任务"):
  678. await self.extract_single_xml(task)
  679. await asyncio.sleep(10)
  680. case "re_extract_task":
  681. pass
  682. case _:
  683. print("task_error")