|
@@ -0,0 +1,379 @@
|
|
|
|
|
+import json
|
|
|
|
|
+import time
|
|
|
|
|
+import uuid
|
|
|
|
|
+import xml.etree.ElementTree as ET
|
|
|
|
|
+
|
|
|
|
|
+from datetime import datetime, timedelta
|
|
|
|
|
+from urllib.parse import unquote, parse_qs
|
|
|
|
|
+
|
|
|
|
|
+from applications.utils import fetch_from_odps
|
|
|
|
|
+from applications.utils import AsyncHttpClient
|
|
|
|
|
+from applications.crawler.wechat import get_article_list_from_account
|
|
|
|
|
+from applications.crawler.wechat import get_article_detail
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+class AutoReplyCardsMonitorConst:
|
|
|
|
|
+ # fetch_status
|
|
|
|
|
+ FETCH_INIT_STATUS = 0
|
|
|
|
|
+ FETCH_PROCESSING_STATUS = 1
|
|
|
|
|
+ FETCH_SUCCESS_STATUS = 2
|
|
|
|
|
+ FETCH_FAIL_STATUS = 3
|
|
|
|
|
+
|
|
|
|
|
+ # task_status
|
|
|
|
|
+ INIT_STATUS = 0
|
|
|
|
|
+ PROCESSING_STATUS = 1
|
|
|
|
|
+ SUCCESS_STATUS = 2
|
|
|
|
|
+ FAIL_STATUS = 99
|
|
|
|
|
+
|
|
|
|
|
+ # account_status
|
|
|
|
|
+ VALID_STATUS = 1
|
|
|
|
|
+ INVALID_STATUS = 0
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+class AutoReplyCardsMonitorUtils(AutoReplyCardsMonitorConst):
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def generate_task_id(task_name, gh_id):
|
|
|
|
|
+ match task_name:
|
|
|
|
|
+ case "follow":
|
|
|
|
|
+ return f"{task_name}_{gh_id}"
|
|
|
|
|
+ case _:
|
|
|
|
|
+ return f"{task_name}_{uuid.uuid4()}"
|
|
|
|
|
+
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def extract_reply_cards(msg_type, root):
|
|
|
|
|
+ page_path = root.find(".//pagepath").text
|
|
|
|
|
+ card_title = root.find(".//title").text
|
|
|
|
|
+ mini_program = root.find(".//sourcedisplayname").text
|
|
|
|
|
+ file_id = root.find("appmsg/appattach/cdnthumburl").text
|
|
|
|
|
+ ase_key = root.find("appmsg/appattach/aeskey").text
|
|
|
|
|
+ file_size = root.find("appmsg/appattach/cdnthumblength").text
|
|
|
|
|
+ return {
|
|
|
|
|
+ "title": card_title,
|
|
|
|
|
+ "page_path": page_path,
|
|
|
|
|
+ "msg_type": msg_type,
|
|
|
|
|
+ "mini_program": mini_program,
|
|
|
|
|
+ "file_id": file_id,
|
|
|
|
|
+ "file_size": file_size,
|
|
|
|
|
+ "ase_key": ase_key,
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def extract_reply_articles(msg_type, root):
|
|
|
|
|
+ title = root.find("appmsg/title").text
|
|
|
|
|
+ url = root.find("appmsg/url").text
|
|
|
|
|
+ cover_url = root.find("appmsg/thumburl").text
|
|
|
|
|
+ account_name = root.find("appmsg/sourcedisplayname").text
|
|
|
|
|
+ gh_id = root.find("appmsg/sourceusername").text
|
|
|
|
|
+ desc = root.find("appmsg/des").text
|
|
|
|
|
+ return {
|
|
|
|
|
+ "msg_type": msg_type,
|
|
|
|
|
+ "title": title,
|
|
|
|
|
+ "url": url,
|
|
|
|
|
+ "cover_url": cover_url,
|
|
|
|
|
+ "account_name": account_name,
|
|
|
|
|
+ "gh_id": gh_id,
|
|
|
|
|
+ "desc": desc,
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ # 解析 xml
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def extract_callback_xml(self, xml_text):
|
|
|
|
|
+ try:
|
|
|
|
|
+ root = ET.fromstring(xml_text)
|
|
|
|
|
+ msg_type = root.find("appmsg/type").text
|
|
|
|
|
+ match msg_type:
|
|
|
|
|
+ case "5":
|
|
|
|
|
+ return self.extract_reply_articles(msg_type, root)
|
|
|
|
|
+
|
|
|
|
|
+ case "33":
|
|
|
|
|
+ return self.extract_reply_cards(msg_type, root)
|
|
|
|
|
+
|
|
|
|
|
+ case "36":
|
|
|
|
|
+ return self.extract_reply_cards(msg_type, root)
|
|
|
|
|
+
|
|
|
|
|
+ case _:
|
|
|
|
|
+ return {
|
|
|
|
|
+ "msg_type": msg_type,
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ print(e)
|
|
|
|
|
+ return {}
|
|
|
|
|
+
|
|
|
|
|
+ # 解析 page_path
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def extract_page_path(page_path):
|
|
|
|
|
+ pass
|
|
|
|
|
+
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ async def get_cover_url(aes_key, total_size, file_id):
|
|
|
|
|
+ url = "http://api.geweapi.com/gewe/v2/api/message/downloadCdn"
|
|
|
|
|
+ data = {
|
|
|
|
|
+ "appId": "wx_anFlUnezoUynU3SKcqTWk",
|
|
|
|
|
+ "aesKey": aes_key,
|
|
|
|
|
+ "totalSize": total_size,
|
|
|
|
|
+ "fileId": file_id,
|
|
|
|
|
+ "type": "3",
|
|
|
|
|
+ "suffix": "jpg",
|
|
|
|
|
+ }
|
|
|
|
|
+ headers = {
|
|
|
|
|
+ "X-GEWE-TOKEN": "d3fb918f-0f36-4769-b095-410181614231",
|
|
|
|
|
+ "Content-Type": "application/json",
|
|
|
|
|
+ }
|
|
|
|
|
+ async with AsyncHttpClient() as client:
|
|
|
|
|
+ response = await client.post(url, headers=headers, data=json.dumps(data))
|
|
|
|
|
+
|
|
|
|
|
+ return response
|
|
|
|
|
+
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ async def get_sample_url(recent_articles):
|
|
|
|
|
+ for article in recent_articles:
|
|
|
|
|
+ link = article["ContentUrl"]
|
|
|
|
|
+ print(link)
|
|
|
|
|
+ response = await get_article_detail(article_link=link)
|
|
|
|
|
+ print(response)
|
|
|
|
|
+ if not response:
|
|
|
|
|
+ continue
|
|
|
|
|
+ code = response["code"]
|
|
|
|
|
+ if code == 0 or code == 25006:
|
|
|
|
|
+ return link
|
|
|
|
|
+
|
|
|
|
|
+ return None
|
|
|
|
|
+
|
|
|
|
|
+ # 获取检测的账号 list
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def get_monitor_account_list():
|
|
|
|
|
+ yesterday = (datetime.today() - timedelta(days=1)).strftime("%Y%m%d")
|
|
|
|
|
+ query = f"""
|
|
|
|
|
+ SELECT 公众号名, ghid, count(DISTINCT mid) AS uv
|
|
|
|
|
+ FROM loghubods.opengid_base_data
|
|
|
|
|
+ WHERE dt = {yesterday}
|
|
|
|
|
+ AND hotsencetype = 1074
|
|
|
|
|
+ AND usersharedepth = 0
|
|
|
|
|
+ AND channel = '公众号合作-即转-稳定'
|
|
|
|
|
+ GROUP BY 公众号名, ghid
|
|
|
|
|
+ HAVING uv > 100
|
|
|
|
|
+ ORDER BY uv DESC
|
|
|
|
|
+ ;
|
|
|
|
|
+ """
|
|
|
|
|
+ result = fetch_from_odps(query)
|
|
|
|
|
+ return result
|
|
|
|
|
+
|
|
|
|
|
+ # 下载封面图片
|
|
|
|
|
+ async def download_cover(self, url, file_path):
|
|
|
|
|
+ pass
|
|
|
|
|
+
|
|
|
|
|
+ # 上传封面至 oss
|
|
|
|
|
+ async def upload_cover(self, file_path):
|
|
|
|
|
+ pass
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+class AutoReplyCardsMonitorMapper(AutoReplyCardsMonitorUtils):
|
|
|
|
|
+ def __init__(self, pool, log_client):
|
|
|
|
|
+ self.pool = pool
|
|
|
|
|
+ self.log_client = log_client
|
|
|
|
|
+
|
|
|
|
|
+ # 获取关注公众号任务结果
|
|
|
|
|
+ async def get_follow_account_task_result(self, task_id):
|
|
|
|
|
+ pass
|
|
|
|
|
+
|
|
|
|
|
+ # 创建自动回复任务
|
|
|
|
|
+ async def create_auto_reply_task(self):
|
|
|
|
|
+ pass
|
|
|
|
|
+
|
|
|
|
|
+ # 获取自动回复任务结果
|
|
|
|
|
+ async def get_auto_reply_task_result(self, task_id):
|
|
|
|
|
+ query = """
|
|
|
|
|
+ SELECT task_result, task_status, err_msg,from_unixtime(update_timestamp / 1000) AS update_time
|
|
|
|
|
+ FROM gzh_msg_record
|
|
|
|
|
+ WHERE task_id = %s;
|
|
|
|
|
+ """
|
|
|
|
|
+ return await self.pool.async_fetch(
|
|
|
|
|
+ query=query, params=(task_id,), db_name="aigc"
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ # 获取关注公众号任务列表
|
|
|
|
|
+ async def get_follow_account_task_list(self):
|
|
|
|
|
+ pass
|
|
|
|
|
+
|
|
|
|
|
+ # 获取自动回复任务列表
|
|
|
|
|
+ async def get_auto_reply_task_list(self):
|
|
|
|
|
+ pass
|
|
|
|
|
+
|
|
|
|
|
+ # 插入待关注公众号
|
|
|
|
|
+ async def insert_accounts_task(self, account_name, gh_id):
|
|
|
|
|
+ pass
|
|
|
|
|
+
|
|
|
|
|
+ # 查询账号
|
|
|
|
|
+ async def fetch_account_status(self, account_name):
|
|
|
|
|
+ query = """
|
|
|
|
|
+ SELECT partner_name, partner_id, gh_id, status, follow_status
|
|
|
|
|
+ FROM cooperate_accounts
|
|
|
|
|
+ WHERE account_name = %s;
|
|
|
|
|
+ """
|
|
|
|
|
+ return await self.pool.async_fetch(query=query, params=(account_name,))
|
|
|
|
|
+
|
|
|
|
|
+ # 更新账号状态为无效
|
|
|
|
|
+ async def set_account_as_invalid(self, gh_id):
|
|
|
|
|
+ query = """
|
|
|
|
|
+ UPDATE cooperate_accounts SET status = %s WHERE gh_id = %s;
|
|
|
|
|
+ """
|
|
|
|
|
+ await self.pool.async_save(query=query, params=(self.INVALID_STATUS, gh_id))
|
|
|
|
|
+
|
|
|
|
|
+ # 插入AIGC关注公众号任务
|
|
|
|
|
+ async def insert_aigc_follow_account_task(self, task_id, link):
|
|
|
|
|
+ timestamp = int(time.time() * 1000)
|
|
|
|
|
+ query = """
|
|
|
|
|
+ INSERT INTO gzh_msg_record (task_id, biz_type, task_params, create_timestamp, update_timestamp) VALUES (%s, %s, %s, %s, %s);
|
|
|
|
|
+ """
|
|
|
|
|
+ return await self.pool.async_save(
|
|
|
|
|
+ query=query,
|
|
|
|
|
+ params=(task_id, "follow", link, timestamp, timestamp),
|
|
|
|
|
+ db_name="aigc",
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ # 插入AIGC自动回复任务
|
|
|
|
|
+ async def insert_aigc_auto_reply_task(self, task_id, account_name):
|
|
|
|
|
+ timestamp = int(time.time() * 1000)
|
|
|
|
|
+ query = """
|
|
|
|
|
+ INSERT INTO gzh_msg_record (task_id, task_params, create_timestamp, update_timestamp) VALUES (%s, %s, %s, %s);
|
|
|
|
|
+ """
|
|
|
|
|
+ return await self.pool.async_save(
|
|
|
|
|
+ query=query,
|
|
|
|
|
+ params=(task_id, account_name, timestamp, timestamp),
|
|
|
|
|
+ db_name="aigc",
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ # 为账号设置 sample_url
|
|
|
|
|
+ async def set_sample_url(self, gh_id, sample_url):
|
|
|
|
|
+ query = """
|
|
|
|
|
+ UPDATE cooperate_accounts SET sample_link = %s WHERE gh_id = %s;
|
|
|
|
|
+ """
|
|
|
|
|
+ return await self.pool.async_save(query=query, params=(sample_url, gh_id))
|
|
|
|
|
+
|
|
|
|
|
+ # 修改账号的关注状态
|
|
|
|
|
+ async def update_follow_status(self, gh_id, ori_status, new_status):
|
|
|
|
|
+ query = """
|
|
|
|
|
+ UPDATE cooperate_accounts SET follow_status = %s WHERE gh_id = %s and follow_status = %s;
|
|
|
|
|
+ """
|
|
|
|
|
+ return await self.pool.async_save(
|
|
|
|
|
+ query=query, params=(new_status, gh_id, ori_status)
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ # 从 aigc 获取关注结果
|
|
|
|
|
+ async def fetch_follow_account_status(self, gh_id):
|
|
|
|
|
+ query = """
|
|
|
|
|
+ SELECT task_status, err_msg
|
|
|
|
|
+ FROM gzh_msg_record
|
|
|
|
|
+ WHERE task_id = %s;
|
|
|
|
|
+ """
|
|
|
|
|
+ return await self.pool.async_fetch(
|
|
|
|
|
+ query=query, params=(f"follow_{gh_id}",), db_name="aigc"
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+class AutoReplyCardsMonitor(AutoReplyCardsMonitorMapper):
|
|
|
|
|
+ def __init__(self, pool, log_client):
|
|
|
|
|
+ super().__init__(pool, log_client)
|
|
|
|
|
+
|
|
|
|
|
+ # 创建单个关注公众号任务
|
|
|
|
|
+ async def create_follow_account_task(self, gh_id):
|
|
|
|
|
+ response = await get_article_list_from_account(account_id=gh_id)
|
|
|
|
|
+ code = response.get("code")
|
|
|
|
|
+ match code:
|
|
|
|
|
+ case 0:
|
|
|
|
|
+ recent_articles = response["data"]["data"][0]["AppMsg"]["DetailInfo"]
|
|
|
|
|
+ article_url = await self.get_sample_url(recent_articles)
|
|
|
|
|
+ print(article_url)
|
|
|
|
|
+ if article_url:
|
|
|
|
|
+ await self.set_sample_url(gh_id, article_url)
|
|
|
|
|
+
|
|
|
|
|
+ task_id = self.generate_task_id(task_name="follow", gh_id=gh_id)
|
|
|
|
|
+ affected_rows = await self.insert_aigc_follow_account_task(
|
|
|
|
|
+ task_id, article_url
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ if affected_rows:
|
|
|
|
|
+ await self.update_follow_status(
|
|
|
|
|
+ gh_id, self.INIT_STATUS, self.PROCESSING_STATUS
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ case 25013:
|
|
|
|
|
+ await self.set_account_as_invalid(gh_id)
|
|
|
|
|
+
|
|
|
|
|
+ case _:
|
|
|
|
|
+ pass
|
|
|
|
|
+
|
|
|
|
|
+ async def follow_gzh_task(self):
|
|
|
|
|
+ account_list = self.get_monitor_account_list()
|
|
|
|
|
+ for account in account_list:
|
|
|
|
|
+ try:
|
|
|
|
|
+ fetch_response = await self.fetch_account_status(account.公众号名)
|
|
|
|
|
+ if not fetch_response:
|
|
|
|
|
+ print("账号不存在", account)
|
|
|
|
|
+ # todo 没有 gh_id, 暂时无法存储账号
|
|
|
|
|
+ # affected_rows =await self.insert_accounts_task(account.公众号名, account.ghid)
|
|
|
|
|
+ # if affected_rows:
|
|
|
|
|
+ # await self.create_follow_account_task(account.ghid)
|
|
|
|
|
+
|
|
|
|
|
+ else:
|
|
|
|
|
+ account_detail = fetch_response[0]
|
|
|
|
|
+ status = account_detail["status"]
|
|
|
|
|
+ follow_status = account_detail["follow_status"]
|
|
|
|
|
+ if not status:
|
|
|
|
|
+ print("账号已经迁移或者封禁")
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ match follow_status:
|
|
|
|
|
+ case self.INIT_STATUS:
|
|
|
|
|
+ await self.create_follow_account_task(
|
|
|
|
|
+ account_detail["gh_id"]
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ case self.PROCESSING_STATUS:
|
|
|
|
|
+ fetch_response = await self.fetch_follow_account_status(
|
|
|
|
|
+ account_detail["gh_id"]
|
|
|
|
|
+ )
|
|
|
|
|
+ if not fetch_response:
|
|
|
|
|
+ await self.update_follow_status(
|
|
|
|
|
+ account_detail["gh_id"],
|
|
|
|
|
+ self.PROCESSING_STATUS,
|
|
|
|
|
+ self.INIT_STATUS,
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ task_status = fetch_response[0]["task_status"]
|
|
|
|
|
+ match task_status:
|
|
|
|
|
+ case self.FETCH_INIT_STATUS:
|
|
|
|
|
+ continue
|
|
|
|
|
+ case self.FETCH_PROCESSING_STATUS:
|
|
|
|
|
+ continue
|
|
|
|
|
+ case self.FETCH_SUCCESS_STATUS:
|
|
|
|
|
+ await self.update_follow_status(
|
|
|
|
|
+ account_detail["gh_id"],
|
|
|
|
|
+ self.PROCESSING_STATUS,
|
|
|
|
|
+ self.SUCCESS_STATUS,
|
|
|
|
|
+ )
|
|
|
|
|
+ case self.FETCH_FAIL_STATUS:
|
|
|
|
|
+ await self.update_follow_status(
|
|
|
|
|
+ account_detail["gh_id"],
|
|
|
|
|
+ self.PROCESSING_STATUS,
|
|
|
|
|
+ self.FAIL_STATUS,
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ case self.SUCCESS_STATUS:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ case _:
|
|
|
|
|
+ print(f"{account.公众号名}账号状态异常")
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ print(f"处理账号{account.公众号名}异常", e)
|
|
|
|
|
+
|
|
|
|
|
+ # main function
|
|
|
|
|
+ async def deal(self, task_name):
|
|
|
|
|
+ match task_name:
|
|
|
|
|
+ case "follow_gzh_task":
|
|
|
|
|
+ await self.follow_gzh_task()
|
|
|
|
|
+
|
|
|
|
|
+ case _:
|
|
|
|
|
+ print("task_error")
|