import asyncio import os import json import time import traceback import uuid from typing import List, Dict import xml.etree.ElementTree as ET from tqdm import tqdm from datetime import datetime, timedelta from urllib.parse import unquote, parse_qs, urlparse import requests from requests.exceptions import RequestException from app.infra.shared.tools import upload_to_oss from app.infra.shared.tools import fetch_from_odps from app.infra.shared import AsyncHttpClient from app.infra.crawler.wechat import get_article_list_from_account from app.infra.crawler.wechat import get_article_detail class AutoReplyCardsMonitorConst: # fetch_status FETCH_INIT_STATUS = 0 FETCH_PROCESSING_STATUS = 1 FETCH_SUCCESS_STATUS = 2 FETCH_FAIL_STATUS = 3 # task_status INIT_STATUS = 0 PROCESSING_STATUS = 1 SUCCESS_STATUS = 2 FAIL_STATUS = 99 # account_status VALID_STATUS = 1 INVALID_STATUS = 0 class AutoReplyCardsMonitorUtils(AutoReplyCardsMonitorConst): @staticmethod def generate_task_id(task_name, gh_id): match task_name: case "follow": return f"{task_name}_{gh_id}" case _: return f"{task_name}_{uuid.uuid4()}" @staticmethod def parse_fields(root, fields, default=""): result = {} for key, path in fields.items(): elem = root.find(path) result[key] = elem.text if elem is not None and elem.text else default return result def extract_reply_cards(self, msg_type: str, root) -> List[Dict]: fields = { "title": ".//title", "page_path": ".//pagepath", "mini_program": ".//sourcedisplayname", "file_id": "appmsg/appattach/cdnthumburl", "file_size": "appmsg/appattach/cdnthumblength", "aes_key": "appmsg/appattach/aeskey", } data = self.parse_fields(root, fields) data["msg_type"] = msg_type results = [data] return results def extract_reply_articles(self, msg_type, root) -> Dict: fields = { "title": "appmsg/title", "url": "appmsg/url", "cover_url": "appmsg/thumburl", "account_name": "appmsg/sourcedisplayname", "gh_id": "appmsg/sourceusername", "desc": "appmsg/des", } data = self.parse_fields(root, fields) data["msg_type"] = msg_type return data @staticmethod def extract_group_reply_articles(msg_type, root) -> List[Dict]: items = [] for item in root.findall(".//item"): data = { "title": item.findtext("title"), "url": item.findtext("url"), "cover_url": item.findtext("cover"), "account_name": item.findtext("sources/source/name"), "gh_id": "", "desc": "", "msg_type": msg_type } items.append(data) return items # 解析 xml def extract_callback_xml(self, xml_text): try: root = ET.fromstring(xml_text) msg_type = root.find("appmsg/type").text match msg_type: case "5": # return self.extract_reply_articles(msg_type, root) return self.extract_group_reply_articles(msg_type, root) case "33": return self.extract_reply_cards(msg_type, root) case "36": return self.extract_reply_cards(msg_type, root) case _: return [] except Exception as e: print(xml_text) print(e) print(traceback.format_exc()) return [] # 解析 page_path @staticmethod def extract_page_path(page_path): # 解析外层 URL parsed_url = urlparse(page_path) outer_params = parse_qs(parsed_url.query) # 取出并解码 jumpPage jump_page = outer_params.get("jumpPage", [""])[0] if not jump_page: return None, None decoded_jump_page = unquote(jump_page) # 解析 jumpPage 内层参数 inner_query = urlparse(decoded_jump_page).query inner_params = parse_qs(inner_query) video_id = inner_params.get("id", [None])[0] root_source_id = inner_params.get("rootSourceId", [None])[0] return video_id, root_source_id @staticmethod async def get_cover_url(aes_key, total_size, file_id): url = "http://api.geweapi.com/gewe/v2/api/message/downloadCdn" data = { "appId": "wx_anFlUnezoUynU3SKcqTWk", "aesKey": aes_key, "totalSize": total_size, "fileId": file_id, "type": "3", "suffix": "jpg", } headers = { "X-GEWE-TOKEN": "d3fb918f-0f36-4769-b095-410181614231", "Content-Type": "application/json", } async with AsyncHttpClient() as client: response = await client.post(url, headers=headers, data=json.dumps(data)) return response @staticmethod async def get_sample_url(recent_articles): for article in recent_articles: link = article["ContentUrl"] response = await get_article_detail(article_link=link) if not response: continue code = response["code"] if code == 0 or code == 25006: return link return None # 获取检测的账号 list @staticmethod def get_monitor_account_list(): # dt = (datetime.today() - timedelta(days=1)).strftime("%Y%m%d") week_ago = (datetime.today() - timedelta(days=7)).strftime("%Y-%m-%d %H:%M:%S") query = f""" SELECT 公众号名, ghid, count(DISTINCT mid) AS uv FROM loghubods.opengid_base_data WHERE dt = MAX_PT('loghubods.opengid_base_data') AND hotsencetype = 1074 AND usersharedepth = 0 AND channel = '公众号合作-即转-稳定' AND 点击时间 >= '{week_ago}' GROUP BY 公众号名, ghid HAVING uv >= 100 ORDER BY uv DESC ; """ result = fetch_from_odps(query) return result @staticmethod def download_and_upload_cover(task_id, index, cover_obj, timeout=10): try: cover_url = cover_obj["data"]["fileUrl"] except (KeyError, TypeError): print(f"[WARN] Invalid cover_obj structure: {cover_obj}") return None file_name = f"{task_id}_{index}.jpg" save_dir = os.path.join(os.getcwd(), "static") save_path = os.path.join(save_dir, file_name) os.makedirs(save_dir, exist_ok=True) try: response = requests.get(cover_url, timeout=timeout) response.raise_for_status() except RequestException as e: print(f"[ERROR] Download failed ({cover_url}): {e}") return None try: with open(save_path, "wb") as f: f.write(response.content) except OSError as e: print(f"[ERROR] Write file failed ({save_path}): {e}") return None oss_dir = "auto_rely_cards_cover" oss_key = None try: oss_key = upload_to_oss(save_path, f"{oss_dir}/{file_name}") except Exception as e: print(f"[ERROR] Upload to OSS failed: {e}") if oss_key: try: os.remove(save_path) except OSError as e: print(f"[WARN] Failed to remove temp file {save_path}: {e}") return oss_key class AutoReplyCardsMonitorMapper(AutoReplyCardsMonitorUtils): def __init__(self, pool, log_client): self.pool = pool self.log_client = log_client # 获取自动回复任务结果 async def get_auto_reply_task_result(self, task_id): query = """ SELECT task_result, task_status, err_msg, update_timestamp FROM gzh_msg_record WHERE task_id = %s; """ return await self.pool.async_fetch( query=query, params=(task_id,), db_name="aigc" ) # 查询账号 async def fetch_account_status(self, account_name): query = """ SELECT partner_name, partner_id, gh_id, status, follow_status FROM cooperate_accounts WHERE account_name = %s; """ return await self.pool.async_fetch(query=query, params=(account_name,)) # 更新账号状态为无效 async def set_account_as_invalid(self, gh_id): query = """ UPDATE cooperate_accounts SET status = %s WHERE gh_id = %s; """ await self.pool.async_save(query=query, params=(self.INVALID_STATUS, gh_id)) # 插入AIGC关注公众号任务 async def insert_aigc_follow_account_task(self, task_id, link): timestamp = int(time.time() * 1000) query = """ INSERT INTO gzh_msg_record (task_id, biz_type, task_params, create_timestamp, update_timestamp) VALUES (%s, %s, %s, %s, %s); """ return await self.pool.async_save( query=query, params=(task_id, "follow", link, timestamp, timestamp), db_name="aigc", ) # 插入AIGC自动回复任务 async def insert_aigc_auto_reply_task(self, task_id, account_name): timestamp = int(time.time() * 1000) query = """ INSERT INTO gzh_msg_record (task_id, task_params, create_timestamp, update_timestamp) VALUES (%s, %s, %s, %s); """ return await self.pool.async_save( query=query, params=(task_id, account_name, timestamp, timestamp), db_name="aigc", ) # 为账号设置 sample_url async def set_sample_url(self, gh_id, sample_url): query = """ UPDATE cooperate_accounts SET sample_link = %s WHERE gh_id = %s; """ return await self.pool.async_save(query=query, params=(sample_url, gh_id)) # 修改账号的关注状态 async def update_follow_status(self, gh_id, ori_status, new_status): query = """ UPDATE cooperate_accounts SET follow_status = %s WHERE gh_id = %s and follow_status = %s; """ return await self.pool.async_save( query=query, params=(new_status, gh_id, ori_status) ) # 从 aigc 获取关注结果 async def fetch_follow_account_status(self, gh_id): query = """ SELECT task_status, err_msg FROM gzh_msg_record WHERE task_id = %s; """ return await self.pool.async_fetch( query=query, params=(f"follow_{gh_id}",), db_name="aigc" ) # 创建自动回复任务 async def create_auto_reply_task(self, task_id, gh_id): query = """ INSERT INTO cooperate_accounts_task (task_id, gh_id) VALUES (%s, %s); """ return await self.pool.async_save(query=query, params=(task_id, gh_id)) async def update_auto_reply_task_status( self, task_id, status_type, ori_status, new_status ): task_query = """ UPDATE cooperate_accounts_task SET task_status = %s WHERE task_id = %s AND task_status = %s; """ extract_query = """ UPDATE cooperate_accounts_task SET extract_status = %s WHERE task_id = %s AND extract_status = %s; """ match status_type: case "task": return await self.pool.async_save( query=task_query, params=(new_status, task_id, ori_status) ) case "extract": return await self.pool.async_save( query=extract_query, params=(new_status, task_id, ori_status) ) case _: print("status_type_error") return None # 获取正在自动回复卡片的任务 id async def fetch_auto_replying_tasks(self): query = """ SELECT task_id FROM cooperate_accounts_task WHERE task_status = %s; """ return await self.pool.async_fetch( query=query, params=(self.PROCESSING_STATUS,) ) # 设置自动回复结果 async def set_auto_reply_result(self, task_id, finish_timestamp, result): query = """ UPDATE cooperate_accounts_task SET finish_timestamp = %s, result = %s, task_status = %s WHERE task_id = %s and task_status = %s; """ return await self.pool.async_save( query=query, params=( finish_timestamp, result, self.SUCCESS_STATUS, task_id, self.PROCESSING_STATUS, ), ) # 获取带解析的任务 async def get_extract_tasks(self): query = """ SELECT task_id, result FROM cooperate_accounts_task WHERE extract_status = %s AND task_status = %s; """ return await self.pool.async_fetch( query=query, params=(self.INIT_STATUS, self.SUCCESS_STATUS) ) # 存储解析结果 async def store_extract_result(self, query, row_table): return await self.pool.async_save(query=query, params=row_table) # 从 growth 数据库获取账号信息,并且存储在 cooperate_accounts 表中 async def fetch_cooperate_accounts(self, account_name): fetch_query = """ SELECT t2.name AS partner_name, t2.channel AS partner_id, t1.name AS account_name, t1.gh_id FROM content_platform_gzh_account t1 JOIN content_platform_account t2 ON t1.create_account_id = t2.id WHERE t1.name = %s; """ fetch_response = await self.pool.async_fetch( query=fetch_query, db_name="growth", params=(account_name,) ) if not fetch_response: return 0 account_detail = fetch_response[0] save_query = """ INSERT INTO cooperate_accounts (partner_name, partner_id, account_name, gh_id) VALUES (%s, %s, %s, %s); """ return await self.pool.async_save( query=save_query, params=( account_detail["partner_name"], account_detail["partner_id"], account_detail["account_name"], account_detail["gh_id"], ), ) class AutoReplyCardsMonitor(AutoReplyCardsMonitorMapper): def __init__(self, pool, log_client): super().__init__(pool, log_client) # 存储卡片信息 async def store_card(self, task_id, index, msg_type, xml_obj): video_id, root_source_id = self.extract_page_path(xml_obj["page_path"]) cover_obj = await self.get_cover_url( xml_obj["aes_key"], xml_obj["file_size"], xml_obj["file_id"] ) cover_oss = self.download_and_upload_cover(task_id, index, cover_obj) query = """ INSERT INTO cooperate_auto_reply_detail ( task_id, position, msg_type, card_title, card_cover, video_id, root_source_id, mini_program_name, task_result ) VALUES ( %s, %s, %s, %s, %s, %s, %s, %s, %s ); """ insert_row = ( task_id, index, msg_type, xml_obj["title"], cover_oss, video_id, root_source_id, xml_obj["mini_program"], json.dumps(xml_obj, ensure_ascii=False), ) await self.store_extract_result(query, insert_row) # 存储文章信息 async def store_article(self, task_id, index, msg_type, xml_obj): article_title = xml_obj.get("title") article_link = xml_obj.get("url") article_cover = xml_obj.get("cover_url") article_desc = xml_obj.get("desc") fetch_fail_status = False fetch_response = await get_article_detail( article_link=article_link, is_cache=False, is_count=True ) if not fetch_response: fetch_fail_status = True if not fetch_fail_status: if fetch_response.get("code") != 0: fetch_fail_status = True if fetch_fail_status: query = """ INSERT INTO cooperate_auto_reply_detail (task_id, position, msg_type, article_title, article_link, article_cover, article_desc, remark) VALUES (%s, %s, %s, %s, %s, %s, %s, %s); """ remark = "获取文章详情失败" insert_row = ( task_id, index, msg_type, article_title, article_link, article_cover, article_desc, remark, ) await self.store_extract_result(query, insert_row) else: article_detail = fetch_response["data"]["data"] article_text = article_detail["body_text"] article_images = article_detail["image_url_list"] read_cnt = article_detail["view_count"] like_cnt = article_detail["like_count"] publish_timestamp = int(article_detail["publish_timestamp"] / 1000) parsed = urlparse(article_detail["content_link"]) params = parse_qs(parsed.query) wx_sn = params.get("sn", [None])[0] print(params) print(wx_sn) mini_info = article_detail.get("mini_program") if not mini_info: # video_id, root_source_id = None, None query = """ INSERT INTO cooperate_auto_reply_detail ( task_id, position, msg_type, article_title, article_link, article_cover, article_text, article_images, article_desc, read_cnt, like_cnt, publish_timestamp, task_result, wx_sn ) VALUES ( %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s ); """ values = ( task_id, index, msg_type, article_title, article_link, article_cover, article_text, json.dumps(article_images, ensure_ascii=False), article_desc, read_cnt, like_cnt, publish_timestamp, json.dumps(fetch_response, ensure_ascii=False), wx_sn, ) await self.store_extract_result(query, values) else: for card_index, i in enumerate(mini_info, 1): try: video_id, root_source_id = self.extract_page_path(i["path"]) card_title = i["title"] card_cover = i["image_url"] mini_name = i["nike_name"] query = """ INSERT INTO cooperate_auto_reply_detail ( task_id, position, msg_type, card_title, card_cover, video_id, root_source_id, mini_program_name, article_title, article_link, article_cover, article_text, article_images, article_desc, read_cnt, like_cnt, publish_timestamp, task_result, wx_sn, card_index ) VALUES ( %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s ); """ values = ( task_id, index, msg_type, card_title, card_cover, video_id, root_source_id, mini_name, article_title, article_link, article_cover, article_text, json.dumps(article_images, ensure_ascii=False), article_desc, read_cnt, like_cnt, publish_timestamp, json.dumps(fetch_response, ensure_ascii=False), wx_sn, card_index, ) await self.store_extract_result(query, values) except Exception as e: print(traceback.format_exc()) print(e) # 创建单个关注公众号任务 async def create_follow_single_account_task(self, gh_id): response = await get_article_list_from_account(account_id=gh_id) code = response.get("code") match code: case 0: recent_articles = response["data"]["data"][0]["AppMsg"]["DetailInfo"] article_url = await self.get_sample_url(recent_articles) print(article_url) if article_url: await self.set_sample_url(gh_id, article_url) task_id = self.generate_task_id(task_name="follow", gh_id=gh_id) affected_rows = await self.insert_aigc_follow_account_task( task_id, article_url ) if affected_rows: await self.update_follow_status( gh_id, self.INIT_STATUS, self.PROCESSING_STATUS ) case 25013: await self.set_account_as_invalid(gh_id) case _: pass # 创建单个账号自动回复任务 async def create_auto_reply_single_account_task(self, gh_id, account_name): print(account_name) task_id = self.generate_task_id(task_name="auto_reply", gh_id=gh_id) # 先插入 task, 再创建自动回复任务 create_row = await self.create_auto_reply_task(task_id, gh_id) if create_row: affected_rows = await self.insert_aigc_auto_reply_task(task_id, gh_id) if not affected_rows: print("发布任务至 AIGC 失败") else: await self.update_auto_reply_task_status( task_id, "task", self.INIT_STATUS, self.PROCESSING_STATUS ) else: print("创建任务至 DB 失败") async def follow_gzh_task(self): account_list = self.get_monitor_account_list() for account in account_list: try: fetch_response = await self.fetch_account_status(account.公众号名) if not fetch_response: affected_rows = await self.fetch_cooperate_accounts( account.公众号名 ) if affected_rows: fetch_response = await self.fetch_account_status( account.公众号名 ) else: print(f"系统中无账号,跳过: {account.公众号名}") continue account_detail = fetch_response[0] status = account_detail["status"] if not status: print("账号已经迁移或者封禁") continue # 新逻辑,无需考虑账号是否关注 await self.create_auto_reply_single_account_task( account_detail["gh_id"], account.公众号名 ) except Exception as e: print(f"处理账号{account.公众号名}异常", e) # 异步获取关注结果 async def get_auto_reply_response(self): task_list = await self.fetch_auto_replying_tasks() if not task_list: print("No processing task yet") return for task in tqdm(task_list): try: task_id = task["task_id"] response = await self.get_auto_reply_task_result(task_id) if not response: continue task_status = response[0]["task_status"] task_result = response[0]["task_result"] update_timestamp = response[0]["update_timestamp"] match task_status: case self.FETCH_FAIL_STATUS: await self.update_auto_reply_task_status( task_id, "task", self.PROCESSING_STATUS, self.FAIL_STATUS ) case self.FETCH_SUCCESS_STATUS: await self.set_auto_reply_result( task_id, update_timestamp, task_result ) case _: continue except Exception as e: print(e) # 解析单个xml async def extract_single_xml(self, task): task_id = task["task_id"] result = task["result"] # acquire lock acquire_lock = await self.update_auto_reply_task_status( task_id, "extract", self.INIT_STATUS, self.PROCESSING_STATUS ) if not acquire_lock: return try: # parse xml xml_list = json.loads(result) if type(result) == str else result index = 0 for item in xml_list: xml_obj_list = self.extract_callback_xml(item) if xml_obj_list: for xml_obj in xml_obj_list: index += 1 msg_type = xml_obj.get("msg_type", None) match msg_type: case "33": await self.store_card(task_id, index, msg_type, xml_obj) case "5": await self.store_article(task_id, index, msg_type, xml_obj) case _: continue await asyncio.sleep(5) await self.update_auto_reply_task_status( task_id, "extract", self.PROCESSING_STATUS, self.SUCCESS_STATUS ) except Exception as e: print(e) print(traceback.format_exc()) await self.update_auto_reply_task_status( task_id, "extract", self.PROCESSING_STATUS, self.FAIL_STATUS ) # main function async def deal(self, task_name): match task_name: case "follow_gzh_task": await self.follow_gzh_task() case "get_auto_reply_task": await self.get_auto_reply_response() case "extract_task": task_list = await self.get_extract_tasks() if not task_list: print("No tasks to extract now") return for task in tqdm(task_list, desc="解析任务"): await self.extract_single_xml(task) await asyncio.sleep(10) case "re_extract_task": pass case _: print("task_error")