瀏覽代碼

新增抓公众号粉丝

luojunhui 2 月之前
父節點
當前提交
5cc900e178
共有 2 個文件被更改,包括 335 次插入88 次删除
  1. 320 88
      applications/tasks/monitor_tasks/auto_reply_cards_monitor.py
  2. 15 0
      applications/utils/common.py

+ 320 - 88
applications/tasks/monitor_tasks/auto_reply_cards_monitor.py

@@ -1,28 +1,15 @@
 import json
+import time
+import uuid
 import xml.etree.ElementTree as ET
+
+from datetime import datetime, timedelta
 from urllib.parse import unquote, parse_qs
-from pandas import DataFrame
 
-
-def parse_xml(xml_text):
-    # 1. 解析 XML
-    try:
-        root = ET.fromstring(xml_text)
-        page_path = root.find(".//pagepath").text
-        path, query = page_path.split("?", 1)
-        query_decoded = unquote(query)
-        params = parse_qs(query_decoded)
-        card_title = root.find(".//title").text
-        mini_program = root.find(".//sourcedisplayname").text
-        obj = {
-            "page_path": page_path,
-            "title": card_title,
-            "mini_program": mini_program,
-            "params": params,
-        }
-        return obj
-    except Exception:
-        return None
+from applications.utils import fetch_from_odps
+from applications.utils import AsyncHttpClient
+from applications.crawler.wechat import get_article_list_from_account
+from applications.crawler.wechat import get_article_detail
 
 
 class AutoReplyCardsMonitorConst:
@@ -38,83 +25,328 @@ class AutoReplyCardsMonitorConst:
     SUCCESS_STATUS = 2
     FAIL_STATUS = 99
 
+    # account_status
+    VALID_STATUS = 1
+    INVALID_STATUS = 0
+
+
+class AutoReplyCardsMonitorUtils(AutoReplyCardsMonitorConst):
+    @staticmethod
+    def generate_task_id(task_name, gh_id):
+        match task_name:
+            case "follow":
+                return f"{task_name}_{gh_id}"
+            case _:
+                return f"{task_name}_{uuid.uuid4()}"
+
+    @staticmethod
+    def extract_reply_cards(msg_type, root):
+        page_path = root.find(".//pagepath").text
+        card_title = root.find(".//title").text
+        mini_program = root.find(".//sourcedisplayname").text
+        file_id = root.find("appmsg/appattach/cdnthumburl").text
+        ase_key = root.find("appmsg/appattach/aeskey").text
+        file_size = root.find("appmsg/appattach/cdnthumblength").text
+        return {
+            "title": card_title,
+            "page_path": page_path,
+            "msg_type": msg_type,
+            "mini_program": mini_program,
+            "file_id": file_id,
+            "file_size": file_size,
+            "ase_key": ase_key,
+        }
+
+    @staticmethod
+    def extract_reply_articles(msg_type, root):
+        title = root.find("appmsg/title").text
+        url = root.find("appmsg/url").text
+        cover_url = root.find("appmsg/thumburl").text
+        account_name = root.find("appmsg/sourcedisplayname").text
+        gh_id = root.find("appmsg/sourceusername").text
+        desc = root.find("appmsg/des").text
+        return {
+            "msg_type": msg_type,
+            "title": title,
+            "url": url,
+            "cover_url": cover_url,
+            "account_name": account_name,
+            "gh_id": gh_id,
+            "desc": desc,
+        }
 
-class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
+    # 解析 xml
+    @staticmethod
+    def extract_callback_xml(self, xml_text):
+        try:
+            root = ET.fromstring(xml_text)
+            msg_type = root.find("appmsg/type").text
+            match msg_type:
+                case "5":
+                    return self.extract_reply_articles(msg_type, root)
+
+                case "33":
+                    return self.extract_reply_cards(msg_type, root)
+
+                case "36":
+                    return self.extract_reply_cards(msg_type, root)
+
+                case _:
+                    return {
+                        "msg_type": msg_type,
+                    }
+
+        except Exception as e:
+            print(e)
+            return {}
+
+    # 解析 page_path
+    @staticmethod
+    def extract_page_path(page_path):
+        pass
+
+    @staticmethod
+    async def get_cover_url(aes_key, total_size, file_id):
+        url = "http://api.geweapi.com/gewe/v2/api/message/downloadCdn"
+        data = {
+            "appId": "wx_anFlUnezoUynU3SKcqTWk",
+            "aesKey": aes_key,
+            "totalSize": total_size,
+            "fileId": file_id,
+            "type": "3",
+            "suffix": "jpg",
+        }
+        headers = {
+            "X-GEWE-TOKEN": "d3fb918f-0f36-4769-b095-410181614231",
+            "Content-Type": "application/json",
+        }
+        async with AsyncHttpClient() as client:
+            response = await client.post(url, headers=headers, data=json.dumps(data))
+
+        return response
+
+    @staticmethod
+    async def get_sample_url(recent_articles):
+        for article in recent_articles:
+            link = article["ContentUrl"]
+            print(link)
+            response = await get_article_detail(article_link=link)
+            print(response)
+            if not response:
+                continue
+            code = response["code"]
+            if code == 0 or code == 25006:
+                return link
+
+        return None
+
+    # 获取检测的账号 list
+    @staticmethod
+    def get_monitor_account_list():
+        yesterday = (datetime.today() - timedelta(days=1)).strftime("%Y%m%d")
+        query = f"""
+            SELECT  公众号名, ghid, count(DISTINCT mid) AS uv
+            FROM    loghubods.opengid_base_data
+            WHERE   dt = {yesterday}
+            AND     hotsencetype = 1074
+            AND     usersharedepth = 0
+            AND     channel = '公众号合作-即转-稳定'
+            GROUP BY 公众号名, ghid
+            HAVING uv > 100
+            ORDER BY uv DESC
+            ;
+        """
+        result = fetch_from_odps(query)
+        return result
+
+    # 下载封面图片
+    async def download_cover(self, url, file_path):
+        pass
+
+    # 上传封面至 oss
+    async def upload_cover(self, file_path):
+        pass
+
+
+class AutoReplyCardsMonitorMapper(AutoReplyCardsMonitorUtils):
     def __init__(self, pool, log_client):
         self.pool = pool
         self.log_client = log_client
 
-    async def get_tasks(self):
+    # 获取关注公众号任务结果
+    async def get_follow_account_task_result(self, task_id):
+        pass
+
+    # 创建自动回复任务
+    async def create_auto_reply_task(self):
+        pass
+
+    # 获取自动回复任务结果
+    async def get_auto_reply_task_result(self, task_id):
         query = """
-            SELECT * FROM auto_reply_tasks;
+            SELECT task_result, task_status, err_msg,from_unixtime(update_timestamp / 1000) AS update_time 
+            FROM gzh_msg_record
+            WHERE task_id = %s;
+        """
+        return await self.pool.async_fetch(
+            query=query, params=(task_id,), db_name="aigc"
+        )
+
+    # 获取关注公众号任务列表
+    async def get_follow_account_task_list(self):
+        pass
+
+    # 获取自动回复任务列表
+    async def get_auto_reply_task_list(self):
+        pass
+
+    # 插入待关注公众号
+    async def insert_accounts_task(self, account_name, gh_id):
+        pass
+
+    # 查询账号
+    async def fetch_account_status(self, account_name):
+        query = """
+            SELECT partner_name, partner_id, gh_id, status, follow_status
+            FROM cooperate_accounts
+            WHERE account_name = %s;
+        """
+        return await self.pool.async_fetch(
+            query=query, params=(account_name,)
+        )
+
+    # 更新账号状态为无效
+    async def set_account_as_invalid(self, gh_id):
+        query = """
+            UPDATE cooperate_accounts SET status = %s WHERE gh_id = %s;
         """
-        return await self.pool.async_fetch(query)
+        await self.pool.async_save(query=query, params=(self.INVALID_STATUS, gh_id))
 
-    async def get_result_from_aigc(self, task_id):
+    # 插入AIGC关注公众号任务
+    async def insert_aigc_follow_account_task(self, task_id, link):
+        timestamp = int(time.time() * 1000)
         query = """
-            SELECT gzh_name, task_status, task_result, from_unixtime(create_timestamp / 1000) as create_time,
-            from_unixtime(update_timestamp / 1000) as update_time,
-             err_msg
+            INSERT INTO gzh_msg_record (task_id, biz_type, task_params, create_timestamp, update_timestamp) VALUES (%s, %s, %s, %s, %s); 
+        """
+        return await self.pool.async_save(query=query, params=(task_id, "follow", link, timestamp, timestamp), db_name="aigc")
+
+    # 插入AIGC自动回复任务
+    async def insert_aigc_auto_reply_task(self, task_id, account_name):
+        timestamp = int(time.time() * 1000)
+        query = """
+            INSERT INTO gzh_msg_record (task_id, task_params, create_timestamp, update_timestamp) VALUES (%s, %s, %s, %s); 
+        """
+        return await self.pool.async_save(query=query, params=(task_id,  account_name, timestamp, timestamp), db_name="aigc")
+
+    # 为账号设置 sample_url
+    async def set_sample_url(self, gh_id, sample_url):
+        query = """
+            UPDATE cooperate_accounts SET sample_link = %s WHERE gh_id = %s;
+        """
+        return await self.pool.async_save(query=query, params=(sample_url, gh_id))
+
+    # 修改账号的关注状态
+    async def update_follow_status(self, gh_id, ori_status, new_status):
+        query = """
+            UPDATE cooperate_accounts SET follow_status = %s WHERE gh_id = %s and follow_status = %s;
+        """
+        return await self.pool.async_save(query=query, params=(new_status, gh_id, ori_status))
+
+    # 从 aigc 获取关注结果
+    async def fetch_follow_account_status(self, gh_id):
+        query = """
+            SELECT task_status, err_msg 
             FROM gzh_msg_record
             WHERE task_id = %s;
         """
-        return await self.pool.async_fetch(query=query, params=(task_id,), db_name="aigc")
-
-    # async def save_to_each_record(self, record: dict):
-    #     update_query = """
-    #
-    #     """
-    #     pass
-
-    async def deal(self):
-        # tasks = await self.get_tasks()
-        task_ids = [i for i in range(1, 9)]
-        L = []
-        for task_id in task_ids:
-            record = await self.get_result_from_aigc(task_id)
-            if not record:
-                continue
-            task_status = record[0]['task_status']
-            match task_status:
-                case self.FETCH_INIT_STATUS:
-                    continue
-
-                case self.FETCH_PROCESSING_STATUS:
-                    continue
-
-                case self.FETCH_SUCCESS_STATUS:
-                    fetch_result = record[0]['task_result']
-                    fetch_detail_list = json.loads(fetch_result)
-                    for index, xml_txt in enumerate(fetch_detail_list, 1):
-                        extract_info = parse_xml(xml_txt)
-                        if not extract_info:
+        return await self.pool.async_fetch(
+            query=query, params=(f"follow_{gh_id}",), db_name="aigc"
+        )
+
+
+class AutoReplyCardsMonitor(AutoReplyCardsMonitorMapper):
+    def __init__(self, pool, log_client):
+        super().__init__(pool, log_client)
+
+    # 创建单个关注公众号任务
+    async def create_follow_account_task(self, gh_id):
+        response = await get_article_list_from_account(account_id=gh_id)
+        code = response.get("code")
+        match code:
+            case 0:
+                recent_articles = response['data']['data'][0]['AppMsg']['DetailInfo']
+                article_url = await self.get_sample_url(recent_articles)
+                print(article_url)
+                if article_url:
+                    await self.set_sample_url(gh_id, article_url)
+
+                    task_id = self.generate_task_id(task_name="follow", gh_id=gh_id)
+                    affected_rows = await self.insert_aigc_follow_account_task(task_id, article_url)
+
+                    if affected_rows:
+                        await self.update_follow_status(gh_id, self.INIT_STATUS, self.PROCESSING_STATUS)
+
+            case 25013:
+                await self.set_account_as_invalid(gh_id)
+
+            case _:
+                pass
+
+    async def follow_gzh_task(self):
+        account_list = self.get_monitor_account_list()
+        for account in account_list:
+            try:
+                fetch_response = await self.fetch_account_status(account.公众号名)
+                if not fetch_response:
+                    print("账号不存在", account)
+                    # todo 没有 gh_id, 暂时无法存储账号
+                    # affected_rows =await self.insert_accounts_task(account.公众号名, account.ghid)
+                    # if affected_rows:
+                    #     await self.create_follow_account_task(account.ghid)
+
+                else:
+                    account_detail = fetch_response[0]
+                    status = account_detail["status"]
+                    follow_status = account_detail["follow_status"]
+                    if not status:
+                        print("账号已经迁移或者封禁")
+                        continue
+
+                    match follow_status:
+                        case self.INIT_STATUS:
+                            await self.create_follow_account_task(account_detail["gh_id"])
+
+                        case self.PROCESSING_STATUS:
+                            fetch_response = await self.fetch_follow_account_status(account_detail["gh_id"])
+                            if not fetch_response:
+                                await self.update_follow_status(account_detail["gh_id"], self.PROCESSING_STATUS, self.INIT_STATUS)
+
+                            task_status = fetch_response[0]["task_status"]
+                            match task_status:
+                                case self.FETCH_INIT_STATUS:
+                                    continue
+                                case self.FETCH_PROCESSING_STATUS:
+                                    continue
+                                case self.FETCH_SUCCESS_STATUS:
+                                    await self.update_follow_status(account_detail["gh_id"], self.PROCESSING_STATUS, self.SUCCESS_STATUS)
+                                case self.FETCH_FAIL_STATUS:
+                                    await self.update_follow_status(account_detail["gh_id"], self.PROCESSING_STATUS, self.FAIL_STATUS)
+
+                        case self.SUCCESS_STATUS:
                             continue
-                        _id = extract_info.get('params', {}).get('id')
-                        _video_id = extract_info.get('params', {}).get('video_id')
-                        if _id:
-                            vid = _id[0]
-                        elif _video_id:
-                            vid = _video_id[0]
-                        else:
-                            vid = ''
-
-                        temp = [
-                            record[0]['gzh_name'],
-                            extract_info.get('title'),
-                            extract_info.get('mini_program'),
-                            vid,
-                            extract_info.get('params', {}).get('rootSourceId', [''])[0],
-                            extract_info.get('params', {}).get('rootShareId', [''])[0],
-                            index,
-                            record[0]['create_time'],
-                            record[0]['update_time'],
-                            extract_info['page_path']
-                        ]
-                        L.append(temp)
-
-                case self.FETCH_FAIL_STATUS:
-                    error_msg = record['err_msg']
-                    print(error_msg)
-
-        df = DataFrame(L, columns=['账号名称', '标题', '小程序', '视频 id', 'rootSourceId', 'rootShareId', 'index', 'create_time', 'update_time', 'page_path'])
-        df.to_csv("local_data.csv", index=False)
+
+                        case _:
+                            print(f"{account.公众号名}账号状态异常")
+
+            except Exception as e:
+                print(f"处理账号{account.公众号名}异常", e)
+
+
+    # main function
+    async def deal(self, task_name):
+        match task_name:
+            case "follow_gzh_task":
+                await self.follow_gzh_task()
+
+            case _:
+                print("task_error")

+ 15 - 0
applications/utils/common.py

@@ -8,6 +8,7 @@ import hashlib
 import math
 import statistics
 from scipy.stats import t
+from odps import ODPS
 
 from datetime import datetime, timezone, date, timedelta
 from typing import List
@@ -249,3 +250,17 @@ def get_task_chinese_name(data):
         return f"{task_name_chinese}\t{platform}\t{crawler_methods}\t{category_list}\t{strategy}"
     else:
         return task_name_chinese
+
+
+def fetch_from_odps(query):
+    client = ODPS(
+        access_id="LTAIWYUujJAm7CbH",
+        secret_access_key="RfSjdiWwED1sGFlsjXv0DlfTnZTG1P",
+        endpoint="http://service.cn.maxcompute.aliyun.com/api",
+        project="loghubods",
+    )
+    with client.execute_sql(query).open_reader() as reader:
+        if reader:
+            return [item for item in reader]
+        else:
+            return []