import asyncio import os import json import time import traceback import uuid import xml.etree.ElementTree as ET from tqdm import tqdm from datetime import datetime, timedelta from urllib.parse import unquote, parse_qs, urlparse import requests from requests.exceptions import RequestException from applications.utils import upload_to_oss from applications.utils import fetch_from_odps from applications.utils import AsyncHttpClient from applications.crawler.wechat import get_article_list_from_account from applications.crawler.wechat import get_article_detail class AutoReplyCardsMonitorConst: # fetch_status FETCH_INIT_STATUS = 0 FETCH_PROCESSING_STATUS = 1 FETCH_SUCCESS_STATUS = 2 FETCH_FAIL_STATUS = 3 # task_status INIT_STATUS = 0 PROCESSING_STATUS = 1 SUCCESS_STATUS = 2 FAIL_STATUS = 99 # account_status VALID_STATUS = 1 INVALID_STATUS = 0 class AutoReplyCardsMonitorUtils(AutoReplyCardsMonitorConst): @staticmethod def generate_task_id(task_name, gh_id): match task_name: case "follow": return f"{task_name}_{gh_id}" case _: return f"{task_name}_{uuid.uuid4()}" @staticmethod def parse_fields(root, fields, default=""): result = {} for key, path in fields.items(): elem = root.find(path) result[key] = elem.text if elem is not None and elem.text else default return result def extract_reply_cards(self, msg_type, root): fields = { "title": ".//title", "page_path": ".//pagepath", "mini_program": ".//sourcedisplayname", "file_id": "appmsg/appattach/cdnthumburl", "file_size": "appmsg/appattach/cdnthumblength", "aes_key": "appmsg/appattach/aeskey", } data = self.parse_fields(root, fields) data["msg_type"] = msg_type return data def extract_reply_articles(self, msg_type, root): fields = { "title": "appmsg/title", "url": "appmsg/url", "cover_url": "appmsg/thumburl", "account_name": "appmsg/sourcedisplayname", "gh_id": "appmsg/sourceusername", "desc": "appmsg/des", } data = self.parse_fields(root, fields) data["msg_type"] = msg_type return data # 解析 xml def extract_callback_xml(self, xml_text): try: root = ET.fromstring(xml_text) msg_type = root.find("appmsg/type").text match msg_type: case "5": return self.extract_reply_articles(msg_type, root) case "33": return self.extract_reply_cards(msg_type, root) case "36": return self.extract_reply_cards(msg_type, root) case _: return {} except Exception as e: print(xml_text) print(e) print(traceback.format_exc()) return {} # 解析 page_path @staticmethod def extract_page_path(page_path): # 解析外层 URL parsed_url = urlparse(page_path) outer_params = parse_qs(parsed_url.query) # 取出并解码 jumpPage jump_page = outer_params.get("jumpPage", [""])[0] if not jump_page: return None, None decoded_jump_page = unquote(jump_page) # 解析 jumpPage 内层参数 inner_query = urlparse(decoded_jump_page).query inner_params = parse_qs(inner_query) video_id = inner_params.get("id", [None])[0] root_source_id = inner_params.get("rootSourceId", [None])[0] return video_id, root_source_id @staticmethod async def get_cover_url(aes_key, total_size, file_id): url = "http://api.geweapi.com/gewe/v2/api/message/downloadCdn" data = { "appId": "wx_anFlUnezoUynU3SKcqTWk", "aesKey": aes_key, "totalSize": total_size, "fileId": file_id, "type": "3", "suffix": "jpg", } headers = { "X-GEWE-TOKEN": "d3fb918f-0f36-4769-b095-410181614231", "Content-Type": "application/json", } async with AsyncHttpClient() as client: response = await client.post(url, headers=headers, data=json.dumps(data)) return response @staticmethod async def get_sample_url(recent_articles): for article in recent_articles: link = article["ContentUrl"] response = await get_article_detail(article_link=link) if not response: continue code = response["code"] if code == 0 or code == 25006: return link return None # 获取检测的账号 list @staticmethod def get_monitor_account_list(): # dt = (datetime.today() - timedelta(days=1)).strftime("%Y%m%d") week_ago = (datetime.today() - timedelta(days=7)).strftime("%Y-%m-%d %H:%M:%S") query = f""" SELECT 公众号名, ghid, count(DISTINCT mid) AS uv FROM loghubods.opengid_base_data WHERE dt = MAX_PT('loghubods.opengid_base_data') AND hotsencetype = 1074 AND usersharedepth = 0 AND channel = '公众号合作-即转-稳定' AND 点击时间 >= '{week_ago}' GROUP BY 公众号名, ghid HAVING uv >= 100 ORDER BY uv DESC ; """ result = fetch_from_odps(query) return result @staticmethod def download_and_upload_cover(task_id, index, cover_obj, timeout=10): try: cover_url = cover_obj["data"]["fileUrl"] except (KeyError, TypeError): print(f"[WARN] Invalid cover_obj structure: {cover_obj}") return None file_name = f"{task_id}_{index}.jpg" save_dir = os.path.join(os.getcwd(), "static") save_path = os.path.join(save_dir, file_name) os.makedirs(save_dir, exist_ok=True) try: response = requests.get(cover_url, timeout=timeout) response.raise_for_status() except RequestException as e: print(f"[ERROR] Download failed ({cover_url}): {e}") return None try: with open(save_path, "wb") as f: f.write(response.content) except OSError as e: print(f"[ERROR] Write file failed ({save_path}): {e}") return None oss_dir = "auto_rely_cards_cover" oss_key = None try: oss_key = upload_to_oss(save_path, f"{oss_dir}/{file_name}") except Exception as e: print(f"[ERROR] Upload to OSS failed: {e}") if oss_key: try: os.remove(save_path) except OSError as e: print(f"[WARN] Failed to remove temp file {save_path}: {e}") return oss_key class AutoReplyCardsMonitorMapper(AutoReplyCardsMonitorUtils): def __init__(self, pool, log_client): self.pool = pool self.log_client = log_client # 获取自动回复任务结果 async def get_auto_reply_task_result(self, task_id): query = """ SELECT task_result, task_status, err_msg, update_timestamp FROM gzh_msg_record WHERE task_id = %s; """ return await self.pool.async_fetch( query=query, params=(task_id,), db_name="aigc" ) # 查询账号 async def fetch_account_status(self, account_name): query = """ SELECT partner_name, partner_id, gh_id, status, follow_status FROM cooperate_accounts WHERE account_name = %s; """ return await self.pool.async_fetch(query=query, params=(account_name,)) # 更新账号状态为无效 async def set_account_as_invalid(self, gh_id): query = """ UPDATE cooperate_accounts SET status = %s WHERE gh_id = %s; """ await self.pool.async_save(query=query, params=(self.INVALID_STATUS, gh_id)) # 插入AIGC关注公众号任务 async def insert_aigc_follow_account_task(self, task_id, link): timestamp = int(time.time() * 1000) query = """ INSERT INTO gzh_msg_record (task_id, biz_type, task_params, create_timestamp, update_timestamp) VALUES (%s, %s, %s, %s, %s); """ return await self.pool.async_save( query=query, params=(task_id, "follow", link, timestamp, timestamp), db_name="aigc", ) # 插入AIGC自动回复任务 async def insert_aigc_auto_reply_task(self, task_id, account_name): timestamp = int(time.time() * 1000) query = """ INSERT INTO gzh_msg_record (task_id, task_params, create_timestamp, update_timestamp) VALUES (%s, %s, %s, %s); """ return await self.pool.async_save( query=query, params=(task_id, account_name, timestamp, timestamp), db_name="aigc", ) # 为账号设置 sample_url async def set_sample_url(self, gh_id, sample_url): query = """ UPDATE cooperate_accounts SET sample_link = %s WHERE gh_id = %s; """ return await self.pool.async_save(query=query, params=(sample_url, gh_id)) # 修改账号的关注状态 async def update_follow_status(self, gh_id, ori_status, new_status): query = """ UPDATE cooperate_accounts SET follow_status = %s WHERE gh_id = %s and follow_status = %s; """ return await self.pool.async_save( query=query, params=(new_status, gh_id, ori_status) ) # 从 aigc 获取关注结果 async def fetch_follow_account_status(self, gh_id): query = """ SELECT task_status, err_msg FROM gzh_msg_record WHERE task_id = %s; """ return await self.pool.async_fetch( query=query, params=(f"follow_{gh_id}",), db_name="aigc" ) # 创建自动回复任务 async def create_auto_reply_task(self, task_id, gh_id): query = """ INSERT INTO cooperate_accounts_task (task_id, gh_id) VALUES (%s, %s); """ return await self.pool.async_save(query=query, params=(task_id, gh_id)) async def update_auto_reply_task_status( self, task_id, status_type, ori_status, new_status ): task_query = """ UPDATE cooperate_accounts_task SET task_status = %s WHERE task_id = %s AND task_status = %s; """ extract_query = """ UPDATE cooperate_accounts_task SET extract_status = %s WHERE task_id = %s AND extract_status = %s; """ match status_type: case "task": return await self.pool.async_save( query=task_query, params=(new_status, task_id, ori_status) ) case "extract": return await self.pool.async_save( query=extract_query, params=(new_status, task_id, ori_status) ) case _: print("status_type_error") return None # 获取正在自动回复卡片的任务 id async def fetch_auto_replying_tasks(self): query = """ SELECT task_id FROM cooperate_accounts_task WHERE task_status = %s; """ return await self.pool.async_fetch( query=query, params=(self.PROCESSING_STATUS,) ) # 设置自动回复结果 async def set_auto_reply_result(self, task_id, finish_timestamp, result): query = """ UPDATE cooperate_accounts_task SET finish_timestamp = %s, result = %s, task_status = %s WHERE task_id = %s and task_status = %s; """ return await self.pool.async_save( query=query, params=( finish_timestamp, result, self.SUCCESS_STATUS, task_id, self.PROCESSING_STATUS, ), ) # 获取带解析的任务 async def get_extract_tasks(self): query = """ SELECT task_id, result FROM cooperate_accounts_task WHERE extract_status = %s AND task_status = %s; """ return await self.pool.async_fetch( query=query, params=(self.INIT_STATUS, self.SUCCESS_STATUS) ) # 存储解析结果 async def store_extract_result(self, query, row_table): return await self.pool.async_save(query=query, params=row_table) # 从 growth 数据库获取账号信息,并且存储在 cooperate_accounts 表中 async def fetch_cooperate_accounts(self, account_name): fetch_query = """ SELECT t2.name AS partner_name, t2.channel AS partner_id, t1.name AS account_name, t1.gh_id FROM content_platform_gzh_account t1 JOIN content_platform_account t2 ON t1.create_account_id = t2.id WHERE t1.name = %s; """ fetch_response = await self.pool.async_fetch( query=fetch_query, db_name="growth", params=(account_name,) ) if not fetch_response: return 0 account_detail = fetch_response[0] save_query = """ INSERT INTO cooperate_accounts (partner_name, partner_id, account_name, gh_id) VALUES (%s, %s, %s, %s); """ return await self.pool.async_save( query=save_query, params=( account_detail["partner_name"], account_detail["partner_id"], account_detail["account_name"], account_detail["gh_id"], ), ) class AutoReplyCardsMonitor(AutoReplyCardsMonitorMapper): def __init__(self, pool, log_client): super().__init__(pool, log_client) # 存储卡片信息 async def store_card(self, task_id, index, msg_type, xml_obj): video_id, root_source_id = self.extract_page_path(xml_obj["page_path"]) cover_obj = await self.get_cover_url( xml_obj["aes_key"], xml_obj["file_size"], xml_obj["file_id"] ) cover_oss = self.download_and_upload_cover(task_id, index, cover_obj) query = """ INSERT INTO cooperate_auto_reply_detail ( task_id, position, msg_type, card_title, card_cover, video_id, root_source_id, mini_program_name, task_result ) VALUES ( %s, %s, %s, %s, %s, %s, %s, %s, %s ); """ insert_row = ( task_id, index, msg_type, xml_obj["title"], cover_oss, video_id, root_source_id, xml_obj["mini_program"], json.dumps(xml_obj, ensure_ascii=False), ) await self.store_extract_result(query, insert_row) # 存储文章信息 async def store_article(self, task_id, index, msg_type, xml_obj): article_title = xml_obj.get("title") article_link = xml_obj.get("url") article_cover = xml_obj.get("cover_url") article_desc = xml_obj.get("desc") fetch_fail_status = False fetch_response = await get_article_detail( article_link=article_link, is_cache=False, is_count=True ) if not fetch_response: fetch_fail_status = True if not fetch_fail_status: if fetch_response.get("code") != 0: fetch_fail_status = True if fetch_fail_status: query = """ INSERT INTO cooperate_auto_reply_detail (task_id, position, msg_type, article_title, article_link, article_cover, article_desc, remark) VALUES (%s, %s, %s, %s, %s, %s, %s, %s); """ remark = "获取文章详情失败" insert_row = ( task_id, index, msg_type, article_title, article_link, article_cover, article_desc, remark, ) await self.store_extract_result(query, insert_row) else: print(article_link) article_detail = fetch_response["data"]["data"] article_text = article_detail["body_text"] article_images = article_detail["image_url_list"] read_cnt = article_detail["view_count"] like_cnt = article_detail["like_count"] publish_timestamp = int(article_detail["publish_timestamp"] / 1000) parsed = urlparse(article_detail["content_link"]) params = parse_qs(parsed.query) wx_sn = params.get("sn", [None])[0] print(params) print(wx_sn) mini_info = article_detail.get("mini_program") if not mini_info: # video_id, root_source_id = None, None query = """ INSERT INTO cooperate_auto_reply_detail ( task_id, position, msg_type, article_title, article_link, article_cover, article_text, article_images, article_desc, read_cnt, like_cnt, publish_timestamp, task_result, wx_sn ) VALUES ( %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s ); """ values = ( task_id, index, msg_type, article_title, article_link, article_cover, article_text, json.dumps(article_images, ensure_ascii=False), article_desc, read_cnt, like_cnt, publish_timestamp, json.dumps(fetch_response, ensure_ascii=False), wx_sn, ) await self.store_extract_result(query, values) else: for card_index, i in enumerate(mini_info, 1): try: video_id, root_source_id = self.extract_page_path(i["path"]) card_title = i["title"] card_cover = i["image_url"] mini_name = i["nike_name"] query = """ INSERT INTO cooperate_auto_reply_detail ( task_id, position, msg_type, card_title, card_cover, video_id, root_source_id, mini_program_name, article_title, article_link, article_cover, article_text, article_images, article_desc, read_cnt, like_cnt, publish_timestamp, task_result, wx_sn, card_index ) VALUES ( %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s ); """ values = ( task_id, index, msg_type, card_title, card_cover, video_id, root_source_id, mini_name, article_title, article_link, article_cover, article_text, json.dumps(article_images, ensure_ascii=False), article_desc, read_cnt, like_cnt, publish_timestamp, json.dumps(fetch_response, ensure_ascii=False), wx_sn, card_index, ) await self.store_extract_result(query, values) except Exception as e: print(traceback.format_exc()) print(e) # 创建单个关注公众号任务 async def create_follow_single_account_task(self, gh_id): response = await get_article_list_from_account(account_id=gh_id) code = response.get("code") match code: case 0: recent_articles = response["data"]["data"][0]["AppMsg"]["DetailInfo"] article_url = await self.get_sample_url(recent_articles) print(article_url) if article_url: await self.set_sample_url(gh_id, article_url) task_id = self.generate_task_id(task_name="follow", gh_id=gh_id) affected_rows = await self.insert_aigc_follow_account_task( task_id, article_url ) if affected_rows: await self.update_follow_status( gh_id, self.INIT_STATUS, self.PROCESSING_STATUS ) case 25013: await self.set_account_as_invalid(gh_id) case _: pass # 创建单个账号自动回复任务 async def create_auto_reply_single_account_task(self, gh_id, account_name): print(account_name) task_id = self.generate_task_id(task_name="auto_reply", gh_id=gh_id) # 先插入 task, 再创建自动回复任务 create_row = await self.create_auto_reply_task(task_id, gh_id) if create_row: affected_rows = await self.insert_aigc_auto_reply_task(task_id, gh_id) if not affected_rows: print("发布任务至 AIGC 失败") else: await self.update_auto_reply_task_status( task_id, "task", self.INIT_STATUS, self.PROCESSING_STATUS ) else: print("创建任务至 DB 失败") async def follow_gzh_task(self): account_list = self.get_monitor_account_list() for account in account_list: try: fetch_response = await self.fetch_account_status(account.公众号名) if not fetch_response: affected_rows = await self.fetch_cooperate_accounts( account.公众号名 ) if affected_rows: fetch_response = await self.fetch_account_status( account.公众号名 ) else: print(f"系统中无账号,跳过: {account.公众号名}") continue account_detail = fetch_response[0] status = account_detail["status"] if not status: print("账号已经迁移或者封禁") continue # 新逻辑,无需考虑账号是否关注 await self.create_auto_reply_single_account_task( account_detail["gh_id"], account.公众号名 ) except Exception as e: print(f"处理账号{account.公众号名}异常", e) # 异步获取关注结果 async def get_auto_reply_response(self): task_list = await self.fetch_auto_replying_tasks() if not task_list: print("No processing task yet") return for task in tqdm(task_list): try: task_id = task["task_id"] response = await self.get_auto_reply_task_result(task_id) if not response: continue task_status = response[0]["task_status"] task_result = response[0]["task_result"] update_timestamp = response[0]["update_timestamp"] match task_status: case self.FETCH_FAIL_STATUS: await self.update_auto_reply_task_status( task_id, "task", self.PROCESSING_STATUS, self.FAIL_STATUS ) case self.FETCH_SUCCESS_STATUS: await self.set_auto_reply_result( task_id, update_timestamp, task_result ) case _: continue except Exception as e: print(e) # 解析单个xml async def extract_single_xml(self, task): task_id = task["task_id"] result = task["result"] # acquire lock acquire_lock = await self.update_auto_reply_task_status( task_id, "extract", self.INIT_STATUS, self.PROCESSING_STATUS ) if not acquire_lock: return try: # parse xml xml_list = json.loads(result) if type(result) == str else result for index, item in enumerate(xml_list, 1): xml_obj = self.extract_callback_xml(item) if xml_obj: msg_type = xml_obj.get("msg_type", None) match msg_type: case "33": await self.store_card(task_id, index, msg_type, xml_obj) case "5": await self.store_article(task_id, index, msg_type, xml_obj) case _: continue await asyncio.sleep(5) await self.update_auto_reply_task_status( task_id, "extract", self.PROCESSING_STATUS, self.SUCCESS_STATUS ) except Exception as e: print(e) print(traceback.format_exc()) await self.update_auto_reply_task_status( task_id, "extract", self.PROCESSING_STATUS, self.FAIL_STATUS ) # main function async def deal(self, task_name): match task_name: case "follow_gzh_task": await self.follow_gzh_task() case "get_auto_reply_task": await self.get_auto_reply_response() case "extract_task": task_list = await self.get_extract_tasks() if not task_list: print("No tasks to extract now") return for task in tqdm(task_list, desc="解析任务"): await self.extract_single_xml(task) await asyncio.sleep(10) case "re_extract_task": pass case _: print("task_error")