| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352 |
- import json
- import time
- import uuid
- import xml.etree.ElementTree as ET
- from datetime import datetime, timedelta
- from urllib.parse import unquote, parse_qs
- from applications.utils import fetch_from_odps
- from applications.utils import AsyncHttpClient
- from applications.crawler.wechat import get_article_list_from_account
- from applications.crawler.wechat import get_article_detail
class AutoReplyCardsMonitorConst:
    """Integer status codes shared by the auto-reply card monitor classes."""

    # Fetch status: compared against the ``task_status`` column read from
    # ``gzh_msg_record`` elsewhere in this file.
    FETCH_INIT_STATUS = 0
    FETCH_PROCESSING_STATUS = 1
    FETCH_SUCCESS_STATUS = 2
    FETCH_FAIL_STATUS = 3

    # Task status: used in this file for ``cooperate_accounts.follow_status``.
    # Note that failure is 99 here, not 3 as in the fetch group.
    INIT_STATUS = 0
    PROCESSING_STATUS = 1
    SUCCESS_STATUS = 2
    FAIL_STATUS = 99

    # Account status: written to ``cooperate_accounts.status``.
    VALID_STATUS = 1
    INVALID_STATUS = 0
- class AutoReplyCardsMonitorUtils(AutoReplyCardsMonitorConst):
- @staticmethod
- def generate_task_id(task_name, gh_id):
- match task_name:
- case "follow":
- return f"{task_name}_{gh_id}"
- case _:
- return f"{task_name}_{uuid.uuid4()}"
- @staticmethod
- def extract_reply_cards(msg_type, root):
- page_path = root.find(".//pagepath").text
- card_title = root.find(".//title").text
- mini_program = root.find(".//sourcedisplayname").text
- file_id = root.find("appmsg/appattach/cdnthumburl").text
- ase_key = root.find("appmsg/appattach/aeskey").text
- file_size = root.find("appmsg/appattach/cdnthumblength").text
- return {
- "title": card_title,
- "page_path": page_path,
- "msg_type": msg_type,
- "mini_program": mini_program,
- "file_id": file_id,
- "file_size": file_size,
- "ase_key": ase_key,
- }
- @staticmethod
- def extract_reply_articles(msg_type, root):
- title = root.find("appmsg/title").text
- url = root.find("appmsg/url").text
- cover_url = root.find("appmsg/thumburl").text
- account_name = root.find("appmsg/sourcedisplayname").text
- gh_id = root.find("appmsg/sourceusername").text
- desc = root.find("appmsg/des").text
- return {
- "msg_type": msg_type,
- "title": title,
- "url": url,
- "cover_url": cover_url,
- "account_name": account_name,
- "gh_id": gh_id,
- "desc": desc,
- }
- # 解析 xml
- @staticmethod
- def extract_callback_xml(self, xml_text):
- try:
- root = ET.fromstring(xml_text)
- msg_type = root.find("appmsg/type").text
- match msg_type:
- case "5":
- return self.extract_reply_articles(msg_type, root)
- case "33":
- return self.extract_reply_cards(msg_type, root)
- case "36":
- return self.extract_reply_cards(msg_type, root)
- case _:
- return {
- "msg_type": msg_type,
- }
- except Exception as e:
- print(e)
- return {}
- # 解析 page_path
- @staticmethod
- def extract_page_path(page_path):
- pass
- @staticmethod
- async def get_cover_url(aes_key, total_size, file_id):
- url = "http://api.geweapi.com/gewe/v2/api/message/downloadCdn"
- data = {
- "appId": "wx_anFlUnezoUynU3SKcqTWk",
- "aesKey": aes_key,
- "totalSize": total_size,
- "fileId": file_id,
- "type": "3",
- "suffix": "jpg",
- }
- headers = {
- "X-GEWE-TOKEN": "d3fb918f-0f36-4769-b095-410181614231",
- "Content-Type": "application/json",
- }
- async with AsyncHttpClient() as client:
- response = await client.post(url, headers=headers, data=json.dumps(data))
- return response
- @staticmethod
- async def get_sample_url(recent_articles):
- for article in recent_articles:
- link = article["ContentUrl"]
- print(link)
- response = await get_article_detail(article_link=link)
- print(response)
- if not response:
- continue
- code = response["code"]
- if code == 0 or code == 25006:
- return link
- return None
- # 获取检测的账号 list
- @staticmethod
- def get_monitor_account_list():
- yesterday = (datetime.today() - timedelta(days=1)).strftime("%Y%m%d")
- query = f"""
- SELECT 公众号名, ghid, count(DISTINCT mid) AS uv
- FROM loghubods.opengid_base_data
- WHERE dt = {yesterday}
- AND hotsencetype = 1074
- AND usersharedepth = 0
- AND channel = '公众号合作-即转-稳定'
- GROUP BY 公众号名, ghid
- HAVING uv > 100
- ORDER BY uv DESC
- ;
- """
- result = fetch_from_odps(query)
- return result
- # 下载封面图片
- async def download_cover(self, url, file_path):
- pass
- # 上传封面至 oss
- async def upload_cover(self, file_path):
- pass
class AutoReplyCardsMonitorMapper(AutoReplyCardsMonitorUtils):
    """Database access layer for the auto-reply card monitor.

    Queries against ``gzh_msg_record`` are sent with ``db_name="aigc"``;
    queries against ``cooperate_accounts`` use the pool's default database.
    """

    def __init__(self, pool, log_client):
        """Store the database pool and the log client.

        Args:
            pool: object exposing ``async_fetch`` and ``async_save``.
            log_client: logging client, stored but not used in this class.
        """
        self.pool = pool
        self.log_client = log_client

    # Get the result of a follow-account task.
    async def get_follow_account_task_result(self, task_id):
        """Placeholder: not implemented yet, always returns ``None``."""
        pass

    # Create an auto-reply task.
    async def create_auto_reply_task(self):
        """Placeholder: not implemented yet, always returns ``None``."""
        pass

    # Get the result of an auto-reply task.
    async def get_auto_reply_task_result(self, task_id):
        """Fetch result, status, error and update time for ``task_id``."""
        query = """
            SELECT task_result, task_status, err_msg,
                   from_unixtime(update_timestamp / 1000) AS update_time
            FROM gzh_msg_record
            WHERE task_id = %s;
        """
        return await self.pool.async_fetch(
            query=query, params=(task_id,), db_name="aigc"
        )

    # Get the list of follow-account tasks.
    async def get_follow_account_task_list(self):
        """Placeholder: not implemented yet, always returns ``None``."""
        pass

    # Get the list of auto-reply tasks.
    async def get_auto_reply_task_list(self):
        """Placeholder: not implemented yet, always returns ``None``."""
        pass

    # Insert an account that still has to be followed.
    async def insert_accounts_task(self, account_name, gh_id):
        """Placeholder: not implemented yet, always returns ``None``."""
        pass

    # Look up an account.
    async def fetch_account_status(self, account_name):
        """Fetch partner, gh_id, status and follow status by account name."""
        query = """
            SELECT partner_name, partner_id, gh_id, status, follow_status
            FROM cooperate_accounts
            WHERE account_name = %s;
        """
        return await self.pool.async_fetch(query=query, params=(account_name,))

    # Mark an account as invalid.
    async def set_account_as_invalid(self, gh_id):
        """Set ``status`` of the account to ``INVALID_STATUS``."""
        query = """
            UPDATE cooperate_accounts SET status = %s WHERE gh_id = %s;
        """
        await self.pool.async_save(
            query=query, params=(self.INVALID_STATUS, gh_id)
        )

    # Insert an AIGC follow-account task.
    async def insert_aigc_follow_account_task(self, task_id, link):
        """Insert a ``"follow"`` record whose task params are ``link``.

        Both timestamps are set to the current time in milliseconds.

        Returns:
            The result of ``pool.async_save``.
        """
        timestamp = int(time.time() * 1000)
        query = """
            INSERT INTO gzh_msg_record
                (task_id, biz_type, task_params, create_timestamp, update_timestamp)
            VALUES (%s, %s, %s, %s, %s);
        """
        return await self.pool.async_save(
            query=query,
            params=(task_id, "follow", link, timestamp, timestamp),
            db_name="aigc",
        )

    # Insert an AIGC auto-reply task.
    async def insert_aigc_auto_reply_task(self, task_id, account_name):
        """Insert a record whose task params are ``account_name``.

        No ``biz_type`` is written here, unlike the follow-task insert.

        Returns:
            The result of ``pool.async_save``.
        """
        timestamp = int(time.time() * 1000)
        query = """
            INSERT INTO gzh_msg_record
                (task_id, task_params, create_timestamp, update_timestamp)
            VALUES (%s, %s, %s, %s);
        """
        return await self.pool.async_save(
            query=query,
            params=(task_id, account_name, timestamp, timestamp),
            db_name="aigc",
        )

    # Set the sample_url of an account.
    async def set_sample_url(self, gh_id, sample_url):
        """Store ``sample_url`` in the account's ``sample_link`` column."""
        query = """
            UPDATE cooperate_accounts SET sample_link = %s WHERE gh_id = %s;
        """
        return await self.pool.async_save(
            query=query, params=(sample_url, gh_id)
        )

    # Change the follow status of an account.
    async def update_follow_status(self, gh_id, ori_status, new_status):
        """Move ``follow_status`` from ``ori_status`` to ``new_status``.

        The ``WHERE`` clause also matches on the current status, so the
        update only applies to a row still in ``ori_status``.

        Returns:
            The result of ``pool.async_save``.
        """
        query = """
            UPDATE cooperate_accounts SET follow_status = %s
            WHERE gh_id = %s and follow_status = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, gh_id, ori_status)
        )

    # Read the follow result from AIGC.
    async def fetch_follow_account_status(self, gh_id):
        """Fetch ``task_status`` and ``err_msg`` of the follow task."""
        # Reuse the shared helper so the id format cannot drift from the
        # one used when the follow task is created.
        task_id = self.generate_task_id(task_name="follow", gh_id=gh_id)
        query = """
            SELECT task_status, err_msg
            FROM gzh_msg_record
            WHERE task_id = %s;
        """
        return await self.pool.async_fetch(
            query=query, params=(task_id,), db_name="aigc"
        )
- class AutoReplyCardsMonitor(AutoReplyCardsMonitorMapper):
- def __init__(self, pool, log_client):
- super().__init__(pool, log_client)
- # 创建单个关注公众号任务
- async def create_follow_account_task(self, gh_id):
- response = await get_article_list_from_account(account_id=gh_id)
- code = response.get("code")
- match code:
- case 0:
- recent_articles = response['data']['data'][0]['AppMsg']['DetailInfo']
- article_url = await self.get_sample_url(recent_articles)
- print(article_url)
- if article_url:
- await self.set_sample_url(gh_id, article_url)
- task_id = self.generate_task_id(task_name="follow", gh_id=gh_id)
- affected_rows = await self.insert_aigc_follow_account_task(task_id, article_url)
- if affected_rows:
- await self.update_follow_status(gh_id, self.INIT_STATUS, self.PROCESSING_STATUS)
- case 25013:
- await self.set_account_as_invalid(gh_id)
- case _:
- pass
- async def follow_gzh_task(self):
- account_list = self.get_monitor_account_list()
- for account in account_list:
- try:
- fetch_response = await self.fetch_account_status(account.公众号名)
- if not fetch_response:
- print("账号不存在", account)
- # todo 没有 gh_id, 暂时无法存储账号
- # affected_rows =await self.insert_accounts_task(account.公众号名, account.ghid)
- # if affected_rows:
- # await self.create_follow_account_task(account.ghid)
- else:
- account_detail = fetch_response[0]
- status = account_detail["status"]
- follow_status = account_detail["follow_status"]
- if not status:
- print("账号已经迁移或者封禁")
- continue
- match follow_status:
- case self.INIT_STATUS:
- await self.create_follow_account_task(account_detail["gh_id"])
- case self.PROCESSING_STATUS:
- fetch_response = await self.fetch_follow_account_status(account_detail["gh_id"])
- if not fetch_response:
- await self.update_follow_status(account_detail["gh_id"], self.PROCESSING_STATUS, self.INIT_STATUS)
- task_status = fetch_response[0]["task_status"]
- match task_status:
- case self.FETCH_INIT_STATUS:
- continue
- case self.FETCH_PROCESSING_STATUS:
- continue
- case self.FETCH_SUCCESS_STATUS:
- await self.update_follow_status(account_detail["gh_id"], self.PROCESSING_STATUS, self.SUCCESS_STATUS)
- case self.FETCH_FAIL_STATUS:
- await self.update_follow_status(account_detail["gh_id"], self.PROCESSING_STATUS, self.FAIL_STATUS)
- case self.SUCCESS_STATUS:
- continue
- case _:
- print(f"{account.公众号名}账号状态异常")
- except Exception as e:
- print(f"处理账号{account.公众号名}异常", e)
- # main function
- async def deal(self, task_name):
- match task_name:
- case "follow_gzh_task":
- await self.follow_gzh_task()
- case _:
- print("task_error")
|