Sfoglia il codice sorgente

Merge branch 'feature/luojunhui/20260108-create-auto-reply-task' of Server/LongArticleTaskServer into master

luojunhui 1 mese fa
parent
commit
3efde059a9

+ 2 - 0
applications/config/task_chinese_name.py

@@ -22,4 +22,6 @@ name_map = {
     "update_account_open_rate_avg": "更新账号平均打开率",
     "update_limited_account_info": "更新限流账号信息",
     "update_account_read_avg": "更新账号平均阅读率",
+    "get_follow_result": "获取自动关注回复",
+    "extract_reply_result": "解析自动回复结果",
 }

+ 377 - 99
applications/tasks/monitor_tasks/auto_reply_cards_monitor.py

@@ -1,11 +1,19 @@
+import asyncio
+import os
 import json
 import time
+import traceback
 import uuid
 import xml.etree.ElementTree as ET
 
+from tqdm import tqdm
 from datetime import datetime, timedelta
-from urllib.parse import unquote, parse_qs
+from urllib.parse import unquote, parse_qs, urlparse
 
+import requests
+from requests.exceptions import RequestException
+
+from applications.utils import upload_to_oss
 from applications.utils import fetch_from_odps
 from applications.utils import AsyncHttpClient
 from applications.crawler.wechat import get_article_list_from_account
@@ -40,43 +48,41 @@ class AutoReplyCardsMonitorUtils(AutoReplyCardsMonitorConst):
                 return f"{task_name}_{uuid.uuid4()}"
 
     @staticmethod
-    def extract_reply_cards(msg_type, root):
-        page_path = root.find(".//pagepath").text
-        card_title = root.find(".//title").text
-        mini_program = root.find(".//sourcedisplayname").text
-        file_id = root.find("appmsg/appattach/cdnthumburl").text
-        ase_key = root.find("appmsg/appattach/aeskey").text
-        file_size = root.find("appmsg/appattach/cdnthumblength").text
-        return {
-            "title": card_title,
-            "page_path": page_path,
-            "msg_type": msg_type,
-            "mini_program": mini_program,
-            "file_id": file_id,
-            "file_size": file_size,
-            "ase_key": ase_key,
+    def parse_fields(root, fields, default=""):
+        """Look up several elements under ``root`` and collect their text.
+
+        :param root: object exposing ``find(path)`` whose matches carry a ``text`` attribute.
+        :param fields: mapping of output key -> path handed to ``root.find``.
+        :param default: value stored when the element is missing or its text is empty/``None``.
+        :return: dict with exactly one entry per key of ``fields``.
+        """
+        result = {}
+        for key, path in fields.items():
+            elem = root.find(path)
+            # A missing element and an element with empty text are treated alike:
+            # both fall back to ``default`` instead of raising.
+            result[key] = elem.text if elem is not None and elem.text else default
+        return result
+
+    def extract_reply_cards(self, msg_type, root):
+        """Extract the card fields from a parsed callback XML element.
+
+        :param msg_type: value copied into the result under ``"msg_type"``.
+        :param root: parsed XML element that is searched with the paths below.
+        :return: dict with ``title``, ``page_path``, ``mini_program``, ``file_id``,
+            ``file_size`` and ``aes_key`` (``parse_fields`` substitutes its default
+            for any field that is missing or empty) plus ``msg_type``.
+        """
+        # Output key -> path handed to ``root.find`` by ``parse_fields``.
+        fields = {
+            "title": ".//title",
+            "page_path": ".//pagepath",
+            "mini_program": ".//sourcedisplayname",
+            "file_id": "appmsg/appattach/cdnthumburl",
+            "file_size": "appmsg/appattach/cdnthumblength",
+            "aes_key": "appmsg/appattach/aeskey",
         }
 
-    @staticmethod
-    def extract_reply_articles(msg_type, root):
-        title = root.find("appmsg/title").text
-        url = root.find("appmsg/url").text
-        cover_url = root.find("appmsg/thumburl").text
-        account_name = root.find("appmsg/sourcedisplayname").text
-        gh_id = root.find("appmsg/sourceusername").text
-        desc = root.find("appmsg/des").text
-        return {
-            "msg_type": msg_type,
-            "title": title,
-            "url": url,
-            "cover_url": cover_url,
-            "account_name": account_name,
-            "gh_id": gh_id,
-            "desc": desc,
+        data = self.parse_fields(root, fields)
+        # ``msg_type`` is not read from the XML here; the caller supplies it.
+        data["msg_type"] = msg_type
+        return data
+
+    def extract_reply_articles(self, msg_type, root):
+        """Extract the article fields from a parsed callback XML element.
+
+        :param msg_type: value copied into the result under ``"msg_type"``.
+        :param root: parsed XML element that is searched with the paths below.
+        :return: dict with ``title``, ``url``, ``cover_url``, ``account_name``,
+            ``gh_id`` and ``desc`` (``parse_fields`` substitutes its default for any
+            field that is missing or empty) plus ``msg_type``.
+        """
+        # Output key -> path handed to ``root.find`` by ``parse_fields``.
+        fields = {
+            "title": "appmsg/title",
+            "url": "appmsg/url",
+            "cover_url": "appmsg/thumburl",
+            "account_name": "appmsg/sourcedisplayname",
+            "gh_id": "appmsg/sourceusername",
+            "desc": "appmsg/des",
         }
+        data = self.parse_fields(root, fields)
+        # ``msg_type`` is not read from the XML here; the caller supplies it.
+        data["msg_type"] = msg_type
+        return data
 
     # 解析 xml
-    @staticmethod
     def extract_callback_xml(self, xml_text):
         try:
             root = ET.fromstring(xml_text)
@@ -92,18 +98,36 @@ class AutoReplyCardsMonitorUtils(AutoReplyCardsMonitorConst):
                     return self.extract_reply_cards(msg_type, root)
 
                 case _:
-                    return {
-                        "msg_type": msg_type,
-                    }
+                    return {}
 
         except Exception as e:
+            print(xml_text)
             print(e)
+            print(traceback.format_exc())
             return {}
 
     # 解析 page_path
     @staticmethod
     def extract_page_path(page_path):
-        pass
+        """Pull ``id`` and ``rootSourceId`` out of the ``jumpPage`` parameter of a page path.
+
+        :param page_path: URL-like string whose query may carry a ``jumpPage`` parameter.
+        :return: ``(video_id, root_source_id)``. Both are ``None`` when ``jumpPage`` is
+            absent or empty; otherwise each is the first value of the ``id`` /
+            ``rootSourceId`` parameter inside ``jumpPage``, or ``None`` if that
+            parameter is missing.
+        """
+        # Parse the outer URL.
+        parsed_url = urlparse(page_path)
+        outer_params = parse_qs(parsed_url.query)
+
+        # Take the first jumpPage value and decode it.
+        jump_page = outer_params.get("jumpPage", [""])[0]
+        if not jump_page:
+            return None, None
+
+        # NOTE(review): ``parse_qs`` already percent-decodes values, so this is a
+        # second decoding pass — presumably jumpPage is double-encoded; confirm.
+        decoded_jump_page = unquote(jump_page)
+
+        # Parse the query parameters nested inside jumpPage.
+        inner_query = urlparse(decoded_jump_page).query
+        inner_params = parse_qs(inner_query)
+
+        video_id = inner_params.get("id", [None])[0]
+        root_source_id = inner_params.get("rootSourceId", [None])[0]
+
+        return video_id, root_source_id
 
     @staticmethod
     async def get_cover_url(aes_key, total_size, file_id):
@@ -129,9 +153,7 @@ class AutoReplyCardsMonitorUtils(AutoReplyCardsMonitorConst):
     async def get_sample_url(recent_articles):
         for article in recent_articles:
             link = article["ContentUrl"]
-            print(link)
             response = await get_article_detail(article_link=link)
-            print(response)
             if not response:
                 continue
             code = response["code"]
@@ -152,21 +174,55 @@ class AutoReplyCardsMonitorUtils(AutoReplyCardsMonitorConst):
             AND     usersharedepth = 0
             AND     channel = '公众号合作-即转-稳定'
             GROUP BY 公众号名, ghid
-            HAVING uv > 100
+            HAVING uv >= 100
             ORDER BY uv DESC
-            LIMIT 10
             ;
         """
         result = fetch_from_odps(query)
         return result
 
-    # 下载封面图片
-    async def download_cover(self, url, file_path):
-        pass
+    @staticmethod
+    def download_and_upload_cover(task_id, index, cover_obj, timeout=10):
+        """Download a cover image into ``./static`` and upload it to OSS.
+
+        :param task_id: combined with ``index`` to build the file name ``{task_id}_{index}.jpg``.
+        :param index: see ``task_id``.
+        :param cover_obj: mapping expected to hold the image URL at ``cover_obj["data"]["fileUrl"]``.
+        :param timeout: timeout passed to ``requests.get``.
+        :return: the value returned by ``upload_to_oss`` on success; ``None`` when
+            ``cover_obj`` is malformed or the download, the file write or the upload fails.
+
+        NOTE(review): ``requests.get`` is a blocking call and this helper is invoked
+        directly from the ``async`` method ``store_card`` — confirm that blocking the
+        event loop there is acceptable.
+        """
+        try:
+            cover_url = cover_obj["data"]["fileUrl"]
+        except (KeyError, TypeError):
+            # TypeError covers ``cover_obj`` (or its ``data``) being None / not subscriptable.
+            print(f"[WARN] Invalid cover_obj structure: {cover_obj}")
+            return None
+
+        file_name = f"{task_id}_{index}.jpg"
+        # The temp directory is resolved against the process working directory.
+        save_dir = os.path.join(os.getcwd(), "static")
+        save_path = os.path.join(save_dir, file_name)
+
+        os.makedirs(save_dir, exist_ok=True)
+
+        try:
+            response = requests.get(cover_url, timeout=timeout)
+            response.raise_for_status()
+        except RequestException as e:
+            print(f"[ERROR] Download failed ({cover_url}): {e}")
+            return None
+
+        try:
+            with open(save_path, "wb") as f:
+                f.write(response.content)
+        except OSError as e:
+            print(f"[ERROR] Write file failed ({save_path}): {e}")
+            return None
+
+        # NOTE(review): "rely" looks like a typo for "reply". The string is part of
+        # the OSS object key, so renaming it changes where objects are stored.
+        oss_dir = "auto_rely_cards_cover"
+        oss_key = None
+        try:
+            oss_key = upload_to_oss(save_path, f"{oss_dir}/{file_name}")
+        except Exception as e:
+            print(f"[ERROR] Upload to OSS failed: {e}")
+
+        # The local copy is removed only after a successful upload; when the upload
+        # fails the file is left behind in ``static``.
+        if oss_key:
+            try:
+                os.remove(save_path)
+            except OSError as e:
+                print(f"[WARN] Failed to remove temp file {save_path}: {e}")
 
-    # 上传封面至 oss
-    async def upload_cover(self, file_path):
-        pass
+        return oss_key
 
 
 class AutoReplyCardsMonitorMapper(AutoReplyCardsMonitorUtils):
@@ -308,11 +364,177 @@ class AutoReplyCardsMonitorMapper(AutoReplyCardsMonitorUtils):
             ),
         )
 
+    # Fetch tasks awaiting extraction (获取待解析的任务)
+    async def get_extract_tasks(self):
+        """Fetch the tasks whose extraction has not started yet.
+
+        Selects ``task_id`` and ``result`` from ``cooperate_accounts_task`` where
+        ``extract_status`` equals ``INIT_STATUS`` and ``task_status`` equals
+        ``SUCCESS_STATUS``.
+
+        :return: whatever ``self.pool.async_fetch`` returns (presumably a list of
+            row mappings — confirm against the pool implementation).
+        """
+        query = """
+            SELECT task_id, result FROM cooperate_accounts_task WHERE extract_status = %s AND task_status = %s;
+        """
+        return await self.pool.async_fetch(
+            query=query, params=(self.INIT_STATUS, self.SUCCESS_STATUS)
+        )
+
+    # 存储解析结果
+    async def store_extract_result(self, query, row_table):
+        """Execute a parameterised write through the pool.
+
+        :param query: SQL statement with ``%s`` placeholders.
+        :param row_table: parameter values for ``query`` (callers pass one tuple per row).
+        :return: whatever ``self.pool.async_save`` returns.
+        """
+        return await self.pool.async_save(query=query, params=row_table)
+
 
 class AutoReplyCardsMonitor(AutoReplyCardsMonitorMapper):
     def __init__(self, pool, log_client):
         super().__init__(pool, log_client)
 
+    # 存储卡片信息
+    async def store_card(self, task_id, index, msg_type, xml_obj):
+        """Persist one mini-program card message into ``cooperate_auto_reply_detail``.
+
+        :param task_id: task identifier; also used to name the cover file.
+        :param index: stored in the ``position`` column; also used to name the cover file.
+        :param msg_type: stored as is.
+        :param xml_obj: dict that must contain ``page_path``, ``aes_key``, ``file_size``,
+            ``file_id``, ``title`` and ``mini_program`` (a missing key raises ``KeyError``).
+        """
+        # Either element of the pair may be None when the page path carries no usable jumpPage.
+        video_id, root_source_id = self.extract_page_path(xml_obj["page_path"])
+        cover_obj = await self.get_cover_url(xml_obj["aes_key"], xml_obj["file_size"], xml_obj["file_id"])
+        # ``cover_oss`` is None when the download or upload failed; the row is stored anyway.
+        cover_oss = self.download_and_upload_cover(task_id, index, cover_obj)
+        query = """
+            INSERT INTO cooperate_auto_reply_detail
+            (
+                task_id, position, msg_type, card_title, card_cover, 
+                video_id, root_source_id, mini_program_name, task_result
+            ) VALUES 
+            (
+                %s, %s, %s, %s, %s, 
+                %s, %s, %s, %s
+            );
+        """
+        # The tuple order must match the column list above.
+        insert_row = (
+            task_id,
+            index,
+            msg_type,
+            xml_obj["title"],
+            cover_oss,
+            video_id,
+            root_source_id,
+            xml_obj["mini_program"],
+            json.dumps(xml_obj, ensure_ascii=False),
+        )
+        await self.store_extract_result(query, insert_row)
+
+    # 存储文章信息
+    async def store_article(self, task_id, index, msg_type, xml_obj):
+        """Persist one article message, plus any mini-program cards found in the article.
+
+        The article detail is fetched with ``get_article_detail``. Three outcomes:
+
+        * the fetch returns nothing or a non-zero ``code``: one minimal row is stored
+          with a failure remark;
+        * the detail has no ``mini_program`` entries: one article-only row is stored;
+        * otherwise: one row per mini-program entry, numbered from 1 in ``card_index``.
+
+        :param task_id: task identifier stored with every row.
+        :param index: stored in the ``position`` column.
+        :param msg_type: stored as is.
+        :param xml_obj: dict read with ``.get`` for ``title``, ``url``, ``cover_url``
+            and ``desc`` (absent keys become ``None``).
+        """
+        article_title = xml_obj.get("title")
+        article_link = xml_obj.get("url")
+        article_cover = xml_obj.get("cover_url")
+        article_desc = xml_obj.get("desc")
+
+        fetch_fail_status = False
+
+        fetch_response = await get_article_detail(article_link=article_link, is_cache=False, is_count=True)
+        # Failure is either an empty response ...
+        if not fetch_response:
+            fetch_fail_status = True
+
+        # ... or a response whose ``code`` is not 0.
+        if not fetch_fail_status:
+            if fetch_response.get("code") != 0:
+                fetch_fail_status = True
+
+        if fetch_fail_status:
+            # Keep what the XML already gave us and record why the rest is missing.
+            query = """
+                INSERT INTO cooperate_auto_reply_detail
+                    (task_id, position, msg_type, article_title, article_link, article_cover, article_desc, remark)
+                VALUES
+                (%s, %s, %s, %s, %s, %s, %s, %s);
+            """
+            remark = "获取文章详情失败"
+            insert_row = (task_id, index, msg_type, article_title, article_link, article_cover, article_desc, remark)
+            await self.store_extract_result(query, insert_row)
+
+        else:
+            # NOTE(review): this print and the two below look like leftover debug output.
+            print(article_link)
+            article_detail = fetch_response["data"]["data"]
+            article_text = article_detail["body_text"]
+            article_images = article_detail["image_url_list"]
+            read_cnt = article_detail["view_count"]
+            like_cnt = article_detail["like_count"]
+            # presumably converts milliseconds to seconds — confirm the upstream unit.
+            publish_timestamp = int(article_detail["publish_timestamp"] / 1000)
+            # ``wx_sn`` is the first ``sn`` query parameter of the content link, or None.
+            parsed = urlparse(article_detail["content_link"])
+            params = parse_qs(parsed.query)
+            wx_sn = params.get("sn", [None])[0]
+            print(params)
+            print(wx_sn)
+            mini_info = article_detail.get("mini_program")
+            if not mini_info:
+                # No mini-program entries: store a single article-only row
+                # (the card, video_id and root_source_id columns are not written).
+                query = """
+                    INSERT INTO cooperate_auto_reply_detail 
+                    (
+                        task_id, position, msg_type, article_title, article_link, 
+                        article_cover, article_text, article_images, article_desc, read_cnt, 
+                        like_cnt, publish_timestamp, task_result, wx_sn
+                    ) VALUES 
+                    (
+                        %s, %s, %s, %s, %s, 
+                        %s, %s, %s, %s, %s, 
+                        %s, %s, %s, %s
+                    );
+                """
+                # The tuple order must match the column list above.
+                values = (
+                    task_id,
+                    index,
+                    msg_type,
+                    article_title,
+                    article_link,
+                    article_cover,
+                    article_text,
+                    json.dumps(article_images, ensure_ascii=False),
+                    article_desc,
+                    read_cnt,
+                    like_cnt,
+                    publish_timestamp,
+                    json.dumps(fetch_response, ensure_ascii=False),
+                    wx_sn,
+                )
+                await self.store_extract_result(query, values)
+
+            else:
+                # One row per mini-program entry; the article columns repeat on every row.
+                for card_index, i in enumerate(mini_info, 1):
+                    try:
+                        video_id, root_source_id = self.extract_page_path(i["path"])
+                        card_title = i["title"]
+                        card_cover = i["image_url"]
+                        # NOTE(review): "nike_name" is presumably the upstream spelling
+                        # of the key — verify against the article-detail response.
+                        mini_name = i["nike_name"]
+                        query = """
+                            INSERT INTO cooperate_auto_reply_detail 
+                                (
+                                    task_id, position, msg_type, card_title, card_cover, 
+                                    video_id, root_source_id, mini_program_name, article_title, article_link, 
+                                    article_cover, article_text, article_images, article_desc, read_cnt, 
+                                    like_cnt, publish_timestamp, task_result, wx_sn, card_index
+                                ) VALUES
+                                (
+                                    %s, %s, %s, %s, %s, 
+                                    %s, %s, %s, %s, %s, 
+                                    %s, %s, %s, %s, %s,
+                                    %s, %s, %s, %s, %s
+                                );
+                        """
+                        # The tuple order must match the column list above.
+                        values = (
+                            task_id,
+                            index,
+                            msg_type,
+                            card_title,
+                            card_cover,
+                            video_id,
+                            root_source_id,
+                            mini_name,
+                            article_title,
+                            article_link,
+                            article_cover,
+                            article_text,
+                            json.dumps(article_images, ensure_ascii=False),
+                            article_desc,
+                            read_cnt,
+                            like_cnt,
+                            publish_timestamp,
+                            json.dumps(fetch_response, ensure_ascii=False),
+                            wx_sn,
+                            card_index,
+                        )
+                        await self.store_extract_result(query, values)
+                    except Exception as e:
+                        # A failing card is printed and skipped so the remaining cards
+                        # are still stored; the error does not reach the caller.
+                        print(traceback.format_exc())
+                        print(e)
+
     # 创建单个关注公众号任务
     async def create_follow_single_account_task(self, gh_id):
         response = await get_article_list_from_account(account_id=gh_id)
@@ -343,13 +565,12 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorMapper):
 
     # 创建单个账号自动回复任务
     async def create_auto_reply_single_account_task(self, gh_id, account_name):
+        print(account_name)
         task_id = self.generate_task_id(task_name="auto_reply", gh_id=gh_id)
         # 先插入 task, 再创建自动回复任务
         create_row = await self.create_auto_reply_task(task_id, gh_id)
         if create_row:
-            affected_rows = await self.insert_aigc_auto_reply_task(
-                task_id, account_name
-            )
+            affected_rows = await self.insert_aigc_auto_reply_task(task_id, gh_id)
             if not affected_rows:
                 print("发布任务至 AIGC 失败")
             else:
@@ -374,55 +595,62 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorMapper):
                 else:
                     account_detail = fetch_response[0]
                     status = account_detail["status"]
-                    follow_status = account_detail["follow_status"]
+
                     if not status:
                         print("账号已经迁移或者封禁")
                         continue
 
-                    match follow_status:
-                        case self.INIT_STATUS:
-                            await self.create_follow_single_account_task(
-                                account_detail["gh_id"]
-                            )
-
-                        case self.PROCESSING_STATUS:
-                            fetch_response = await self.fetch_follow_account_status(
-                                account_detail["gh_id"]
-                            )
-                            if not fetch_response:
-                                await self.update_follow_status(
-                                    account_detail["gh_id"],
-                                    self.PROCESSING_STATUS,
-                                    self.INIT_STATUS,
-                                )
-
-                            task_status = fetch_response[0]["task_status"]
-                            match task_status:
-                                case self.FETCH_INIT_STATUS:
-                                    continue
-                                case self.FETCH_PROCESSING_STATUS:
-                                    continue
-                                case self.FETCH_SUCCESS_STATUS:
-                                    await self.update_follow_status(
-                                        account_detail["gh_id"],
-                                        self.PROCESSING_STATUS,
-                                        self.SUCCESS_STATUS,
-                                    )
-                                case self.FETCH_FAIL_STATUS:
-                                    await self.update_follow_status(
-                                        account_detail["gh_id"],
-                                        self.PROCESSING_STATUS,
-                                        self.FAIL_STATUS,
-                                    )
-
-                        case self.SUCCESS_STATUS:
-                            # 账号已经关注,创建获取自动回复任务
-                            await self.create_auto_reply_single_account_task(
-                                account_detail["gh_id"], account.公众号名
-                            )
+                    # 新逻辑,无需考虑账号是否关注
+                    await self.create_auto_reply_single_account_task(
+                        account_detail["gh_id"], account.公众号名
+                    )
 
-                        case _:
-                            print(f"{account.公众号名}账号状态异常")
+                    # # 旧逻辑,考虑账号是否关注
+                    # follow_status = account_detail["follow_status"]
+                    # match follow_status:
+                    #     case self.INIT_STATUS:
+                    #         await self.create_follow_single_account_task(
+                    #             account_detail["gh_id"]
+                    #         )
+                    #
+                    #     case self.PROCESSING_STATUS:
+                    #         fetch_response = await self.fetch_follow_account_status(
+                    #             account_detail["gh_id"]
+                    #         )
+                    #         if not fetch_response:
+                    #             await self.update_follow_status(
+                    #                 account_detail["gh_id"],
+                    #                 self.PROCESSING_STATUS,
+                    #                 self.INIT_STATUS,
+                    #             )
+                    #
+                    #         task_status = fetch_response[0]["task_status"]
+                    #         match task_status:
+                    #             case self.FETCH_INIT_STATUS:
+                    #                 continue
+                    #             case self.FETCH_PROCESSING_STATUS:
+                    #                 continue
+                    #             case self.FETCH_SUCCESS_STATUS:
+                    #                 await self.update_follow_status(
+                    #                     account_detail["gh_id"],
+                    #                     self.PROCESSING_STATUS,
+                    #                     self.SUCCESS_STATUS,
+                    #                 )
+                    #             case self.FETCH_FAIL_STATUS:
+                    #                 await self.update_follow_status(
+                    #                     account_detail["gh_id"],
+                    #                     self.PROCESSING_STATUS,
+                    #                     self.FAIL_STATUS,
+                    #                 )
+                    #
+                    #     case self.SUCCESS_STATUS:
+                    #         # 账号已经关注,创建获取自动回复任务
+                    #         await self.create_auto_reply_single_account_task(
+                    #             account_detail["gh_id"], account.公众号名
+                    #         )
+                    #
+                    #     case _:
+                    #         print(f"{account.公众号名}账号状态异常")
 
             except Exception as e:
                 print(f"处理账号{account.公众号名}异常", e)
@@ -431,9 +659,10 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorMapper):
     async def get_auto_reply_response(self):
         task_list = await self.fetch_auto_replying_tasks()
         if not task_list:
+            print("No processing task yet")
             return
 
-        for task in task_list:
+        for task in tqdm(task_list):
             try:
                 task_id = task["task_id"]
                 response = await self.get_auto_reply_task_result(task_id)
@@ -443,6 +672,7 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorMapper):
                 task_status = response[0]["task_status"]
                 task_result = response[0]["task_result"]
                 update_timestamp = response[0]["update_timestamp"]
+
                 match task_status:
                     case self.FETCH_FAIL_STATUS:
                         await self.update_auto_reply_task_status(
@@ -460,9 +690,47 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorMapper):
             except Exception as e:
                 print(e)
 
-    # 解析 xml 并且更新数据
-    async def extract_task(self):
-        pass
+    # 解析单个xml
+    async def extract_single_xml(self, task):
+        """Parse the stored reply payload of one task and persist each recognised message.
+
+        :param task: mapping with ``task_id`` and ``result``; ``result`` is either a
+            JSON string or an already-decoded iterable whose items are passed to
+            ``extract_callback_xml``.
+        :return: ``None``. The extract status is updated to ``SUCCESS_STATUS`` when the
+            whole payload is processed and to ``FAIL_STATUS`` when any exception
+            escapes the loop.
+        """
+        task_id = task["task_id"]
+        result = task["result"]
+
+        # Acquire lock: request the INIT -> PROCESSING transition; a falsy result
+        # means it did not happen (presumably another worker owns the task — confirm).
+        acquire_lock = await self.update_auto_reply_task_status(
+            task_id, "extract", self.INIT_STATUS, self.PROCESSING_STATUS
+        )
+        if not acquire_lock:
+            return
+
+        try:
+            # Parse xml.
+            # NOTE(review): ``isinstance(result, str)`` is the idiomatic form of this check.
+            xml_list = json.loads(result) if type(result) == str else result
+            # Positions are 1-based.
+            for index, item in enumerate(xml_list, 1):
+                xml_obj = self.extract_callback_xml(item)
+                if xml_obj:
+                    msg_type = xml_obj.get("msg_type", None)
+                    match msg_type:
+                        case "33":
+                            await self.store_card(task_id, index, msg_type, xml_obj)
+
+                        case "5":
+                            await self.store_article(task_id, index, msg_type, xml_obj)
+
+                        case _:
+                            # Unhandled message types are skipped; ``continue`` also
+                            # bypasses the sleep below.
+                            continue
+
+                # Pause between items (presumably to throttle the downstream calls — confirm).
+                await asyncio.sleep(5)
+
+            await self.update_auto_reply_task_status(
+                task_id, "extract", self.PROCESSING_STATUS, self.SUCCESS_STATUS
+            )
+
+        except Exception as e:
+            print(e)
+            print(traceback.format_exc())
+            await self.update_auto_reply_task_status(
+                task_id, "extract", self.PROCESSING_STATUS, self.FAIL_STATUS
+            )
 
     # main function
     async def deal(self, task_name):
@@ -474,7 +742,17 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorMapper):
                 await self.get_auto_reply_response()
 
             case "extract_task":
-                await self.extract_task()
+                task_list = await self.get_extract_tasks()
+                if not task_list:
+                    print("No tasks to extract now")
+                    return
+
+                for task in tqdm(task_list, desc="解析任务"):
+                    await self.extract_single_xml(task)
+                    await asyncio.sleep(10)
+
+            case "re_extract_task":
+                pass
 
             case _:
                 print("task_error")

+ 13 - 3
applications/tasks/task_handler.py

@@ -277,11 +277,21 @@ class TaskHandler(TaskMapper):
 
     # 自动关注公众号账号
     async def _auto_follow_account_handler(self) -> int:
+        """Run ``AutoReplyCardsMonitor.deal`` with ``task_name="follow_gzh_task"``.
+
+        :return: ``self.TASK_SUCCESS_STATUS`` once ``deal`` returns without raising.
+        """
-        task = AutoReplyCardsMonitor(
-            pool=self.db_client, log_client=self.log_client
-        )
+        task = AutoReplyCardsMonitor(pool=self.db_client, log_client=self.log_client)
         await task.deal(task_name="follow_gzh_task")
         return self.TASK_SUCCESS_STATUS
 
+    # 获取自动关注回复
+    async def _get_follow_result_handler(self) -> int:
+        """Run ``AutoReplyCardsMonitor.deal`` with ``task_name="get_auto_reply_task"``.
+
+        :return: ``self.TASK_SUCCESS_STATUS`` once ``deal`` returns without raising.
+        """
+        task = AutoReplyCardsMonitor(pool=self.db_client, log_client=self.log_client)
+        await task.deal(task_name="get_auto_reply_task")
+        return self.TASK_SUCCESS_STATUS
+
+    # 解析自动回复结果
+    async def _extract_reply_result_handler(self) -> int:
+        """Run ``AutoReplyCardsMonitor.deal`` with ``task_name="extract_task"``.
+
+        :return: ``self.TASK_SUCCESS_STATUS`` once ``deal`` returns without raising.
+        """
+        task = AutoReplyCardsMonitor(pool=self.db_client, log_client=self.log_client)
+        await task.deal(task_name="extract_task")
+        return self.TASK_SUCCESS_STATUS
+
 
 __all__ = ["TaskHandler"]

+ 4 - 0
applications/tasks/task_scheduler.py

@@ -211,6 +211,10 @@ class TaskScheduler(TaskHandler):
             "update_account_open_rate_avg": self._update_account_open_rate_avg_handler,
             # 自动关注公众号账号
             "auto_follow_account": self._auto_follow_account_handler,
+            # 获取自动关注回复
+            "get_follow_result": self._get_follow_result_handler,
+            # 解析自动回复结果
+            "extract_reply_result": self._extract_reply_result_handler,
         }
 
         if task_name not in handlers:

+ 17 - 1
applications/utils/common.py

@@ -1,7 +1,7 @@
 """
 @author: luojunhui
 """
-
+import oss2
 import random
 import string
 import hashlib
@@ -264,3 +264,19 @@ def fetch_from_odps(query):
             return [item for item in reader]
         else:
             return []
+
+
+def upload_to_oss(local_video_path, oss_key):
+    """
+    Upload a local file to the ``art-pubbucket`` OSS bucket.
+
+    :param local_video_path: path of the local file to upload (despite the name,
+        the file is passed through unchanged; the cover helper passes ``.jpg`` files).
+    :param oss_key: object key to store the file under.
+    :return: ``oss_key``, unchanged.
+    """
+    # NOTE(review): the access key id and secret are hard-coded in source. Move them
+    # to configuration or environment variables and rotate the exposed pair.
+    access_key_id = "LTAIP6x1l3DXfSxm"
+    access_key_secret = "KbTaM9ars4OX3PMS6Xm7rtxGr1FLon"
+    endpoint = "oss-cn-hangzhou.aliyuncs.com"
+    bucket_name = "art-pubbucket"
+    # A new Bucket client is built on every call.
+    bucket = oss2.Bucket(
+        oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name
+    )
+    bucket.put_object_from_file(key=oss_key, filename=local_video_path)
+    return oss_key

+ 1 - 0
requirements.txt

@@ -23,4 +23,5 @@ pydantic~=2.10.6
 scipy~=1.15.2
 quart-cors~=0.8.0
 statsmodels
+oss2~=2.19.1