# auto_reply_cards_monitor.py
  1. import json
  2. import time
  3. import uuid
  4. import xml.etree.ElementTree as ET
  5. from datetime import datetime, timedelta
  6. from urllib.parse import unquote, parse_qs
  7. from applications.utils import fetch_from_odps
  8. from applications.utils import AsyncHttpClient
  9. from applications.crawler.wechat import get_article_list_from_account
  10. from applications.crawler.wechat import get_article_detail
  11. class AutoReplyCardsMonitorConst:
  12. # fetch_status
  13. FETCH_INIT_STATUS = 0
  14. FETCH_PROCESSING_STATUS = 1
  15. FETCH_SUCCESS_STATUS = 2
  16. FETCH_FAIL_STATUS = 3
  17. # task_status
  18. INIT_STATUS = 0
  19. PROCESSING_STATUS = 1
  20. SUCCESS_STATUS = 2
  21. FAIL_STATUS = 99
  22. # account_status
  23. VALID_STATUS = 1
  24. INVALID_STATUS = 0
  25. class AutoReplyCardsMonitorUtils(AutoReplyCardsMonitorConst):
  26. @staticmethod
  27. def generate_task_id(task_name, gh_id):
  28. match task_name:
  29. case "follow":
  30. return f"{task_name}_{gh_id}"
  31. case _:
  32. return f"{task_name}_{uuid.uuid4()}"
  33. @staticmethod
  34. def extract_reply_cards(msg_type, root):
  35. page_path = root.find(".//pagepath").text
  36. card_title = root.find(".//title").text
  37. mini_program = root.find(".//sourcedisplayname").text
  38. file_id = root.find("appmsg/appattach/cdnthumburl").text
  39. ase_key = root.find("appmsg/appattach/aeskey").text
  40. file_size = root.find("appmsg/appattach/cdnthumblength").text
  41. return {
  42. "title": card_title,
  43. "page_path": page_path,
  44. "msg_type": msg_type,
  45. "mini_program": mini_program,
  46. "file_id": file_id,
  47. "file_size": file_size,
  48. "ase_key": ase_key,
  49. }
  50. @staticmethod
  51. def extract_reply_articles(msg_type, root):
  52. title = root.find("appmsg/title").text
  53. url = root.find("appmsg/url").text
  54. cover_url = root.find("appmsg/thumburl").text
  55. account_name = root.find("appmsg/sourcedisplayname").text
  56. gh_id = root.find("appmsg/sourceusername").text
  57. desc = root.find("appmsg/des").text
  58. return {
  59. "msg_type": msg_type,
  60. "title": title,
  61. "url": url,
  62. "cover_url": cover_url,
  63. "account_name": account_name,
  64. "gh_id": gh_id,
  65. "desc": desc,
  66. }
  67. # 解析 xml
  68. @staticmethod
  69. def extract_callback_xml(self, xml_text):
  70. try:
  71. root = ET.fromstring(xml_text)
  72. msg_type = root.find("appmsg/type").text
  73. match msg_type:
  74. case "5":
  75. return self.extract_reply_articles(msg_type, root)
  76. case "33":
  77. return self.extract_reply_cards(msg_type, root)
  78. case "36":
  79. return self.extract_reply_cards(msg_type, root)
  80. case _:
  81. return {
  82. "msg_type": msg_type,
  83. }
  84. except Exception as e:
  85. print(e)
  86. return {}
  87. # 解析 page_path
  88. @staticmethod
  89. def extract_page_path(page_path):
  90. pass
  91. @staticmethod
  92. async def get_cover_url(aes_key, total_size, file_id):
  93. url = "http://api.geweapi.com/gewe/v2/api/message/downloadCdn"
  94. data = {
  95. "appId": "wx_anFlUnezoUynU3SKcqTWk",
  96. "aesKey": aes_key,
  97. "totalSize": total_size,
  98. "fileId": file_id,
  99. "type": "3",
  100. "suffix": "jpg",
  101. }
  102. headers = {
  103. "X-GEWE-TOKEN": "d3fb918f-0f36-4769-b095-410181614231",
  104. "Content-Type": "application/json",
  105. }
  106. async with AsyncHttpClient() as client:
  107. response = await client.post(url, headers=headers, data=json.dumps(data))
  108. return response
  109. @staticmethod
  110. async def get_sample_url(recent_articles):
  111. for article in recent_articles:
  112. link = article["ContentUrl"]
  113. print(link)
  114. response = await get_article_detail(article_link=link)
  115. print(response)
  116. if not response:
  117. continue
  118. code = response["code"]
  119. if code == 0 or code == 25006:
  120. return link
  121. return None
  122. # 获取检测的账号 list
  123. @staticmethod
  124. def get_monitor_account_list():
  125. yesterday = (datetime.today() - timedelta(days=1)).strftime("%Y%m%d")
  126. query = f"""
  127. SELECT 公众号名, ghid, count(DISTINCT mid) AS uv
  128. FROM loghubods.opengid_base_data
  129. WHERE dt = {yesterday}
  130. AND hotsencetype = 1074
  131. AND usersharedepth = 0
  132. AND channel = '公众号合作-即转-稳定'
  133. GROUP BY 公众号名, ghid
  134. HAVING uv > 100
  135. ORDER BY uv DESC
  136. LIMIT 10
  137. ;
  138. """
  139. result = fetch_from_odps(query)
  140. return result
  141. # 下载封面图片
  142. async def download_cover(self, url, file_path):
  143. pass
  144. # 上传封面至 oss
  145. async def upload_cover(self, file_path):
  146. pass
  147. class AutoReplyCardsMonitorMapper(AutoReplyCardsMonitorUtils):
  148. def __init__(self, pool, log_client):
  149. self.pool = pool
  150. self.log_client = log_client
  151. # 获取自动回复任务结果
  152. async def get_auto_reply_task_result(self, task_id):
  153. query = """
  154. SELECT task_result, task_status, err_msg, update_timestamp
  155. FROM gzh_msg_record
  156. WHERE task_id = %s;
  157. """
  158. return await self.pool.async_fetch(
  159. query=query, params=(task_id,), db_name="aigc"
  160. )
  161. # 查询账号
  162. async def fetch_account_status(self, account_name):
  163. query = """
  164. SELECT partner_name, partner_id, gh_id, status, follow_status
  165. FROM cooperate_accounts
  166. WHERE account_name = %s;
  167. """
  168. return await self.pool.async_fetch(query=query, params=(account_name,))
  169. # 更新账号状态为无效
  170. async def set_account_as_invalid(self, gh_id):
  171. query = """
  172. UPDATE cooperate_accounts SET status = %s WHERE gh_id = %s;
  173. """
  174. await self.pool.async_save(query=query, params=(self.INVALID_STATUS, gh_id))
  175. # 插入AIGC关注公众号任务
  176. async def insert_aigc_follow_account_task(self, task_id, link):
  177. timestamp = int(time.time() * 1000)
  178. query = """
  179. INSERT INTO gzh_msg_record (task_id, biz_type, task_params, create_timestamp, update_timestamp) VALUES (%s, %s, %s, %s, %s);
  180. """
  181. return await self.pool.async_save(
  182. query=query,
  183. params=(task_id, "follow", link, timestamp, timestamp),
  184. db_name="aigc",
  185. )
  186. # 插入AIGC自动回复任务
  187. async def insert_aigc_auto_reply_task(self, task_id, account_name):
  188. timestamp = int(time.time() * 1000)
  189. query = """
  190. INSERT INTO gzh_msg_record (task_id, task_params, create_timestamp, update_timestamp) VALUES (%s, %s, %s, %s);
  191. """
  192. return await self.pool.async_save(
  193. query=query,
  194. params=(task_id, account_name, timestamp, timestamp),
  195. db_name="aigc",
  196. )
  197. # 为账号设置 sample_url
  198. async def set_sample_url(self, gh_id, sample_url):
  199. query = """
  200. UPDATE cooperate_accounts SET sample_link = %s WHERE gh_id = %s;
  201. """
  202. return await self.pool.async_save(query=query, params=(sample_url, gh_id))
  203. # 修改账号的关注状态
  204. async def update_follow_status(self, gh_id, ori_status, new_status):
  205. query = """
  206. UPDATE cooperate_accounts SET follow_status = %s WHERE gh_id = %s and follow_status = %s;
  207. """
  208. return await self.pool.async_save(
  209. query=query, params=(new_status, gh_id, ori_status)
  210. )
  211. # 从 aigc 获取关注结果
  212. async def fetch_follow_account_status(self, gh_id):
  213. query = """
  214. SELECT task_status, err_msg
  215. FROM gzh_msg_record
  216. WHERE task_id = %s;
  217. """
  218. return await self.pool.async_fetch(
  219. query=query, params=(f"follow_{gh_id}",), db_name="aigc"
  220. )
  221. # 创建自动回复任务
  222. async def create_auto_reply_task(self, task_id, gh_id):
  223. query = """
  224. INSERT INTO cooperate_accounts_task (task_id, gh_id) VALUES (%s, %s);
  225. """
  226. return await self.pool.async_save(query=query, params=(task_id, gh_id))
  227. async def update_auto_reply_task_status(
  228. self, task_id, status_type, ori_status, new_status
  229. ):
  230. task_query = """
  231. UPDATE cooperate_accounts_task SET task_status = %s WHERE task_id = %s AND task_status = %s;
  232. """
  233. extract_query = """
  234. UPDATE cooperate_accounts_task SET extract_status = %s WHERE task_id = %s AND extract_status = %s;
  235. """
  236. match status_type:
  237. case "task":
  238. return await self.pool.async_save(
  239. query=task_query, params=(new_status, task_id, ori_status)
  240. )
  241. case "extract":
  242. return await self.pool.async_save(
  243. query=extract_query, params=(new_status, task_id, ori_status)
  244. )
  245. case _:
  246. print("status_type_error")
  247. return None
  248. # 获取正在自动回复卡片的任务 id
  249. async def fetch_auto_replying_tasks(self):
  250. query = """
  251. SELECT task_id FROM cooperate_accounts_task WHERE task_status = %s;
  252. """
  253. return await self.pool.async_fetch(
  254. query=query, params=(self.PROCESSING_STATUS,)
  255. )
  256. # 设置自动回复结果
  257. async def set_auto_reply_result(self, task_id, finish_timestamp, result):
  258. query = """
  259. UPDATE cooperate_accounts_task
  260. SET finish_timestamp = %s, result = %s, task_status = %s
  261. WHERE task_id = %s and task_status = %s;
  262. """
  263. return await self.pool.async_save(
  264. query=query,
  265. params=(
  266. finish_timestamp,
  267. result,
  268. self.SUCCESS_STATUS,
  269. task_id,
  270. self.PROCESSING_STATUS,
  271. ),
  272. )
  273. class AutoReplyCardsMonitor(AutoReplyCardsMonitorMapper):
  274. def __init__(self, pool, log_client):
  275. super().__init__(pool, log_client)
  276. # 创建单个关注公众号任务
  277. async def create_follow_single_account_task(self, gh_id):
  278. response = await get_article_list_from_account(account_id=gh_id)
  279. code = response.get("code")
  280. match code:
  281. case 0:
  282. recent_articles = response["data"]["data"][0]["AppMsg"]["DetailInfo"]
  283. article_url = await self.get_sample_url(recent_articles)
  284. print(article_url)
  285. if article_url:
  286. await self.set_sample_url(gh_id, article_url)
  287. task_id = self.generate_task_id(task_name="follow", gh_id=gh_id)
  288. affected_rows = await self.insert_aigc_follow_account_task(
  289. task_id, article_url
  290. )
  291. if affected_rows:
  292. await self.update_follow_status(
  293. gh_id, self.INIT_STATUS, self.PROCESSING_STATUS
  294. )
  295. case 25013:
  296. await self.set_account_as_invalid(gh_id)
  297. case _:
  298. pass
  299. # 创建单个账号自动回复任务
  300. async def create_auto_reply_single_account_task(self, gh_id, account_name):
  301. task_id = self.generate_task_id(task_name="auto_reply", gh_id=gh_id)
  302. # 先插入 task, 再创建自动回复任务
  303. create_row = await self.create_auto_reply_task(task_id, gh_id)
  304. if create_row:
  305. affected_rows = await self.insert_aigc_auto_reply_task(
  306. task_id, account_name
  307. )
  308. if not affected_rows:
  309. print("发布任务至 AIGC 失败")
  310. else:
  311. await self.update_auto_reply_task_status(
  312. task_id, "task", self.INIT_STATUS, self.PROCESSING_STATUS
  313. )
  314. else:
  315. print("创建任务至 DB 失败")
  316. async def follow_gzh_task(self):
  317. account_list = self.get_monitor_account_list()
  318. for account in account_list:
  319. try:
  320. fetch_response = await self.fetch_account_status(account.公众号名)
  321. if not fetch_response:
  322. print("账号不存在", account)
  323. # todo 没有 gh_id, 暂时无法存储账号
  324. # affected_rows =await self.insert_accounts_task(account.公众号名, account.ghid)
  325. # if affected_rows:
  326. # await self.create_follow_account_task(account.ghid)
  327. else:
  328. account_detail = fetch_response[0]
  329. status = account_detail["status"]
  330. follow_status = account_detail["follow_status"]
  331. if not status:
  332. print("账号已经迁移或者封禁")
  333. continue
  334. match follow_status:
  335. case self.INIT_STATUS:
  336. await self.create_follow_single_account_task(
  337. account_detail["gh_id"]
  338. )
  339. case self.PROCESSING_STATUS:
  340. fetch_response = await self.fetch_follow_account_status(
  341. account_detail["gh_id"]
  342. )
  343. if not fetch_response:
  344. await self.update_follow_status(
  345. account_detail["gh_id"],
  346. self.PROCESSING_STATUS,
  347. self.INIT_STATUS,
  348. )
  349. task_status = fetch_response[0]["task_status"]
  350. match task_status:
  351. case self.FETCH_INIT_STATUS:
  352. continue
  353. case self.FETCH_PROCESSING_STATUS:
  354. continue
  355. case self.FETCH_SUCCESS_STATUS:
  356. await self.update_follow_status(
  357. account_detail["gh_id"],
  358. self.PROCESSING_STATUS,
  359. self.SUCCESS_STATUS,
  360. )
  361. case self.FETCH_FAIL_STATUS:
  362. await self.update_follow_status(
  363. account_detail["gh_id"],
  364. self.PROCESSING_STATUS,
  365. self.FAIL_STATUS,
  366. )
  367. case self.SUCCESS_STATUS:
  368. # 账号已经关注,创建获取自动回复任务
  369. await self.create_auto_reply_single_account_task(
  370. account_detail["gh_id"], account.公众号名
  371. )
  372. case _:
  373. print(f"{account.公众号名}账号状态异常")
  374. except Exception as e:
  375. print(f"处理账号{account.公众号名}异常", e)
  376. # 异步获取关注结果
  377. async def get_auto_reply_response(self):
  378. task_list = await self.fetch_auto_replying_tasks()
  379. if not task_list:
  380. return
  381. for task in task_list:
  382. try:
  383. task_id = task["task_id"]
  384. response = await self.get_auto_reply_task_result(task_id)
  385. if not response:
  386. continue
  387. task_status = response[0]["task_status"]
  388. task_result = response[0]["task_result"]
  389. update_timestamp = response[0]["update_timestamp"]
  390. match task_status:
  391. case self.FETCH_FAIL_STATUS:
  392. await self.update_auto_reply_task_status(
  393. task_id, "task", self.PROCESSING_STATUS, self.FAIL_STATUS
  394. )
  395. case self.FETCH_SUCCESS_STATUS:
  396. await self.set_auto_reply_result(
  397. task_id, update_timestamp, task_result
  398. )
  399. case _:
  400. continue
  401. except Exception as e:
  402. print(e)
  403. # 解析 xml 并且更新数据
  404. async def extract_task(self):
  405. pass
  406. # main function
  407. async def deal(self, task_name):
  408. match task_name:
  409. case "follow_gzh_task":
  410. await self.follow_gzh_task()
  411. case "get_auto_reply_task":
  412. await self.get_auto_reply_response()
  413. case "extract_task":
  414. await self.extract_task()
  415. case _:
  416. print("task_error")