import json import time import uuid import xml.etree.ElementTree as ET from datetime import datetime, timedelta from urllib.parse import unquote, parse_qs from applications.utils import fetch_from_odps from applications.utils import AsyncHttpClient from applications.crawler.wechat import get_article_list_from_account from applications.crawler.wechat import get_article_detail class AutoReplyCardsMonitorConst: # fetch_status FETCH_INIT_STATUS = 0 FETCH_PROCESSING_STATUS = 1 FETCH_SUCCESS_STATUS = 2 FETCH_FAIL_STATUS = 3 # task_status INIT_STATUS = 0 PROCESSING_STATUS = 1 SUCCESS_STATUS = 2 FAIL_STATUS = 99 # account_status VALID_STATUS = 1 INVALID_STATUS = 0 class AutoReplyCardsMonitorUtils(AutoReplyCardsMonitorConst): @staticmethod def generate_task_id(task_name, gh_id): match task_name: case "follow": return f"{task_name}_{gh_id}" case _: return f"{task_name}_{uuid.uuid4()}" @staticmethod def extract_reply_cards(msg_type, root): page_path = root.find(".//pagepath").text card_title = root.find(".//title").text mini_program = root.find(".//sourcedisplayname").text file_id = root.find("appmsg/appattach/cdnthumburl").text ase_key = root.find("appmsg/appattach/aeskey").text file_size = root.find("appmsg/appattach/cdnthumblength").text return { "title": card_title, "page_path": page_path, "msg_type": msg_type, "mini_program": mini_program, "file_id": file_id, "file_size": file_size, "ase_key": ase_key, } @staticmethod def extract_reply_articles(msg_type, root): title = root.find("appmsg/title").text url = root.find("appmsg/url").text cover_url = root.find("appmsg/thumburl").text account_name = root.find("appmsg/sourcedisplayname").text gh_id = root.find("appmsg/sourceusername").text desc = root.find("appmsg/des").text return { "msg_type": msg_type, "title": title, "url": url, "cover_url": cover_url, "account_name": account_name, "gh_id": gh_id, "desc": desc, } # 解析 xml @staticmethod def extract_callback_xml(self, xml_text): try: root = ET.fromstring(xml_text) msg_type = root.find("appmsg/type").text match msg_type: case "5": return self.extract_reply_articles(msg_type, root) case "33": return self.extract_reply_cards(msg_type, root) case "36": return self.extract_reply_cards(msg_type, root) case _: return { "msg_type": msg_type, } except Exception as e: print(e) return {} # 解析 page_path @staticmethod def extract_page_path(page_path): pass @staticmethod async def get_cover_url(aes_key, total_size, file_id): url = "http://api.geweapi.com/gewe/v2/api/message/downloadCdn" data = { "appId": "wx_anFlUnezoUynU3SKcqTWk", "aesKey": aes_key, "totalSize": total_size, "fileId": file_id, "type": "3", "suffix": "jpg", } headers = { "X-GEWE-TOKEN": "d3fb918f-0f36-4769-b095-410181614231", "Content-Type": "application/json", } async with AsyncHttpClient() as client: response = await client.post(url, headers=headers, data=json.dumps(data)) return response @staticmethod async def get_sample_url(recent_articles): for article in recent_articles: link = article["ContentUrl"] print(link) response = await get_article_detail(article_link=link) print(response) if not response: continue code = response["code"] if code == 0 or code == 25006: return link return None # 获取检测的账号 list @staticmethod def get_monitor_account_list(): yesterday = (datetime.today() - timedelta(days=1)).strftime("%Y%m%d") query = f""" SELECT 公众号名, ghid, count(DISTINCT mid) AS uv FROM loghubods.opengid_base_data WHERE dt = {yesterday} AND hotsencetype = 1074 AND usersharedepth = 0 AND channel = '公众号合作-即转-稳定' GROUP BY 公众号名, ghid HAVING uv > 100 ORDER BY uv DESC LIMIT 10 ; """ result = fetch_from_odps(query) return result # 下载封面图片 async def download_cover(self, url, file_path): pass # 上传封面至 oss async def upload_cover(self, file_path): pass class AutoReplyCardsMonitorMapper(AutoReplyCardsMonitorUtils): def __init__(self, pool, log_client): self.pool = pool self.log_client = log_client # 获取自动回复任务结果 async def get_auto_reply_task_result(self, task_id): query = """ SELECT task_result, task_status, err_msg, update_timestamp FROM gzh_msg_record WHERE task_id = %s; """ return await self.pool.async_fetch( query=query, params=(task_id,), db_name="aigc" ) # 查询账号 async def fetch_account_status(self, account_name): query = """ SELECT partner_name, partner_id, gh_id, status, follow_status FROM cooperate_accounts WHERE account_name = %s; """ return await self.pool.async_fetch(query=query, params=(account_name,)) # 更新账号状态为无效 async def set_account_as_invalid(self, gh_id): query = """ UPDATE cooperate_accounts SET status = %s WHERE gh_id = %s; """ await self.pool.async_save(query=query, params=(self.INVALID_STATUS, gh_id)) # 插入AIGC关注公众号任务 async def insert_aigc_follow_account_task(self, task_id, link): timestamp = int(time.time() * 1000) query = """ INSERT INTO gzh_msg_record (task_id, biz_type, task_params, create_timestamp, update_timestamp) VALUES (%s, %s, %s, %s, %s); """ return await self.pool.async_save( query=query, params=(task_id, "follow", link, timestamp, timestamp), db_name="aigc", ) # 插入AIGC自动回复任务 async def insert_aigc_auto_reply_task(self, task_id, account_name): timestamp = int(time.time() * 1000) query = """ INSERT INTO gzh_msg_record (task_id, task_params, create_timestamp, update_timestamp) VALUES (%s, %s, %s, %s); """ return await self.pool.async_save( query=query, params=(task_id, account_name, timestamp, timestamp), db_name="aigc", ) # 为账号设置 sample_url async def set_sample_url(self, gh_id, sample_url): query = """ UPDATE cooperate_accounts SET sample_link = %s WHERE gh_id = %s; """ return await self.pool.async_save(query=query, params=(sample_url, gh_id)) # 修改账号的关注状态 async def update_follow_status(self, gh_id, ori_status, new_status): query = """ UPDATE cooperate_accounts SET follow_status = %s WHERE gh_id = %s and follow_status = %s; """ return await self.pool.async_save( query=query, params=(new_status, gh_id, ori_status) ) # 从 aigc 获取关注结果 async def fetch_follow_account_status(self, gh_id): query = """ SELECT task_status, err_msg FROM gzh_msg_record WHERE task_id = %s; """ return await self.pool.async_fetch( query=query, params=(f"follow_{gh_id}",), db_name="aigc" ) # 创建自动回复任务 async def create_auto_reply_task(self, task_id, gh_id): query = """ INSERT INTO cooperate_accounts_task (task_id, gh_id) VALUES (%s, %s); """ return await self.pool.async_save(query=query, params=(task_id, gh_id)) async def update_auto_reply_task_status( self, task_id, status_type, ori_status, new_status ): task_query = """ UPDATE cooperate_accounts_task SET task_status = %s WHERE task_id = %s AND task_status = %s; """ extract_query = """ UPDATE cooperate_accounts_task SET extract_status = %s WHERE task_id = %s AND extract_status = %s; """ match status_type: case "task": return await self.pool.async_save( query=task_query, params=(new_status, task_id, ori_status) ) case "extract": return await self.pool.async_save( query=extract_query, params=(new_status, task_id, ori_status) ) case _: print("status_type_error") return None # 获取正在自动回复卡片的任务 id async def fetch_auto_replying_tasks(self): query = """ SELECT task_id FROM cooperate_accounts_task WHERE task_status = %s; """ return await self.pool.async_fetch( query=query, params=(self.PROCESSING_STATUS,) ) # 设置自动回复结果 async def set_auto_reply_result(self, task_id, finish_timestamp, result): query = """ UPDATE cooperate_accounts_task SET finish_timestamp = %s, result = %s, task_status = %s WHERE task_id = %s and task_status = %s; """ return await self.pool.async_save( query=query, params=( finish_timestamp, result, self.SUCCESS_STATUS, task_id, self.PROCESSING_STATUS, ), ) class AutoReplyCardsMonitor(AutoReplyCardsMonitorMapper): def __init__(self, pool, log_client): super().__init__(pool, log_client) # 创建单个关注公众号任务 async def create_follow_single_account_task(self, gh_id): response = await get_article_list_from_account(account_id=gh_id) code = response.get("code") match code: case 0: recent_articles = response["data"]["data"][0]["AppMsg"]["DetailInfo"] article_url = await self.get_sample_url(recent_articles) print(article_url) if article_url: await self.set_sample_url(gh_id, article_url) task_id = self.generate_task_id(task_name="follow", gh_id=gh_id) affected_rows = await self.insert_aigc_follow_account_task( task_id, article_url ) if affected_rows: await self.update_follow_status( gh_id, self.INIT_STATUS, self.PROCESSING_STATUS ) case 25013: await self.set_account_as_invalid(gh_id) case _: pass # 创建单个账号自动回复任务 async def create_auto_reply_single_account_task(self, gh_id, account_name): task_id = self.generate_task_id(task_name="auto_reply", gh_id=gh_id) # 先插入 task, 再创建自动回复任务 create_row = await self.create_auto_reply_task(task_id, gh_id) if create_row: affected_rows = await self.insert_aigc_auto_reply_task( task_id, account_name ) if not affected_rows: print("发布任务至 AIGC 失败") else: await self.update_auto_reply_task_status( task_id, "task", self.INIT_STATUS, self.PROCESSING_STATUS ) else: print("创建任务至 DB 失败") async def follow_gzh_task(self): account_list = self.get_monitor_account_list() for account in account_list: try: fetch_response = await self.fetch_account_status(account.公众号名) if not fetch_response: print("账号不存在", account) # todo 没有 gh_id, 暂时无法存储账号 # affected_rows =await self.insert_accounts_task(account.公众号名, account.ghid) # if affected_rows: # await self.create_follow_account_task(account.ghid) else: account_detail = fetch_response[0] status = account_detail["status"] follow_status = account_detail["follow_status"] if not status: print("账号已经迁移或者封禁") continue match follow_status: case self.INIT_STATUS: await self.create_follow_single_account_task( account_detail["gh_id"] ) case self.PROCESSING_STATUS: fetch_response = await self.fetch_follow_account_status( account_detail["gh_id"] ) if not fetch_response: await self.update_follow_status( account_detail["gh_id"], self.PROCESSING_STATUS, self.INIT_STATUS, ) task_status = fetch_response[0]["task_status"] match task_status: case self.FETCH_INIT_STATUS: continue case self.FETCH_PROCESSING_STATUS: continue case self.FETCH_SUCCESS_STATUS: await self.update_follow_status( account_detail["gh_id"], self.PROCESSING_STATUS, self.SUCCESS_STATUS, ) case self.FETCH_FAIL_STATUS: await self.update_follow_status( account_detail["gh_id"], self.PROCESSING_STATUS, self.FAIL_STATUS, ) case self.SUCCESS_STATUS: # 账号已经关注,创建获取自动回复任务 await self.create_auto_reply_single_account_task( account_detail["gh_id"], account.公众号名 ) case _: print(f"{account.公众号名}账号状态异常") except Exception as e: print(f"处理账号{account.公众号名}异常", e) # 异步获取关注结果 async def get_auto_reply_response(self): task_list = await self.fetch_auto_replying_tasks() if not task_list: return for task in task_list: try: task_id = task["task_id"] response = await self.get_auto_reply_task_result(task_id) if not response: continue task_status = response[0]["task_status"] task_result = response[0]["task_result"] update_timestamp = response[0]["update_timestamp"] match task_status: case self.FETCH_FAIL_STATUS: await self.update_auto_reply_task_status( task_id, "task", self.PROCESSING_STATUS, self.FAIL_STATUS ) case self.FETCH_SUCCESS_STATUS: await self.set_auto_reply_result( task_id, update_timestamp, task_result ) case _: continue except Exception as e: print(e) # 解析 xml 并且更新数据 async def extract_task(self): pass # main function async def deal(self, task_name): match task_name: case "follow_gzh_task": await self.follow_gzh_task() case "get_auto_reply_task": await self.get_auto_reply_response() case "extract_task": await self.extract_task() case _: print("task_error")